server_test.go 44 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "io/ioutil"
  20. "os"
  21. "path"
  22. "path/filepath"
  23. "reflect"
  24. "testing"
  25. "time"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/etcdserver/membership"
  28. "github.com/coreos/etcd/lease"
  29. "github.com/coreos/etcd/mvcc"
  30. "github.com/coreos/etcd/mvcc/backend"
  31. "github.com/coreos/etcd/pkg/fileutil"
  32. "github.com/coreos/etcd/pkg/idutil"
  33. "github.com/coreos/etcd/pkg/mock/mockstorage"
  34. "github.com/coreos/etcd/pkg/mock/mockstore"
  35. "github.com/coreos/etcd/pkg/mock/mockwait"
  36. "github.com/coreos/etcd/pkg/pbutil"
  37. "github.com/coreos/etcd/pkg/testutil"
  38. "github.com/coreos/etcd/pkg/types"
  39. "github.com/coreos/etcd/pkg/wait"
  40. "github.com/coreos/etcd/raft"
  41. "github.com/coreos/etcd/raft/raftpb"
  42. "github.com/coreos/etcd/rafthttp"
  43. "github.com/coreos/etcd/snap"
  44. "github.com/coreos/etcd/store"
  45. )
  46. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  47. // and are served through local data.
  48. func TestDoLocalAction(t *testing.T) {
  49. tests := []struct {
  50. req pb.Request
  51. wresp Response
  52. werr error
  53. wactions []testutil.Action
  54. }{
  55. {
  56. pb.Request{Method: "GET", ID: 1, Wait: true},
  57. Response{Watcher: store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}},
  58. },
  59. {
  60. pb.Request{Method: "GET", ID: 1},
  61. Response{Event: &store.Event{}}, nil,
  62. []testutil.Action{
  63. {
  64. Name: "Get",
  65. Params: []interface{}{"", false, false},
  66. },
  67. },
  68. },
  69. {
  70. pb.Request{Method: "HEAD", ID: 1},
  71. Response{Event: &store.Event{}}, nil,
  72. []testutil.Action{
  73. {
  74. Name: "Get",
  75. Params: []interface{}{"", false, false},
  76. },
  77. },
  78. },
  79. {
  80. pb.Request{Method: "BADMETHOD", ID: 1},
  81. Response{}, ErrUnknownMethod, []testutil.Action{},
  82. },
  83. }
  84. for i, tt := range tests {
  85. st := mockstore.NewRecorder()
  86. srv := &EtcdServer{
  87. store: st,
  88. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  89. }
  90. resp, err := srv.Do(context.TODO(), tt.req)
  91. if err != tt.werr {
  92. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  93. }
  94. if !reflect.DeepEqual(resp, tt.wresp) {
  95. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  96. }
  97. gaction := st.Action()
  98. if !reflect.DeepEqual(gaction, tt.wactions) {
  99. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  100. }
  101. }
  102. }
  103. // TestDoBadLocalAction tests server requests which do not need to go through consensus,
  104. // and return errors when they fetch from local data.
  105. func TestDoBadLocalAction(t *testing.T) {
  106. storeErr := fmt.Errorf("bah")
  107. tests := []struct {
  108. req pb.Request
  109. wactions []testutil.Action
  110. }{
  111. {
  112. pb.Request{Method: "GET", ID: 1, Wait: true},
  113. []testutil.Action{{Name: "Watch"}},
  114. },
  115. {
  116. pb.Request{Method: "GET", ID: 1},
  117. []testutil.Action{
  118. {
  119. Name: "Get",
  120. Params: []interface{}{"", false, false},
  121. },
  122. },
  123. },
  124. {
  125. pb.Request{Method: "HEAD", ID: 1},
  126. []testutil.Action{
  127. {
  128. Name: "Get",
  129. Params: []interface{}{"", false, false},
  130. },
  131. },
  132. },
  133. }
  134. for i, tt := range tests {
  135. st := mockstore.NewErrRecorder(storeErr)
  136. srv := &EtcdServer{
  137. store: st,
  138. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  139. }
  140. resp, err := srv.Do(context.Background(), tt.req)
  141. if err != storeErr {
  142. t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr)
  143. }
  144. if !reflect.DeepEqual(resp, Response{}) {
  145. t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{})
  146. }
  147. gaction := st.Action()
  148. if !reflect.DeepEqual(gaction, tt.wactions) {
  149. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  150. }
  151. }
  152. }
  153. // TestApplyRepeat tests that server handles repeat raft messages gracefully
  154. func TestApplyRepeat(t *testing.T) {
  155. n := newNodeConfChangeCommitterStream()
  156. n.readyc <- raft.Ready{
  157. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  158. }
  159. cl := newTestCluster(nil)
  160. st := store.New()
  161. cl.SetStore(store.New())
  162. cl.AddMember(&membership.Member{ID: 1234})
  163. r := newRaftNode(raftNodeConfig{
  164. Node: n,
  165. raftStorage: raft.NewMemoryStorage(),
  166. storage: mockstorage.NewStorageRecorder(""),
  167. transport: rafthttp.NewNopTransporter(),
  168. })
  169. s := &EtcdServer{
  170. r: *r,
  171. store: st,
  172. cluster: cl,
  173. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  174. SyncTicker: &time.Ticker{},
  175. }
  176. s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
  177. s.start()
  178. req := &pb.Request{Method: "QGET", ID: uint64(1)}
  179. ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}
  180. n.readyc <- raft.Ready{CommittedEntries: ents}
  181. // dup msg
  182. n.readyc <- raft.Ready{CommittedEntries: ents}
  183. // use a conf change to block until dup msgs are all processed
  184. cc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
  185. ents = []raftpb.Entry{{
  186. Index: 2,
  187. Type: raftpb.EntryConfChange,
  188. Data: pbutil.MustMarshal(cc),
  189. }}
  190. n.readyc <- raft.Ready{CommittedEntries: ents}
  191. // wait for conf change message
  192. act, err := n.Wait(1)
  193. // wait for stop message (async to avoid deadlock)
  194. stopc := make(chan error)
  195. go func() {
  196. _, werr := n.Wait(1)
  197. stopc <- werr
  198. }()
  199. s.Stop()
  200. // only want to confirm etcdserver won't panic; no data to check
  201. if err != nil {
  202. t.Fatal(err)
  203. }
  204. if len(act) == 0 {
  205. t.Fatalf("expected len(act)=0, got %d", len(act))
  206. }
  207. if err = <-stopc; err != nil {
  208. t.Fatalf("error on stop (%v)", err)
  209. }
  210. }
  211. func TestApplyRequest(t *testing.T) {
  212. tests := []struct {
  213. req pb.Request
  214. wresp Response
  215. wactions []testutil.Action
  216. }{
  217. // POST ==> Create
  218. {
  219. pb.Request{Method: "POST", ID: 1},
  220. Response{Event: &store.Event{}},
  221. []testutil.Action{
  222. {
  223. Name: "Create",
  224. Params: []interface{}{"", false, "", true, store.TTLOptionSet{ExpireTime: time.Time{}}},
  225. },
  226. },
  227. },
  228. // POST ==> Create, with expiration
  229. {
  230. pb.Request{Method: "POST", ID: 1, Expiration: 1337},
  231. Response{Event: &store.Event{}},
  232. []testutil.Action{
  233. {
  234. Name: "Create",
  235. Params: []interface{}{"", false, "", true, store.TTLOptionSet{ExpireTime: time.Unix(0, 1337)}},
  236. },
  237. },
  238. },
  239. // POST ==> Create, with dir
  240. {
  241. pb.Request{Method: "POST", ID: 1, Dir: true},
  242. Response{Event: &store.Event{}},
  243. []testutil.Action{
  244. {
  245. Name: "Create",
  246. Params: []interface{}{"", true, "", true, store.TTLOptionSet{ExpireTime: time.Time{}}},
  247. },
  248. },
  249. },
  250. // PUT ==> Set
  251. {
  252. pb.Request{Method: "PUT", ID: 1},
  253. Response{Event: &store.Event{}},
  254. []testutil.Action{
  255. {
  256. Name: "Set",
  257. Params: []interface{}{"", false, "", store.TTLOptionSet{ExpireTime: time.Time{}}},
  258. },
  259. },
  260. },
  261. // PUT ==> Set, with dir
  262. {
  263. pb.Request{Method: "PUT", ID: 1, Dir: true},
  264. Response{Event: &store.Event{}},
  265. []testutil.Action{
  266. {
  267. Name: "Set",
  268. Params: []interface{}{"", true, "", store.TTLOptionSet{ExpireTime: time.Time{}}},
  269. },
  270. },
  271. },
  272. // PUT with PrevExist=true ==> Update
  273. {
  274. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true)},
  275. Response{Event: &store.Event{}},
  276. []testutil.Action{
  277. {
  278. Name: "Update",
  279. Params: []interface{}{"", "", store.TTLOptionSet{ExpireTime: time.Time{}}},
  280. },
  281. },
  282. },
  283. // PUT with PrevExist=false ==> Create
  284. {
  285. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false)},
  286. Response{Event: &store.Event{}},
  287. []testutil.Action{
  288. {
  289. Name: "Create",
  290. Params: []interface{}{"", false, "", false, store.TTLOptionSet{ExpireTime: time.Time{}}},
  291. },
  292. },
  293. },
  294. // PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
  295. {
  296. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
  297. Response{Event: &store.Event{}},
  298. []testutil.Action{
  299. {
  300. Name: "CompareAndSwap",
  301. Params: []interface{}{"", "", uint64(1), "", store.TTLOptionSet{ExpireTime: time.Time{}}},
  302. },
  303. },
  304. },
  305. // PUT with PrevExist=false *and* PrevIndex set ==> Create
  306. {
  307. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
  308. Response{Event: &store.Event{}},
  309. []testutil.Action{
  310. {
  311. Name: "Create",
  312. Params: []interface{}{"", false, "", false, store.TTLOptionSet{ExpireTime: time.Time{}}},
  313. },
  314. },
  315. },
  316. // PUT with PrevIndex set ==> CompareAndSwap
  317. {
  318. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1},
  319. Response{Event: &store.Event{}},
  320. []testutil.Action{
  321. {
  322. Name: "CompareAndSwap",
  323. Params: []interface{}{"", "", uint64(1), "", store.TTLOptionSet{ExpireTime: time.Time{}}},
  324. },
  325. },
  326. },
  327. // PUT with PrevValue set ==> CompareAndSwap
  328. {
  329. pb.Request{Method: "PUT", ID: 1, PrevValue: "bar"},
  330. Response{Event: &store.Event{}},
  331. []testutil.Action{
  332. {
  333. Name: "CompareAndSwap",
  334. Params: []interface{}{"", "bar", uint64(0), "", store.TTLOptionSet{ExpireTime: time.Time{}}},
  335. },
  336. },
  337. },
  338. // PUT with PrevIndex and PrevValue set ==> CompareAndSwap
  339. {
  340. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1, PrevValue: "bar"},
  341. Response{Event: &store.Event{}},
  342. []testutil.Action{
  343. {
  344. Name: "CompareAndSwap",
  345. Params: []interface{}{"", "bar", uint64(1), "", store.TTLOptionSet{ExpireTime: time.Time{}}},
  346. },
  347. },
  348. },
  349. // DELETE ==> Delete
  350. {
  351. pb.Request{Method: "DELETE", ID: 1},
  352. Response{Event: &store.Event{}},
  353. []testutil.Action{
  354. {
  355. Name: "Delete",
  356. Params: []interface{}{"", false, false},
  357. },
  358. },
  359. },
  360. // DELETE with PrevIndex set ==> CompareAndDelete
  361. {
  362. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 1},
  363. Response{Event: &store.Event{}},
  364. []testutil.Action{
  365. {
  366. Name: "CompareAndDelete",
  367. Params: []interface{}{"", "", uint64(1)},
  368. },
  369. },
  370. },
  371. // DELETE with PrevValue set ==> CompareAndDelete
  372. {
  373. pb.Request{Method: "DELETE", ID: 1, PrevValue: "bar"},
  374. Response{Event: &store.Event{}},
  375. []testutil.Action{
  376. {
  377. Name: "CompareAndDelete",
  378. Params: []interface{}{"", "bar", uint64(0)},
  379. },
  380. },
  381. },
  382. // DELETE with PrevIndex *and* PrevValue set ==> CompareAndDelete
  383. {
  384. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 5, PrevValue: "bar"},
  385. Response{Event: &store.Event{}},
  386. []testutil.Action{
  387. {
  388. Name: "CompareAndDelete",
  389. Params: []interface{}{"", "bar", uint64(5)},
  390. },
  391. },
  392. },
  393. // QGET ==> Get
  394. {
  395. pb.Request{Method: "QGET", ID: 1},
  396. Response{Event: &store.Event{}},
  397. []testutil.Action{
  398. {
  399. Name: "Get",
  400. Params: []interface{}{"", false, false},
  401. },
  402. },
  403. },
  404. // SYNC ==> DeleteExpiredKeys
  405. {
  406. pb.Request{Method: "SYNC", ID: 1},
  407. Response{},
  408. []testutil.Action{
  409. {
  410. Name: "DeleteExpiredKeys",
  411. Params: []interface{}{time.Unix(0, 0)},
  412. },
  413. },
  414. },
  415. {
  416. pb.Request{Method: "SYNC", ID: 1, Time: 12345},
  417. Response{},
  418. []testutil.Action{
  419. {
  420. Name: "DeleteExpiredKeys",
  421. Params: []interface{}{time.Unix(0, 12345)},
  422. },
  423. },
  424. },
  425. // Unknown method - error
  426. {
  427. pb.Request{Method: "BADMETHOD", ID: 1},
  428. Response{Err: ErrUnknownMethod},
  429. []testutil.Action{},
  430. },
  431. }
  432. for i, tt := range tests {
  433. st := mockstore.NewRecorder()
  434. srv := &EtcdServer{store: st}
  435. srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
  436. resp := srv.applyV2Request((*RequestV2)(&tt.req))
  437. if !reflect.DeepEqual(resp, tt.wresp) {
  438. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  439. }
  440. gaction := st.Action()
  441. if !reflect.DeepEqual(gaction, tt.wactions) {
  442. t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions)
  443. }
  444. }
  445. }
  446. func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
  447. cl := newTestCluster([]*membership.Member{{ID: 1}})
  448. srv := &EtcdServer{
  449. store: mockstore.NewRecorder(),
  450. cluster: cl,
  451. }
  452. srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
  453. req := pb.Request{
  454. Method: "PUT",
  455. ID: 1,
  456. Path: membership.MemberAttributesStorePath(1),
  457. Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
  458. }
  459. srv.applyV2Request((*RequestV2)(&req))
  460. w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
  461. if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
  462. t.Errorf("attributes = %v, want %v", g, w)
  463. }
  464. }
  465. func TestApplyConfChangeError(t *testing.T) {
  466. cl := membership.NewCluster("")
  467. cl.SetStore(store.New())
  468. for i := 1; i <= 4; i++ {
  469. cl.AddMember(&membership.Member{ID: types.ID(i)})
  470. }
  471. cl.RemoveMember(4)
  472. tests := []struct {
  473. cc raftpb.ConfChange
  474. werr error
  475. }{
  476. {
  477. raftpb.ConfChange{
  478. Type: raftpb.ConfChangeAddNode,
  479. NodeID: 4,
  480. },
  481. membership.ErrIDRemoved,
  482. },
  483. {
  484. raftpb.ConfChange{
  485. Type: raftpb.ConfChangeUpdateNode,
  486. NodeID: 4,
  487. },
  488. membership.ErrIDRemoved,
  489. },
  490. {
  491. raftpb.ConfChange{
  492. Type: raftpb.ConfChangeAddNode,
  493. NodeID: 1,
  494. },
  495. membership.ErrIDExists,
  496. },
  497. {
  498. raftpb.ConfChange{
  499. Type: raftpb.ConfChangeRemoveNode,
  500. NodeID: 5,
  501. },
  502. membership.ErrIDNotFound,
  503. },
  504. }
  505. for i, tt := range tests {
  506. n := newNodeRecorder()
  507. srv := &EtcdServer{
  508. r: *newRaftNode(raftNodeConfig{Node: n}),
  509. cluster: cl,
  510. }
  511. _, err := srv.applyConfChange(tt.cc, nil)
  512. if err != tt.werr {
  513. t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
  514. }
  515. cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None}
  516. w := []testutil.Action{
  517. {
  518. Name: "ApplyConfChange",
  519. Params: []interface{}{cc},
  520. },
  521. }
  522. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  523. t.Errorf("#%d: action = %+v, want %+v", i, g, w)
  524. }
  525. }
  526. }
  527. func TestApplyConfChangeShouldStop(t *testing.T) {
  528. cl := membership.NewCluster("")
  529. cl.SetStore(store.New())
  530. for i := 1; i <= 3; i++ {
  531. cl.AddMember(&membership.Member{ID: types.ID(i)})
  532. }
  533. r := newRaftNode(raftNodeConfig{
  534. Node: newNodeNop(),
  535. transport: rafthttp.NewNopTransporter(),
  536. })
  537. srv := &EtcdServer{
  538. id: 1,
  539. r: *r,
  540. cluster: cl,
  541. }
  542. cc := raftpb.ConfChange{
  543. Type: raftpb.ConfChangeRemoveNode,
  544. NodeID: 2,
  545. }
  546. // remove non-local member
  547. shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
  548. if err != nil {
  549. t.Fatalf("unexpected error %v", err)
  550. }
  551. if shouldStop {
  552. t.Errorf("shouldStop = %t, want %t", shouldStop, false)
  553. }
  554. // remove local member
  555. cc.NodeID = 1
  556. shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
  557. if err != nil {
  558. t.Fatalf("unexpected error %v", err)
  559. }
  560. if !shouldStop {
  561. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  562. }
  563. }
  564. // TestApplyConfigChangeUpdatesConsistIndex ensures a config change also updates the consistIndex
  565. // where consistIndex equals to applied index.
  566. func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
  567. cl := membership.NewCluster("")
  568. cl.SetStore(store.New())
  569. cl.AddMember(&membership.Member{ID: types.ID(1)})
  570. r := newRaftNode(raftNodeConfig{
  571. Node: newNodeNop(),
  572. transport: rafthttp.NewNopTransporter(),
  573. })
  574. srv := &EtcdServer{
  575. id: 1,
  576. r: *r,
  577. cluster: cl,
  578. w: wait.New(),
  579. }
  580. // create EntryConfChange entry
  581. now := time.Now()
  582. urls, err := types.NewURLs([]string{"http://whatever:123"})
  583. if err != nil {
  584. t.Fatal(err)
  585. }
  586. m := membership.NewMember("", urls, "", &now)
  587. m.ID = types.ID(2)
  588. b, err := json.Marshal(m)
  589. if err != nil {
  590. t.Fatal(err)
  591. }
  592. cc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: b}
  593. ents := []raftpb.Entry{{
  594. Index: 2,
  595. Type: raftpb.EntryConfChange,
  596. Data: pbutil.MustMarshal(cc),
  597. }}
  598. _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
  599. consistIndex := srv.consistIndex.ConsistentIndex()
  600. if consistIndex != appliedi {
  601. t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
  602. }
  603. }
  604. // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
  605. // if the local member is removed along with other conf updates.
  606. func TestApplyMultiConfChangeShouldStop(t *testing.T) {
  607. cl := membership.NewCluster("")
  608. cl.SetStore(store.New())
  609. for i := 1; i <= 5; i++ {
  610. cl.AddMember(&membership.Member{ID: types.ID(i)})
  611. }
  612. r := newRaftNode(raftNodeConfig{
  613. Node: newNodeNop(),
  614. transport: rafthttp.NewNopTransporter(),
  615. })
  616. srv := &EtcdServer{
  617. id: 2,
  618. r: *r,
  619. cluster: cl,
  620. w: wait.New(),
  621. }
  622. ents := []raftpb.Entry{}
  623. for i := 1; i <= 4; i++ {
  624. ent := raftpb.Entry{
  625. Term: 1,
  626. Index: uint64(i),
  627. Type: raftpb.EntryConfChange,
  628. Data: pbutil.MustMarshal(
  629. &raftpb.ConfChange{
  630. Type: raftpb.ConfChangeRemoveNode,
  631. NodeID: uint64(i)}),
  632. }
  633. ents = append(ents, ent)
  634. }
  635. _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
  636. if !shouldStop {
  637. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  638. }
  639. }
  640. func TestDoProposal(t *testing.T) {
  641. tests := []pb.Request{
  642. {Method: "POST", ID: 1},
  643. {Method: "PUT", ID: 1},
  644. {Method: "DELETE", ID: 1},
  645. {Method: "GET", ID: 1, Quorum: true},
  646. }
  647. for i, tt := range tests {
  648. st := mockstore.NewRecorder()
  649. r := newRaftNode(raftNodeConfig{
  650. Node: newNodeCommitter(),
  651. storage: mockstorage.NewStorageRecorder(""),
  652. raftStorage: raft.NewMemoryStorage(),
  653. transport: rafthttp.NewNopTransporter(),
  654. })
  655. srv := &EtcdServer{
  656. Cfg: ServerConfig{TickMs: 1},
  657. r: *r,
  658. store: st,
  659. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  660. SyncTicker: &time.Ticker{},
  661. }
  662. srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
  663. srv.start()
  664. resp, err := srv.Do(context.Background(), tt)
  665. srv.Stop()
  666. action := st.Action()
  667. if len(action) != 1 {
  668. t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
  669. }
  670. if err != nil {
  671. t.Fatalf("#%d: err = %v, want nil", i, err)
  672. }
  673. // resp.Index is set in Do() based on the raft state; may either be 0 or 1
  674. wresp := Response{Event: &store.Event{}, Index: resp.Index}
  675. if !reflect.DeepEqual(resp, wresp) {
  676. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  677. }
  678. }
  679. }
  680. func TestDoProposalCancelled(t *testing.T) {
  681. wt := mockwait.NewRecorder()
  682. srv := &EtcdServer{
  683. Cfg: ServerConfig{TickMs: 1},
  684. r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
  685. w: wt,
  686. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  687. }
  688. srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
  689. ctx, cancel := context.WithCancel(context.Background())
  690. cancel()
  691. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  692. if err != ErrCanceled {
  693. t.Fatalf("err = %v, want %v", err, ErrCanceled)
  694. }
  695. w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
  696. if !reflect.DeepEqual(wt.Action(), w) {
  697. t.Errorf("wt.action = %+v, want %+v", wt.Action(), w)
  698. }
  699. }
  700. func TestDoProposalTimeout(t *testing.T) {
  701. srv := &EtcdServer{
  702. Cfg: ServerConfig{TickMs: 1},
  703. r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
  704. w: mockwait.NewNop(),
  705. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  706. }
  707. srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
  708. ctx, cancel := context.WithTimeout(context.Background(), 0)
  709. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  710. cancel()
  711. if err != ErrTimeout {
  712. t.Fatalf("err = %v, want %v", err, ErrTimeout)
  713. }
  714. }
  715. func TestDoProposalStopped(t *testing.T) {
  716. srv := &EtcdServer{
  717. Cfg: ServerConfig{TickMs: 1},
  718. r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
  719. w: mockwait.NewNop(),
  720. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  721. }
  722. srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
  723. srv.stopping = make(chan struct{})
  724. close(srv.stopping)
  725. _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
  726. if err != ErrStopped {
  727. t.Errorf("err = %v, want %v", err, ErrStopped)
  728. }
  729. }
  730. // TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
  731. func TestSync(t *testing.T) {
  732. n := newNodeRecorder()
  733. ctx, cancel := context.WithCancel(context.TODO())
  734. srv := &EtcdServer{
  735. r: *newRaftNode(raftNodeConfig{Node: n}),
  736. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  737. ctx: ctx,
  738. cancel: cancel,
  739. }
  740. srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
  741. // check that sync is non-blocking
  742. done := make(chan struct{})
  743. go func() {
  744. srv.sync(10 * time.Second)
  745. done <- struct{}{}
  746. }()
  747. select {
  748. case <-done:
  749. case <-time.After(time.Second):
  750. t.Fatal("sync should be non-blocking but did not return after 1s!")
  751. }
  752. action, _ := n.Wait(1)
  753. if len(action) != 1 {
  754. t.Fatalf("len(action) = %d, want 1", len(action))
  755. }
  756. if action[0].Name != "Propose" {
  757. t.Fatalf("action = %s, want Propose", action[0].Name)
  758. }
  759. data := action[0].Params[0].([]byte)
  760. var r pb.Request
  761. if err := r.Unmarshal(data); err != nil {
  762. t.Fatalf("unmarshal request error: %v", err)
  763. }
  764. if r.Method != "SYNC" {
  765. t.Errorf("method = %s, want SYNC", r.Method)
  766. }
  767. }
  768. // TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
  769. // after timeout
  770. func TestSyncTimeout(t *testing.T) {
  771. n := newProposalBlockerRecorder()
  772. ctx, cancel := context.WithCancel(context.TODO())
  773. srv := &EtcdServer{
  774. r: *newRaftNode(raftNodeConfig{Node: n}),
  775. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  776. ctx: ctx,
  777. cancel: cancel,
  778. }
  779. srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
  780. // check that sync is non-blocking
  781. done := make(chan struct{})
  782. go func() {
  783. srv.sync(0)
  784. done <- struct{}{}
  785. }()
  786. select {
  787. case <-done:
  788. case <-time.After(time.Second):
  789. t.Fatal("sync should be non-blocking but did not return after 1s!")
  790. }
  791. w := []testutil.Action{{Name: "Propose blocked"}}
  792. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  793. t.Errorf("action = %v, want %v", g, w)
  794. }
  795. }
  796. // TODO: TestNoSyncWhenNoLeader
  797. // TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
  798. func TestSyncTrigger(t *testing.T) {
  799. n := newReadyNode()
  800. st := make(chan time.Time, 1)
  801. tk := &time.Ticker{C: st}
  802. r := newRaftNode(raftNodeConfig{
  803. Node: n,
  804. raftStorage: raft.NewMemoryStorage(),
  805. transport: rafthttp.NewNopTransporter(),
  806. storage: mockstorage.NewStorageRecorder(""),
  807. })
  808. srv := &EtcdServer{
  809. Cfg: ServerConfig{TickMs: 1},
  810. r: *r,
  811. store: mockstore.NewNop(),
  812. SyncTicker: tk,
  813. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  814. }
  815. // trigger the server to become a leader and accept sync requests
  816. go func() {
  817. srv.start()
  818. n.readyc <- raft.Ready{
  819. SoftState: &raft.SoftState{
  820. RaftState: raft.StateLeader,
  821. },
  822. }
  823. // trigger a sync request
  824. st <- time.Time{}
  825. }()
  826. action, _ := n.Wait(1)
  827. go srv.Stop()
  828. if len(action) != 1 {
  829. t.Fatalf("len(action) = %d, want 1", len(action))
  830. }
  831. if action[0].Name != "Propose" {
  832. t.Fatalf("action = %s, want Propose", action[0].Name)
  833. }
  834. data := action[0].Params[0].([]byte)
  835. var req pb.Request
  836. if err := req.Unmarshal(data); err != nil {
  837. t.Fatalf("error unmarshalling data: %v", err)
  838. }
  839. if req.Method != "SYNC" {
  840. t.Fatalf("unexpected proposed request: %#v", req.Method)
  841. }
  842. // wait on stop message
  843. <-n.Chan()
  844. }
  845. // snapshot should snapshot the store and cut the persistent
  846. func TestSnapshot(t *testing.T) {
  847. be, tmpPath := backend.NewDefaultTmpBackend()
  848. defer func() {
  849. os.RemoveAll(tmpPath)
  850. }()
  851. s := raft.NewMemoryStorage()
  852. s.Append([]raftpb.Entry{{Index: 1}})
  853. st := mockstore.NewRecorderStream()
  854. p := mockstorage.NewStorageRecorderStream("")
  855. r := newRaftNode(raftNodeConfig{
  856. Node: newNodeNop(),
  857. raftStorage: s,
  858. storage: p,
  859. })
  860. srv := &EtcdServer{
  861. r: *r,
  862. store: st,
  863. }
  864. srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
  865. srv.be = be
  866. ch := make(chan struct{}, 2)
  867. go func() {
  868. gaction, _ := p.Wait(1)
  869. defer func() { ch <- struct{}{} }()
  870. if len(gaction) != 1 {
  871. t.Fatalf("len(action) = %d, want 1", len(gaction))
  872. }
  873. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
  874. t.Errorf("action = %s, want SaveSnap", gaction[0])
  875. }
  876. }()
  877. go func() {
  878. gaction, _ := st.Wait(2)
  879. defer func() { ch <- struct{}{} }()
  880. if len(gaction) != 2 {
  881. t.Fatalf("len(action) = %d, want 2", len(gaction))
  882. }
  883. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
  884. t.Errorf("action = %s, want Clone", gaction[0])
  885. }
  886. if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
  887. t.Errorf("action = %s, want SaveNoCopy", gaction[1])
  888. }
  889. }()
  890. srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
  891. <-ch
  892. <-ch
  893. }
  894. // TestSnapshotOrdering ensures raft persists snapshot onto disk before
  895. // snapshot db is applied.
  896. func TestSnapshotOrdering(t *testing.T) {
  897. n := newNopReadyNode()
  898. st := store.New()
  899. cl := membership.NewCluster("abc")
  900. cl.SetStore(st)
  901. testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
  902. if err != nil {
  903. t.Fatalf("couldn't open tempdir (%v)", err)
  904. }
  905. defer os.RemoveAll(testdir)
  906. snapdir := filepath.Join(testdir, "member", "snap")
  907. if err := os.MkdirAll(snapdir, 0755); err != nil {
  908. t.Fatalf("couldn't make snap dir (%v)", err)
  909. }
  910. rs := raft.NewMemoryStorage()
  911. p := mockstorage.NewStorageRecorderStream(testdir)
  912. tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir)
  913. r := newRaftNode(raftNodeConfig{
  914. isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
  915. Node: n,
  916. transport: tr,
  917. storage: p,
  918. raftStorage: rs,
  919. })
  920. s := &EtcdServer{
  921. Cfg: ServerConfig{DataDir: testdir},
  922. r: *r,
  923. store: st,
  924. snapshotter: snap.New(snapdir),
  925. cluster: cl,
  926. SyncTicker: &time.Ticker{},
  927. }
  928. s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
  929. be, tmpPath := backend.NewDefaultTmpBackend()
  930. defer os.RemoveAll(tmpPath)
  931. s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
  932. s.be = be
  933. s.start()
  934. defer s.Stop()
  935. n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
  936. go func() {
  937. // get the snapshot sent by the transport
  938. snapMsg := <-snapDoneC
  939. // Snapshot first triggers raftnode to persists the snapshot onto disk
  940. // before renaming db snapshot file to db
  941. snapMsg.Snapshot.Metadata.Index = 1
  942. n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
  943. }()
  944. if ac := <-p.Chan(); ac.Name != "Save" {
  945. t.Fatalf("expected Save, got %+v", ac)
  946. }
  947. if ac := <-p.Chan(); ac.Name != "Save" {
  948. t.Fatalf("expected Save, got %+v", ac)
  949. }
  950. // confirm snapshot file still present before calling SaveSnap
  951. snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
  952. if !fileutil.Exist(snapPath) {
  953. t.Fatalf("expected file %q, got missing", snapPath)
  954. }
  955. // unblock SaveSnapshot, etcdserver now permitted to move snapshot file
  956. if ac := <-p.Chan(); ac.Name != "SaveSnap" {
  957. t.Fatalf("expected SaveSnap, got %+v", ac)
  958. }
  959. }
  960. // Applied > SnapCount should trigger a SaveSnap event
  961. func TestTriggerSnap(t *testing.T) {
  962. be, tmpPath := backend.NewDefaultTmpBackend()
  963. defer func() {
  964. os.RemoveAll(tmpPath)
  965. }()
  966. snapc := 10
  967. st := mockstore.NewRecorder()
  968. p := mockstorage.NewStorageRecorderStream("")
  969. r := newRaftNode(raftNodeConfig{
  970. Node: newNodeCommitter(),
  971. raftStorage: raft.NewMemoryStorage(),
  972. storage: p,
  973. transport: rafthttp.NewNopTransporter(),
  974. })
  975. srv := &EtcdServer{
  976. Cfg: ServerConfig{TickMs: 1, SnapCount: uint64(snapc)},
  977. r: *r,
  978. store: st,
  979. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  980. SyncTicker: &time.Ticker{},
  981. }
  982. srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
  983. srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
  984. srv.be = be
  985. srv.start()
  986. donec := make(chan struct{})
  987. go func() {
  988. wcnt := 2 + snapc
  989. gaction, _ := p.Wait(wcnt)
  990. // each operation is recorded as a Save
  991. // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
  992. if len(gaction) != wcnt {
  993. t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
  994. }
  995. if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
  996. t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
  997. }
  998. close(donec)
  999. }()
  1000. for i := 0; i < snapc+1; i++ {
  1001. srv.Do(context.Background(), pb.Request{Method: "PUT"})
  1002. }
  1003. <-donec
  1004. srv.Stop()
  1005. }
  1006. // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
  1007. // proposals.
  1008. func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
  1009. n := newNopReadyNode()
  1010. st := store.New()
  1011. cl := membership.NewCluster("abc")
  1012. cl.SetStore(st)
  1013. testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
  1014. if err != nil {
  1015. t.Fatalf("Couldn't open tempdir (%v)", err)
  1016. }
  1017. defer os.RemoveAll(testdir)
  1018. if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
  1019. t.Fatalf("Couldn't make snap dir (%v)", err)
  1020. }
  1021. rs := raft.NewMemoryStorage()
  1022. tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
  1023. r := newRaftNode(raftNodeConfig{
  1024. isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
  1025. Node: n,
  1026. transport: tr,
  1027. storage: mockstorage.NewStorageRecorder(testdir),
  1028. raftStorage: rs,
  1029. })
  1030. s := &EtcdServer{
  1031. Cfg: ServerConfig{DataDir: testdir},
  1032. r: *r,
  1033. store: st,
  1034. snapshotter: snap.New(testdir),
  1035. cluster: cl,
  1036. SyncTicker: &time.Ticker{},
  1037. }
  1038. s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
  1039. be, tmpPath := backend.NewDefaultTmpBackend()
  1040. defer func() {
  1041. os.RemoveAll(tmpPath)
  1042. }()
  1043. s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
  1044. s.be = be
  1045. s.start()
  1046. defer s.Stop()
  1047. // submit applied entries and snap entries
  1048. idx := uint64(0)
  1049. outdated := 0
  1050. accepted := 0
  1051. for k := 1; k <= 101; k++ {
  1052. idx++
  1053. ch := s.w.Register(uint64(idx))
  1054. req := &pb.Request{Method: "QGET", ID: uint64(idx)}
  1055. ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)}
  1056. ready := raft.Ready{Entries: []raftpb.Entry{ent}}
  1057. n.readyc <- ready
  1058. ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
  1059. n.readyc <- ready
  1060. // "idx" applied
  1061. <-ch
  1062. // one snapshot for every two messages
  1063. if k%2 != 0 {
  1064. continue
  1065. }
  1066. n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
  1067. // get the snapshot sent by the transport
  1068. snapMsg := <-snapDoneC
  1069. // If the snapshot trails applied records, recovery will panic
  1070. // since there's no allocated snapshot at the place of the
  1071. // snapshot record. This only happens when the applier and the
  1072. // snapshot sender get out of sync.
  1073. if snapMsg.Snapshot.Metadata.Index == idx {
  1074. idx++
  1075. snapMsg.Snapshot.Metadata.Index = idx
  1076. ready = raft.Ready{Snapshot: snapMsg.Snapshot}
  1077. n.readyc <- ready
  1078. accepted++
  1079. } else {
  1080. outdated++
  1081. }
  1082. // don't wait for the snapshot to complete, move to next message
  1083. }
  1084. if accepted != 50 {
  1085. t.Errorf("accepted=%v, want 50", accepted)
  1086. }
  1087. if outdated != 0 {
  1088. t.Errorf("outdated=%v, want 0", outdated)
  1089. }
  1090. }
  1091. // TestAddMember tests AddMember can propose and perform node addition.
  1092. func TestAddMember(t *testing.T) {
  1093. n := newNodeConfChangeCommitterRecorder()
  1094. n.readyc <- raft.Ready{
  1095. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1096. }
  1097. cl := newTestCluster(nil)
  1098. st := store.New()
  1099. cl.SetStore(st)
  1100. r := newRaftNode(raftNodeConfig{
  1101. Node: n,
  1102. raftStorage: raft.NewMemoryStorage(),
  1103. storage: mockstorage.NewStorageRecorder(""),
  1104. transport: rafthttp.NewNopTransporter(),
  1105. })
  1106. s := &EtcdServer{
  1107. r: *r,
  1108. store: st,
  1109. cluster: cl,
  1110. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1111. SyncTicker: &time.Ticker{},
  1112. }
  1113. s.start()
  1114. m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
  1115. _, err := s.AddMember(context.TODO(), m)
  1116. gaction := n.Action()
  1117. s.Stop()
  1118. if err != nil {
  1119. t.Fatalf("AddMember error: %v", err)
  1120. }
  1121. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}}
  1122. if !reflect.DeepEqual(gaction, wactions) {
  1123. t.Errorf("action = %v, want %v", gaction, wactions)
  1124. }
  1125. if cl.Member(1234) == nil {
  1126. t.Errorf("member with id 1234 is not added")
  1127. }
  1128. }
  1129. // TestRemoveMember tests RemoveMember can propose and perform node removal.
  1130. func TestRemoveMember(t *testing.T) {
  1131. n := newNodeConfChangeCommitterRecorder()
  1132. n.readyc <- raft.Ready{
  1133. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1134. }
  1135. cl := newTestCluster(nil)
  1136. st := store.New()
  1137. cl.SetStore(store.New())
  1138. cl.AddMember(&membership.Member{ID: 1234})
  1139. r := newRaftNode(raftNodeConfig{
  1140. Node: n,
  1141. raftStorage: raft.NewMemoryStorage(),
  1142. storage: mockstorage.NewStorageRecorder(""),
  1143. transport: rafthttp.NewNopTransporter(),
  1144. })
  1145. s := &EtcdServer{
  1146. r: *r,
  1147. store: st,
  1148. cluster: cl,
  1149. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1150. SyncTicker: &time.Ticker{},
  1151. }
  1152. s.start()
  1153. _, err := s.RemoveMember(context.TODO(), 1234)
  1154. gaction := n.Action()
  1155. s.Stop()
  1156. if err != nil {
  1157. t.Fatalf("RemoveMember error: %v", err)
  1158. }
  1159. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}}
  1160. if !reflect.DeepEqual(gaction, wactions) {
  1161. t.Errorf("action = %v, want %v", gaction, wactions)
  1162. }
  1163. if cl.Member(1234) != nil {
  1164. t.Errorf("member with id 1234 is not removed")
  1165. }
  1166. }
  1167. // TestUpdateMember tests RemoveMember can propose and perform node update.
  1168. func TestUpdateMember(t *testing.T) {
  1169. n := newNodeConfChangeCommitterRecorder()
  1170. n.readyc <- raft.Ready{
  1171. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1172. }
  1173. cl := newTestCluster(nil)
  1174. st := store.New()
  1175. cl.SetStore(st)
  1176. cl.AddMember(&membership.Member{ID: 1234})
  1177. r := newRaftNode(raftNodeConfig{
  1178. Node: n,
  1179. raftStorage: raft.NewMemoryStorage(),
  1180. storage: mockstorage.NewStorageRecorder(""),
  1181. transport: rafthttp.NewNopTransporter(),
  1182. })
  1183. s := &EtcdServer{
  1184. r: *r,
  1185. store: st,
  1186. cluster: cl,
  1187. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1188. SyncTicker: &time.Ticker{},
  1189. }
  1190. s.start()
  1191. wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
  1192. _, err := s.UpdateMember(context.TODO(), wm)
  1193. gaction := n.Action()
  1194. s.Stop()
  1195. if err != nil {
  1196. t.Fatalf("UpdateMember error: %v", err)
  1197. }
  1198. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}}
  1199. if !reflect.DeepEqual(gaction, wactions) {
  1200. t.Errorf("action = %v, want %v", gaction, wactions)
  1201. }
  1202. if !reflect.DeepEqual(cl.Member(1234), &wm) {
  1203. t.Errorf("member = %v, want %v", cl.Member(1234), &wm)
  1204. }
  1205. }
  1206. // TODO: test server could stop itself when being removed
  1207. func TestPublish(t *testing.T) {
  1208. n := newNodeRecorder()
  1209. ch := make(chan interface{}, 1)
  1210. // simulate that request has gone through consensus
  1211. ch <- Response{}
  1212. w := wait.NewWithResponse(ch)
  1213. ctx, cancel := context.WithCancel(context.TODO())
  1214. srv := &EtcdServer{
  1215. readych: make(chan struct{}),
  1216. Cfg: ServerConfig{TickMs: 1},
  1217. id: 1,
  1218. r: *newRaftNode(raftNodeConfig{Node: n}),
  1219. attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
  1220. cluster: &membership.RaftCluster{},
  1221. w: w,
  1222. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1223. SyncTicker: &time.Ticker{},
  1224. ctx: ctx,
  1225. cancel: cancel,
  1226. }
  1227. srv.publish(time.Hour)
  1228. action := n.Action()
  1229. if len(action) != 1 {
  1230. t.Fatalf("len(action) = %d, want 1", len(action))
  1231. }
  1232. if action[0].Name != "Propose" {
  1233. t.Fatalf("action = %s, want Propose", action[0].Name)
  1234. }
  1235. data := action[0].Params[0].([]byte)
  1236. var r pb.Request
  1237. if err := r.Unmarshal(data); err != nil {
  1238. t.Fatalf("unmarshal request error: %v", err)
  1239. }
  1240. if r.Method != "PUT" {
  1241. t.Errorf("method = %s, want PUT", r.Method)
  1242. }
  1243. wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
  1244. if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath {
  1245. t.Errorf("path = %s, want %s", r.Path, wpath)
  1246. }
  1247. var gattr membership.Attributes
  1248. if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
  1249. t.Fatalf("unmarshal val error: %v", err)
  1250. }
  1251. if !reflect.DeepEqual(gattr, wm.Attributes) {
  1252. t.Errorf("member = %v, want %v", gattr, wm.Attributes)
  1253. }
  1254. }
  1255. // TestPublishStopped tests that publish will be stopped if server is stopped.
  1256. func TestPublishStopped(t *testing.T) {
  1257. ctx, cancel := context.WithCancel(context.TODO())
  1258. r := newRaftNode(raftNodeConfig{
  1259. Node: newNodeNop(),
  1260. transport: rafthttp.NewNopTransporter(),
  1261. })
  1262. srv := &EtcdServer{
  1263. Cfg: ServerConfig{TickMs: 1},
  1264. r: *r,
  1265. cluster: &membership.RaftCluster{},
  1266. w: mockwait.NewNop(),
  1267. done: make(chan struct{}),
  1268. stopping: make(chan struct{}),
  1269. stop: make(chan struct{}),
  1270. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1271. SyncTicker: &time.Ticker{},
  1272. ctx: ctx,
  1273. cancel: cancel,
  1274. }
  1275. close(srv.stopping)
  1276. srv.publish(time.Hour)
  1277. }
  1278. // TestPublishRetry tests that publish will keep retry until success.
  1279. func TestPublishRetry(t *testing.T) {
  1280. ctx, cancel := context.WithCancel(context.TODO())
  1281. n := newNodeRecorderStream()
  1282. srv := &EtcdServer{
  1283. Cfg: ServerConfig{TickMs: 1},
  1284. r: *newRaftNode(raftNodeConfig{Node: n}),
  1285. w: mockwait.NewNop(),
  1286. stopping: make(chan struct{}),
  1287. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1288. SyncTicker: &time.Ticker{},
  1289. ctx: ctx,
  1290. cancel: cancel,
  1291. }
  1292. // expect multiple proposals from retrying
  1293. ch := make(chan struct{})
  1294. go func() {
  1295. defer close(ch)
  1296. if action, err := n.Wait(2); err != nil {
  1297. t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
  1298. }
  1299. close(srv.stopping)
  1300. // drain remaining actions, if any, so publish can terminate
  1301. for {
  1302. select {
  1303. case <-ch:
  1304. return
  1305. default:
  1306. n.Action()
  1307. }
  1308. }
  1309. }()
  1310. srv.publish(10 * time.Nanosecond)
  1311. ch <- struct{}{}
  1312. <-ch
  1313. }
  1314. func TestUpdateVersion(t *testing.T) {
  1315. n := newNodeRecorder()
  1316. ch := make(chan interface{}, 1)
  1317. // simulate that request has gone through consensus
  1318. ch <- Response{}
  1319. w := wait.NewWithResponse(ch)
  1320. ctx, cancel := context.WithCancel(context.TODO())
  1321. srv := &EtcdServer{
  1322. id: 1,
  1323. Cfg: ServerConfig{TickMs: 1},
  1324. r: *newRaftNode(raftNodeConfig{Node: n}),
  1325. attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
  1326. cluster: &membership.RaftCluster{},
  1327. w: w,
  1328. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1329. SyncTicker: &time.Ticker{},
  1330. ctx: ctx,
  1331. cancel: cancel,
  1332. }
  1333. srv.updateClusterVersion("2.0.0")
  1334. action := n.Action()
  1335. if len(action) != 1 {
  1336. t.Fatalf("len(action) = %d, want 1", len(action))
  1337. }
  1338. if action[0].Name != "Propose" {
  1339. t.Fatalf("action = %s, want Propose", action[0].Name)
  1340. }
  1341. data := action[0].Params[0].([]byte)
  1342. var r pb.Request
  1343. if err := r.Unmarshal(data); err != nil {
  1344. t.Fatalf("unmarshal request error: %v", err)
  1345. }
  1346. if r.Method != "PUT" {
  1347. t.Errorf("method = %s, want PUT", r.Method)
  1348. }
  1349. if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
  1350. t.Errorf("path = %s, want %s", r.Path, wpath)
  1351. }
  1352. if r.Val != "2.0.0" {
  1353. t.Errorf("val = %s, want %s", r.Val, "2.0.0")
  1354. }
  1355. }
  1356. func TestStopNotify(t *testing.T) {
  1357. s := &EtcdServer{
  1358. stop: make(chan struct{}),
  1359. done: make(chan struct{}),
  1360. }
  1361. go func() {
  1362. <-s.stop
  1363. close(s.done)
  1364. }()
  1365. notifier := s.StopNotify()
  1366. select {
  1367. case <-notifier:
  1368. t.Fatalf("received unexpected stop notification")
  1369. default:
  1370. }
  1371. s.Stop()
  1372. select {
  1373. case <-notifier:
  1374. default:
  1375. t.Fatalf("cannot receive stop notification")
  1376. }
  1377. }
  1378. func TestGetOtherPeerURLs(t *testing.T) {
  1379. tests := []struct {
  1380. membs []*membership.Member
  1381. wurls []string
  1382. }{
  1383. {
  1384. []*membership.Member{
  1385. membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
  1386. },
  1387. []string{},
  1388. },
  1389. {
  1390. []*membership.Member{
  1391. membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
  1392. membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
  1393. membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
  1394. },
  1395. []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
  1396. },
  1397. {
  1398. []*membership.Member{
  1399. membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
  1400. membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
  1401. membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
  1402. },
  1403. []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
  1404. },
  1405. }
  1406. for i, tt := range tests {
  1407. cl := membership.NewClusterFromMembers("", types.ID(0), tt.membs)
  1408. self := "1"
  1409. urls := getRemotePeerURLs(cl, self)
  1410. if !reflect.DeepEqual(urls, tt.wurls) {
  1411. t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
  1412. }
  1413. }
  1414. }
  1415. type nodeRecorder struct{ testutil.Recorder }
  1416. func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
  1417. func newNodeRecorderStream() *nodeRecorder { return &nodeRecorder{testutil.NewRecorderStream()} }
  1418. func newNodeNop() raft.Node { return newNodeRecorder() }
  1419. func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
  1420. func (n *nodeRecorder) Campaign(ctx context.Context) error {
  1421. n.Record(testutil.Action{Name: "Campaign"})
  1422. return nil
  1423. }
  1424. func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
  1425. n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
  1426. return nil
  1427. }
  1428. func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1429. n.Record(testutil.Action{Name: "ProposeConfChange"})
  1430. return nil
  1431. }
  1432. func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
  1433. n.Record(testutil.Action{Name: "Step"})
  1434. return nil
  1435. }
  1436. func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
  1437. func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
  1438. func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {}
  1439. func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
  1440. func (n *nodeRecorder) Advance() {}
  1441. func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1442. n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
  1443. return &raftpb.ConfState{}
  1444. }
  1445. func (n *nodeRecorder) Stop() {
  1446. n.Record(testutil.Action{Name: "Stop"})
  1447. }
  1448. func (n *nodeRecorder) ReportUnreachable(id uint64) {}
  1449. func (n *nodeRecorder) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
  1450. func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
  1451. n.Record(testutil.Action{Name: "Compact"})
  1452. }
  1453. type nodeProposalBlockerRecorder struct {
  1454. nodeRecorder
  1455. }
  1456. func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
  1457. return &nodeProposalBlockerRecorder{*newNodeRecorderStream()}
  1458. }
  1459. func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
  1460. <-ctx.Done()
  1461. n.Record(testutil.Action{Name: "Propose blocked"})
  1462. return nil
  1463. }
  1464. // readyNode is a nodeRecorder with a user-writeable ready channel
  1465. type readyNode struct {
  1466. nodeRecorder
  1467. readyc chan raft.Ready
  1468. }
  1469. func newReadyNode() *readyNode {
  1470. return &readyNode{
  1471. nodeRecorder{testutil.NewRecorderStream()},
  1472. make(chan raft.Ready, 1)}
  1473. }
  1474. func newNopReadyNode() *readyNode {
  1475. return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
  1476. }
  1477. func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
  1478. type nodeConfChangeCommitterRecorder struct {
  1479. readyNode
  1480. index uint64
  1481. }
  1482. func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
  1483. return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0}
  1484. }
  1485. func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder {
  1486. return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
  1487. }
  1488. func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1489. data, err := conf.Marshal()
  1490. if err != nil {
  1491. return err
  1492. }
  1493. n.index++
  1494. n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()})
  1495. n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
  1496. return nil
  1497. }
  1498. func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
  1499. return n.readyc
  1500. }
  1501. func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1502. n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()})
  1503. return &raftpb.ConfState{}
  1504. }
  1505. // nodeCommitter commits proposed data immediately.
  1506. type nodeCommitter struct {
  1507. readyNode
  1508. index uint64
  1509. }
  1510. func newNodeCommitter() raft.Node {
  1511. return &nodeCommitter{*newNopReadyNode(), 0}
  1512. }
  1513. func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
  1514. n.index++
  1515. ents := []raftpb.Entry{{Index: n.index, Data: data}}
  1516. n.readyc <- raft.Ready{
  1517. Entries: ents,
  1518. CommittedEntries: ents,
  1519. }
  1520. return nil
  1521. }
  1522. func newTestCluster(membs []*membership.Member) *membership.RaftCluster {
  1523. c := membership.NewCluster("")
  1524. for _, m := range membs {
  1525. c.AddMember(m)
  1526. }
  1527. return c
  1528. }