server_test.go 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "io/ioutil"
  20. "os"
  21. "path"
  22. "path/filepath"
  23. "reflect"
  24. "sync"
  25. "testing"
  26. "time"
  27. "go.uber.org/zap"
  28. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  29. "github.com/coreos/etcd/etcdserver/membership"
  30. "github.com/coreos/etcd/etcdserver/v2store"
  31. "github.com/coreos/etcd/lease"
  32. "github.com/coreos/etcd/mvcc"
  33. "github.com/coreos/etcd/mvcc/backend"
  34. "github.com/coreos/etcd/pkg/fileutil"
  35. "github.com/coreos/etcd/pkg/idutil"
  36. "github.com/coreos/etcd/pkg/mock/mockstorage"
  37. "github.com/coreos/etcd/pkg/mock/mockstore"
  38. "github.com/coreos/etcd/pkg/mock/mockwait"
  39. "github.com/coreos/etcd/pkg/pbutil"
  40. "github.com/coreos/etcd/pkg/testutil"
  41. "github.com/coreos/etcd/pkg/types"
  42. "github.com/coreos/etcd/pkg/wait"
  43. "github.com/coreos/etcd/raft"
  44. "github.com/coreos/etcd/raft/raftpb"
  45. "github.com/coreos/etcd/rafthttp"
  46. "github.com/coreos/etcd/raftsnap"
  47. )
  48. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  49. // and are served through local data.
  50. func TestDoLocalAction(t *testing.T) {
  51. tests := []struct {
  52. req pb.Request
  53. wresp Response
  54. werr error
  55. wactions []testutil.Action
  56. }{
  57. {
  58. pb.Request{Method: "GET", ID: 1, Wait: true},
  59. Response{Watcher: v2store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}},
  60. },
  61. {
  62. pb.Request{Method: "GET", ID: 1},
  63. Response{Event: &v2store.Event{}}, nil,
  64. []testutil.Action{
  65. {
  66. Name: "Get",
  67. Params: []interface{}{"", false, false},
  68. },
  69. },
  70. },
  71. {
  72. pb.Request{Method: "HEAD", ID: 1},
  73. Response{Event: &v2store.Event{}}, nil,
  74. []testutil.Action{
  75. {
  76. Name: "Get",
  77. Params: []interface{}{"", false, false},
  78. },
  79. },
  80. },
  81. {
  82. pb.Request{Method: "BADMETHOD", ID: 1},
  83. Response{}, ErrUnknownMethod, []testutil.Action{},
  84. },
  85. }
  86. for i, tt := range tests {
  87. st := mockstore.NewRecorder()
  88. srv := &EtcdServer{
  89. lgMu: new(sync.RWMutex),
  90. lg: zap.NewExample(),
  91. v2store: st,
  92. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  93. }
  94. resp, err := srv.Do(context.TODO(), tt.req)
  95. if err != tt.werr {
  96. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  97. }
  98. if !reflect.DeepEqual(resp, tt.wresp) {
  99. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  100. }
  101. gaction := st.Action()
  102. if !reflect.DeepEqual(gaction, tt.wactions) {
  103. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  104. }
  105. }
  106. }
  107. // TestDoBadLocalAction tests server requests which do not need to go through consensus,
  108. // and return errors when they fetch from local data.
  109. func TestDoBadLocalAction(t *testing.T) {
  110. storeErr := fmt.Errorf("bah")
  111. tests := []struct {
  112. req pb.Request
  113. wactions []testutil.Action
  114. }{
  115. {
  116. pb.Request{Method: "GET", ID: 1, Wait: true},
  117. []testutil.Action{{Name: "Watch"}},
  118. },
  119. {
  120. pb.Request{Method: "GET", ID: 1},
  121. []testutil.Action{
  122. {
  123. Name: "Get",
  124. Params: []interface{}{"", false, false},
  125. },
  126. },
  127. },
  128. {
  129. pb.Request{Method: "HEAD", ID: 1},
  130. []testutil.Action{
  131. {
  132. Name: "Get",
  133. Params: []interface{}{"", false, false},
  134. },
  135. },
  136. },
  137. }
  138. for i, tt := range tests {
  139. st := mockstore.NewErrRecorder(storeErr)
  140. srv := &EtcdServer{
  141. lgMu: new(sync.RWMutex),
  142. lg: zap.NewExample(),
  143. v2store: st,
  144. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  145. }
  146. resp, err := srv.Do(context.Background(), tt.req)
  147. if err != storeErr {
  148. t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr)
  149. }
  150. if !reflect.DeepEqual(resp, Response{}) {
  151. t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{})
  152. }
  153. gaction := st.Action()
  154. if !reflect.DeepEqual(gaction, tt.wactions) {
  155. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  156. }
  157. }
  158. }
  159. // TestApplyRepeat tests that server handles repeat raft messages gracefully
  160. func TestApplyRepeat(t *testing.T) {
  161. n := newNodeConfChangeCommitterStream()
  162. n.readyc <- raft.Ready{
  163. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  164. }
  165. cl := newTestCluster(nil)
  166. st := v2store.New()
  167. cl.SetStore(v2store.New())
  168. cl.AddMember(&membership.Member{ID: 1234})
  169. r := newRaftNode(raftNodeConfig{
  170. lg: zap.NewExample(),
  171. Node: n,
  172. raftStorage: raft.NewMemoryStorage(),
  173. storage: mockstorage.NewStorageRecorder(""),
  174. transport: rafthttp.NewNopTransporter(),
  175. })
  176. s := &EtcdServer{
  177. lgMu: new(sync.RWMutex),
  178. lg: zap.NewExample(),
  179. r: *r,
  180. v2store: st,
  181. cluster: cl,
  182. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  183. SyncTicker: &time.Ticker{},
  184. }
  185. s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
  186. s.start()
  187. req := &pb.Request{Method: "QGET", ID: uint64(1)}
  188. ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}
  189. n.readyc <- raft.Ready{CommittedEntries: ents}
  190. // dup msg
  191. n.readyc <- raft.Ready{CommittedEntries: ents}
  192. // use a conf change to block until dup msgs are all processed
  193. cc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
  194. ents = []raftpb.Entry{{
  195. Index: 2,
  196. Type: raftpb.EntryConfChange,
  197. Data: pbutil.MustMarshal(cc),
  198. }}
  199. n.readyc <- raft.Ready{CommittedEntries: ents}
  200. // wait for conf change message
  201. act, err := n.Wait(1)
  202. // wait for stop message (async to avoid deadlock)
  203. stopc := make(chan error)
  204. go func() {
  205. _, werr := n.Wait(1)
  206. stopc <- werr
  207. }()
  208. s.Stop()
  209. // only want to confirm etcdserver won't panic; no data to check
  210. if err != nil {
  211. t.Fatal(err)
  212. }
  213. if len(act) == 0 {
  214. t.Fatalf("expected len(act)=0, got %d", len(act))
  215. }
  216. if err = <-stopc; err != nil {
  217. t.Fatalf("error on stop (%v)", err)
  218. }
  219. }
  220. func TestApplyRequest(t *testing.T) {
  221. tests := []struct {
  222. req pb.Request
  223. wresp Response
  224. wactions []testutil.Action
  225. }{
  226. // POST ==> Create
  227. {
  228. pb.Request{Method: "POST", ID: 1},
  229. Response{Event: &v2store.Event{}},
  230. []testutil.Action{
  231. {
  232. Name: "Create",
  233. Params: []interface{}{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  234. },
  235. },
  236. },
  237. // POST ==> Create, with expiration
  238. {
  239. pb.Request{Method: "POST", ID: 1, Expiration: 1337},
  240. Response{Event: &v2store.Event{}},
  241. []testutil.Action{
  242. {
  243. Name: "Create",
  244. Params: []interface{}{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Unix(0, 1337)}},
  245. },
  246. },
  247. },
  248. // POST ==> Create, with dir
  249. {
  250. pb.Request{Method: "POST", ID: 1, Dir: true},
  251. Response{Event: &v2store.Event{}},
  252. []testutil.Action{
  253. {
  254. Name: "Create",
  255. Params: []interface{}{"", true, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  256. },
  257. },
  258. },
  259. // PUT ==> Set
  260. {
  261. pb.Request{Method: "PUT", ID: 1},
  262. Response{Event: &v2store.Event{}},
  263. []testutil.Action{
  264. {
  265. Name: "Set",
  266. Params: []interface{}{"", false, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  267. },
  268. },
  269. },
  270. // PUT ==> Set, with dir
  271. {
  272. pb.Request{Method: "PUT", ID: 1, Dir: true},
  273. Response{Event: &v2store.Event{}},
  274. []testutil.Action{
  275. {
  276. Name: "Set",
  277. Params: []interface{}{"", true, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  278. },
  279. },
  280. },
  281. // PUT with PrevExist=true ==> Update
  282. {
  283. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true)},
  284. Response{Event: &v2store.Event{}},
  285. []testutil.Action{
  286. {
  287. Name: "Update",
  288. Params: []interface{}{"", "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  289. },
  290. },
  291. },
  292. // PUT with PrevExist=false ==> Create
  293. {
  294. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false)},
  295. Response{Event: &v2store.Event{}},
  296. []testutil.Action{
  297. {
  298. Name: "Create",
  299. Params: []interface{}{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  300. },
  301. },
  302. },
  303. // PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
  304. {
  305. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
  306. Response{Event: &v2store.Event{}},
  307. []testutil.Action{
  308. {
  309. Name: "CompareAndSwap",
  310. Params: []interface{}{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  311. },
  312. },
  313. },
  314. // PUT with PrevExist=false *and* PrevIndex set ==> Create
  315. {
  316. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
  317. Response{Event: &v2store.Event{}},
  318. []testutil.Action{
  319. {
  320. Name: "Create",
  321. Params: []interface{}{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  322. },
  323. },
  324. },
  325. // PUT with PrevIndex set ==> CompareAndSwap
  326. {
  327. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1},
  328. Response{Event: &v2store.Event{}},
  329. []testutil.Action{
  330. {
  331. Name: "CompareAndSwap",
  332. Params: []interface{}{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  333. },
  334. },
  335. },
  336. // PUT with PrevValue set ==> CompareAndSwap
  337. {
  338. pb.Request{Method: "PUT", ID: 1, PrevValue: "bar"},
  339. Response{Event: &v2store.Event{}},
  340. []testutil.Action{
  341. {
  342. Name: "CompareAndSwap",
  343. Params: []interface{}{"", "bar", uint64(0), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  344. },
  345. },
  346. },
  347. // PUT with PrevIndex and PrevValue set ==> CompareAndSwap
  348. {
  349. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1, PrevValue: "bar"},
  350. Response{Event: &v2store.Event{}},
  351. []testutil.Action{
  352. {
  353. Name: "CompareAndSwap",
  354. Params: []interface{}{"", "bar", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  355. },
  356. },
  357. },
  358. // DELETE ==> Delete
  359. {
  360. pb.Request{Method: "DELETE", ID: 1},
  361. Response{Event: &v2store.Event{}},
  362. []testutil.Action{
  363. {
  364. Name: "Delete",
  365. Params: []interface{}{"", false, false},
  366. },
  367. },
  368. },
  369. // DELETE with PrevIndex set ==> CompareAndDelete
  370. {
  371. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 1},
  372. Response{Event: &v2store.Event{}},
  373. []testutil.Action{
  374. {
  375. Name: "CompareAndDelete",
  376. Params: []interface{}{"", "", uint64(1)},
  377. },
  378. },
  379. },
  380. // DELETE with PrevValue set ==> CompareAndDelete
  381. {
  382. pb.Request{Method: "DELETE", ID: 1, PrevValue: "bar"},
  383. Response{Event: &v2store.Event{}},
  384. []testutil.Action{
  385. {
  386. Name: "CompareAndDelete",
  387. Params: []interface{}{"", "bar", uint64(0)},
  388. },
  389. },
  390. },
  391. // DELETE with PrevIndex *and* PrevValue set ==> CompareAndDelete
  392. {
  393. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 5, PrevValue: "bar"},
  394. Response{Event: &v2store.Event{}},
  395. []testutil.Action{
  396. {
  397. Name: "CompareAndDelete",
  398. Params: []interface{}{"", "bar", uint64(5)},
  399. },
  400. },
  401. },
  402. // QGET ==> Get
  403. {
  404. pb.Request{Method: "QGET", ID: 1},
  405. Response{Event: &v2store.Event{}},
  406. []testutil.Action{
  407. {
  408. Name: "Get",
  409. Params: []interface{}{"", false, false},
  410. },
  411. },
  412. },
  413. // SYNC ==> DeleteExpiredKeys
  414. {
  415. pb.Request{Method: "SYNC", ID: 1},
  416. Response{},
  417. []testutil.Action{
  418. {
  419. Name: "DeleteExpiredKeys",
  420. Params: []interface{}{time.Unix(0, 0)},
  421. },
  422. },
  423. },
  424. {
  425. pb.Request{Method: "SYNC", ID: 1, Time: 12345},
  426. Response{},
  427. []testutil.Action{
  428. {
  429. Name: "DeleteExpiredKeys",
  430. Params: []interface{}{time.Unix(0, 12345)},
  431. },
  432. },
  433. },
  434. // Unknown method - error
  435. {
  436. pb.Request{Method: "BADMETHOD", ID: 1},
  437. Response{Err: ErrUnknownMethod},
  438. []testutil.Action{},
  439. },
  440. }
  441. for i, tt := range tests {
  442. st := mockstore.NewRecorder()
  443. srv := &EtcdServer{
  444. lgMu: new(sync.RWMutex),
  445. lg: zap.NewExample(),
  446. v2store: st,
  447. }
  448. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  449. resp := srv.applyV2Request((*RequestV2)(&tt.req))
  450. if !reflect.DeepEqual(resp, tt.wresp) {
  451. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  452. }
  453. gaction := st.Action()
  454. if !reflect.DeepEqual(gaction, tt.wactions) {
  455. t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions)
  456. }
  457. }
  458. }
  459. func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
  460. cl := newTestCluster([]*membership.Member{{ID: 1}})
  461. srv := &EtcdServer{
  462. lgMu: new(sync.RWMutex),
  463. lg: zap.NewExample(),
  464. v2store: mockstore.NewRecorder(),
  465. cluster: cl,
  466. }
  467. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  468. req := pb.Request{
  469. Method: "PUT",
  470. ID: 1,
  471. Path: membership.MemberAttributesStorePath(1),
  472. Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
  473. }
  474. srv.applyV2Request((*RequestV2)(&req))
  475. w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
  476. if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
  477. t.Errorf("attributes = %v, want %v", g, w)
  478. }
  479. }
  480. func TestApplyConfChangeError(t *testing.T) {
  481. cl := membership.NewCluster(zap.NewExample(), "")
  482. cl.SetStore(v2store.New())
  483. for i := 1; i <= 4; i++ {
  484. cl.AddMember(&membership.Member{ID: types.ID(i)})
  485. }
  486. cl.RemoveMember(4)
  487. tests := []struct {
  488. cc raftpb.ConfChange
  489. werr error
  490. }{
  491. {
  492. raftpb.ConfChange{
  493. Type: raftpb.ConfChangeAddNode,
  494. NodeID: 4,
  495. },
  496. membership.ErrIDRemoved,
  497. },
  498. {
  499. raftpb.ConfChange{
  500. Type: raftpb.ConfChangeUpdateNode,
  501. NodeID: 4,
  502. },
  503. membership.ErrIDRemoved,
  504. },
  505. {
  506. raftpb.ConfChange{
  507. Type: raftpb.ConfChangeAddNode,
  508. NodeID: 1,
  509. },
  510. membership.ErrIDExists,
  511. },
  512. {
  513. raftpb.ConfChange{
  514. Type: raftpb.ConfChangeRemoveNode,
  515. NodeID: 5,
  516. },
  517. membership.ErrIDNotFound,
  518. },
  519. }
  520. for i, tt := range tests {
  521. n := newNodeRecorder()
  522. srv := &EtcdServer{
  523. lgMu: new(sync.RWMutex),
  524. lg: zap.NewExample(),
  525. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  526. cluster: cl,
  527. }
  528. _, err := srv.applyConfChange(tt.cc, nil)
  529. if err != tt.werr {
  530. t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
  531. }
  532. cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None}
  533. w := []testutil.Action{
  534. {
  535. Name: "ApplyConfChange",
  536. Params: []interface{}{cc},
  537. },
  538. }
  539. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  540. t.Errorf("#%d: action = %+v, want %+v", i, g, w)
  541. }
  542. }
  543. }
  544. func TestApplyConfChangeShouldStop(t *testing.T) {
  545. cl := membership.NewCluster(zap.NewExample(), "")
  546. cl.SetStore(v2store.New())
  547. for i := 1; i <= 3; i++ {
  548. cl.AddMember(&membership.Member{ID: types.ID(i)})
  549. }
  550. r := newRaftNode(raftNodeConfig{
  551. lg: zap.NewExample(),
  552. Node: newNodeNop(),
  553. transport: rafthttp.NewNopTransporter(),
  554. })
  555. srv := &EtcdServer{
  556. lgMu: new(sync.RWMutex),
  557. lg: zap.NewExample(),
  558. id: 1,
  559. r: *r,
  560. cluster: cl,
  561. }
  562. cc := raftpb.ConfChange{
  563. Type: raftpb.ConfChangeRemoveNode,
  564. NodeID: 2,
  565. }
  566. // remove non-local member
  567. shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
  568. if err != nil {
  569. t.Fatalf("unexpected error %v", err)
  570. }
  571. if shouldStop {
  572. t.Errorf("shouldStop = %t, want %t", shouldStop, false)
  573. }
  574. // remove local member
  575. cc.NodeID = 1
  576. shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
  577. if err != nil {
  578. t.Fatalf("unexpected error %v", err)
  579. }
  580. if !shouldStop {
  581. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  582. }
  583. }
  584. // TestApplyConfigChangeUpdatesConsistIndex ensures a config change also updates the consistIndex
  585. // where consistIndex equals to applied index.
  586. func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
  587. cl := membership.NewCluster(zap.NewExample(), "")
  588. cl.SetStore(v2store.New())
  589. cl.AddMember(&membership.Member{ID: types.ID(1)})
  590. r := newRaftNode(raftNodeConfig{
  591. lg: zap.NewExample(),
  592. Node: newNodeNop(),
  593. transport: rafthttp.NewNopTransporter(),
  594. })
  595. srv := &EtcdServer{
  596. lgMu: new(sync.RWMutex),
  597. lg: zap.NewExample(),
  598. id: 1,
  599. r: *r,
  600. cluster: cl,
  601. w: wait.New(),
  602. }
  603. // create EntryConfChange entry
  604. now := time.Now()
  605. urls, err := types.NewURLs([]string{"http://whatever:123"})
  606. if err != nil {
  607. t.Fatal(err)
  608. }
  609. m := membership.NewMember("", urls, "", &now)
  610. m.ID = types.ID(2)
  611. b, err := json.Marshal(m)
  612. if err != nil {
  613. t.Fatal(err)
  614. }
  615. cc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: b}
  616. ents := []raftpb.Entry{{
  617. Index: 2,
  618. Type: raftpb.EntryConfChange,
  619. Data: pbutil.MustMarshal(cc),
  620. }}
  621. _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
  622. consistIndex := srv.consistIndex.ConsistentIndex()
  623. if consistIndex != appliedi {
  624. t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
  625. }
  626. }
  627. // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
  628. // if the local member is removed along with other conf updates.
  629. func TestApplyMultiConfChangeShouldStop(t *testing.T) {
  630. cl := membership.NewCluster(zap.NewExample(), "")
  631. cl.SetStore(v2store.New())
  632. for i := 1; i <= 5; i++ {
  633. cl.AddMember(&membership.Member{ID: types.ID(i)})
  634. }
  635. r := newRaftNode(raftNodeConfig{
  636. lg: zap.NewExample(),
  637. Node: newNodeNop(),
  638. transport: rafthttp.NewNopTransporter(),
  639. })
  640. srv := &EtcdServer{
  641. lgMu: new(sync.RWMutex),
  642. lg: zap.NewExample(),
  643. id: 2,
  644. r: *r,
  645. cluster: cl,
  646. w: wait.New(),
  647. }
  648. ents := []raftpb.Entry{}
  649. for i := 1; i <= 4; i++ {
  650. ent := raftpb.Entry{
  651. Term: 1,
  652. Index: uint64(i),
  653. Type: raftpb.EntryConfChange,
  654. Data: pbutil.MustMarshal(
  655. &raftpb.ConfChange{
  656. Type: raftpb.ConfChangeRemoveNode,
  657. NodeID: uint64(i)}),
  658. }
  659. ents = append(ents, ent)
  660. }
  661. _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
  662. if !shouldStop {
  663. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  664. }
  665. }
  666. func TestDoProposal(t *testing.T) {
  667. tests := []pb.Request{
  668. {Method: "POST", ID: 1},
  669. {Method: "PUT", ID: 1},
  670. {Method: "DELETE", ID: 1},
  671. {Method: "GET", ID: 1, Quorum: true},
  672. }
  673. for i, tt := range tests {
  674. st := mockstore.NewRecorder()
  675. r := newRaftNode(raftNodeConfig{
  676. lg: zap.NewExample(),
  677. Node: newNodeCommitter(),
  678. storage: mockstorage.NewStorageRecorder(""),
  679. raftStorage: raft.NewMemoryStorage(),
  680. transport: rafthttp.NewNopTransporter(),
  681. })
  682. srv := &EtcdServer{
  683. lgMu: new(sync.RWMutex),
  684. lg: zap.NewExample(),
  685. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
  686. r: *r,
  687. v2store: st,
  688. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  689. SyncTicker: &time.Ticker{},
  690. }
  691. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  692. srv.start()
  693. resp, err := srv.Do(context.Background(), tt)
  694. srv.Stop()
  695. action := st.Action()
  696. if len(action) != 1 {
  697. t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
  698. }
  699. if err != nil {
  700. t.Fatalf("#%d: err = %v, want nil", i, err)
  701. }
  702. // resp.Index is set in Do() based on the raft state; may either be 0 or 1
  703. wresp := Response{Event: &v2store.Event{}, Index: resp.Index}
  704. if !reflect.DeepEqual(resp, wresp) {
  705. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  706. }
  707. }
  708. }
  709. func TestDoProposalCancelled(t *testing.T) {
  710. wt := mockwait.NewRecorder()
  711. srv := &EtcdServer{
  712. lgMu: new(sync.RWMutex),
  713. lg: zap.NewExample(),
  714. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
  715. r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
  716. w: wt,
  717. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  718. }
  719. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  720. ctx, cancel := context.WithCancel(context.Background())
  721. cancel()
  722. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  723. if err != ErrCanceled {
  724. t.Fatalf("err = %v, want %v", err, ErrCanceled)
  725. }
  726. w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
  727. if !reflect.DeepEqual(wt.Action(), w) {
  728. t.Errorf("wt.action = %+v, want %+v", wt.Action(), w)
  729. }
  730. }
  731. func TestDoProposalTimeout(t *testing.T) {
  732. srv := &EtcdServer{
  733. lgMu: new(sync.RWMutex),
  734. lg: zap.NewExample(),
  735. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
  736. r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
  737. w: mockwait.NewNop(),
  738. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  739. }
  740. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  741. ctx, cancel := context.WithTimeout(context.Background(), 0)
  742. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  743. cancel()
  744. if err != ErrTimeout {
  745. t.Fatalf("err = %v, want %v", err, ErrTimeout)
  746. }
  747. }
  748. func TestDoProposalStopped(t *testing.T) {
  749. srv := &EtcdServer{
  750. lgMu: new(sync.RWMutex),
  751. lg: zap.NewExample(),
  752. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
  753. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}),
  754. w: mockwait.NewNop(),
  755. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  756. }
  757. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  758. srv.stopping = make(chan struct{})
  759. close(srv.stopping)
  760. _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
  761. if err != ErrStopped {
  762. t.Errorf("err = %v, want %v", err, ErrStopped)
  763. }
  764. }
  765. // TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
  766. func TestSync(t *testing.T) {
  767. n := newNodeRecorder()
  768. ctx, cancel := context.WithCancel(context.TODO())
  769. srv := &EtcdServer{
  770. lgMu: new(sync.RWMutex),
  771. lg: zap.NewExample(),
  772. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  773. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  774. ctx: ctx,
  775. cancel: cancel,
  776. }
  777. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  778. // check that sync is non-blocking
  779. done := make(chan struct{})
  780. go func() {
  781. srv.sync(10 * time.Second)
  782. done <- struct{}{}
  783. }()
  784. select {
  785. case <-done:
  786. case <-time.After(time.Second):
  787. t.Fatal("sync should be non-blocking but did not return after 1s!")
  788. }
  789. action, _ := n.Wait(1)
  790. if len(action) != 1 {
  791. t.Fatalf("len(action) = %d, want 1", len(action))
  792. }
  793. if action[0].Name != "Propose" {
  794. t.Fatalf("action = %s, want Propose", action[0].Name)
  795. }
  796. data := action[0].Params[0].([]byte)
  797. var r pb.Request
  798. if err := r.Unmarshal(data); err != nil {
  799. t.Fatalf("unmarshal request error: %v", err)
  800. }
  801. if r.Method != "SYNC" {
  802. t.Errorf("method = %s, want SYNC", r.Method)
  803. }
  804. }
  805. // TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
  806. // after timeout
  807. func TestSyncTimeout(t *testing.T) {
  808. n := newProposalBlockerRecorder()
  809. ctx, cancel := context.WithCancel(context.TODO())
  810. srv := &EtcdServer{
  811. lgMu: new(sync.RWMutex),
  812. lg: zap.NewExample(),
  813. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  814. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  815. ctx: ctx,
  816. cancel: cancel,
  817. }
  818. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  819. // check that sync is non-blocking
  820. done := make(chan struct{})
  821. go func() {
  822. srv.sync(0)
  823. done <- struct{}{}
  824. }()
  825. select {
  826. case <-done:
  827. case <-time.After(time.Second):
  828. t.Fatal("sync should be non-blocking but did not return after 1s!")
  829. }
  830. w := []testutil.Action{{Name: "Propose blocked"}}
  831. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  832. t.Errorf("action = %v, want %v", g, w)
  833. }
  834. }
  835. // TODO: TestNoSyncWhenNoLeader
  836. // TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
  837. func TestSyncTrigger(t *testing.T) {
  838. n := newReadyNode()
  839. st := make(chan time.Time, 1)
  840. tk := &time.Ticker{C: st}
  841. r := newRaftNode(raftNodeConfig{
  842. lg: zap.NewExample(),
  843. Node: n,
  844. raftStorage: raft.NewMemoryStorage(),
  845. transport: rafthttp.NewNopTransporter(),
  846. storage: mockstorage.NewStorageRecorder(""),
  847. })
  848. srv := &EtcdServer{
  849. lgMu: new(sync.RWMutex),
  850. lg: zap.NewExample(),
  851. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
  852. r: *r,
  853. v2store: mockstore.NewNop(),
  854. SyncTicker: tk,
  855. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  856. }
  857. // trigger the server to become a leader and accept sync requests
  858. go func() {
  859. srv.start()
  860. n.readyc <- raft.Ready{
  861. SoftState: &raft.SoftState{
  862. RaftState: raft.StateLeader,
  863. },
  864. }
  865. // trigger a sync request
  866. st <- time.Time{}
  867. }()
  868. action, _ := n.Wait(1)
  869. go srv.Stop()
  870. if len(action) != 1 {
  871. t.Fatalf("len(action) = %d, want 1", len(action))
  872. }
  873. if action[0].Name != "Propose" {
  874. t.Fatalf("action = %s, want Propose", action[0].Name)
  875. }
  876. data := action[0].Params[0].([]byte)
  877. var req pb.Request
  878. if err := req.Unmarshal(data); err != nil {
  879. t.Fatalf("error unmarshalling data: %v", err)
  880. }
  881. if req.Method != "SYNC" {
  882. t.Fatalf("unexpected proposed request: %#v", req.Method)
  883. }
  884. // wait on stop message
  885. <-n.Chan()
  886. }
  887. // snapshot should snapshot the store and cut the persistent
  888. func TestSnapshot(t *testing.T) {
  889. be, tmpPath := backend.NewDefaultTmpBackend()
  890. defer func() {
  891. os.RemoveAll(tmpPath)
  892. }()
  893. s := raft.NewMemoryStorage()
  894. s.Append([]raftpb.Entry{{Index: 1}})
  895. st := mockstore.NewRecorderStream()
  896. p := mockstorage.NewStorageRecorderStream("")
  897. r := newRaftNode(raftNodeConfig{
  898. lg: zap.NewExample(),
  899. Node: newNodeNop(),
  900. raftStorage: s,
  901. storage: p,
  902. })
  903. srv := &EtcdServer{
  904. lgMu: new(sync.RWMutex),
  905. lg: zap.NewExample(),
  906. r: *r,
  907. v2store: st,
  908. }
  909. srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
  910. srv.be = be
  911. ch := make(chan struct{}, 2)
  912. go func() {
  913. gaction, _ := p.Wait(1)
  914. defer func() { ch <- struct{}{} }()
  915. if len(gaction) != 1 {
  916. t.Fatalf("len(action) = %d, want 1", len(gaction))
  917. }
  918. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
  919. t.Errorf("action = %s, want SaveSnap", gaction[0])
  920. }
  921. }()
  922. go func() {
  923. gaction, _ := st.Wait(2)
  924. defer func() { ch <- struct{}{} }()
  925. if len(gaction) != 2 {
  926. t.Fatalf("len(action) = %d, want 2", len(gaction))
  927. }
  928. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
  929. t.Errorf("action = %s, want Clone", gaction[0])
  930. }
  931. if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
  932. t.Errorf("action = %s, want SaveNoCopy", gaction[1])
  933. }
  934. }()
  935. srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
  936. <-ch
  937. <-ch
  938. }
  939. // TestSnapshotOrdering ensures raft persists snapshot onto disk before
  940. // snapshot db is applied.
  941. func TestSnapshotOrdering(t *testing.T) {
  942. n := newNopReadyNode()
  943. st := v2store.New()
  944. cl := membership.NewCluster(zap.NewExample(), "abc")
  945. cl.SetStore(st)
  946. testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
  947. if err != nil {
  948. t.Fatalf("couldn't open tempdir (%v)", err)
  949. }
  950. defer os.RemoveAll(testdir)
  951. snapdir := filepath.Join(testdir, "member", "snap")
  952. if err := os.MkdirAll(snapdir, 0755); err != nil {
  953. t.Fatalf("couldn't make snap dir (%v)", err)
  954. }
  955. rs := raft.NewMemoryStorage()
  956. p := mockstorage.NewStorageRecorderStream(testdir)
  957. tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir)
  958. r := newRaftNode(raftNodeConfig{
  959. lg: zap.NewExample(),
  960. isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
  961. Node: n,
  962. transport: tr,
  963. storage: p,
  964. raftStorage: rs,
  965. })
  966. s := &EtcdServer{
  967. lgMu: new(sync.RWMutex),
  968. lg: zap.NewExample(),
  969. Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
  970. r: *r,
  971. v2store: st,
  972. snapshotter: raftsnap.New(zap.NewExample(), snapdir),
  973. cluster: cl,
  974. SyncTicker: &time.Ticker{},
  975. }
  976. s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
  977. be, tmpPath := backend.NewDefaultTmpBackend()
  978. defer os.RemoveAll(tmpPath)
  979. s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
  980. s.be = be
  981. s.start()
  982. defer s.Stop()
  983. n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
  984. go func() {
  985. // get the snapshot sent by the transport
  986. snapMsg := <-snapDoneC
  987. // Snapshot first triggers raftnode to persists the snapshot onto disk
  988. // before renaming db snapshot file to db
  989. snapMsg.Snapshot.Metadata.Index = 1
  990. n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
  991. }()
  992. if ac := <-p.Chan(); ac.Name != "Save" {
  993. t.Fatalf("expected Save, got %+v", ac)
  994. }
  995. if ac := <-p.Chan(); ac.Name != "Save" {
  996. t.Fatalf("expected Save, got %+v", ac)
  997. }
  998. // confirm snapshot file still present before calling SaveSnap
  999. snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
  1000. if !fileutil.Exist(snapPath) {
  1001. t.Fatalf("expected file %q, got missing", snapPath)
  1002. }
  1003. // unblock SaveSnapshot, etcdserver now permitted to move snapshot file
  1004. if ac := <-p.Chan(); ac.Name != "SaveSnap" {
  1005. t.Fatalf("expected SaveSnap, got %+v", ac)
  1006. }
  1007. }
  1008. // Applied > SnapCount should trigger a SaveSnap event
  1009. func TestTriggerSnap(t *testing.T) {
  1010. be, tmpPath := backend.NewDefaultTmpBackend()
  1011. defer func() {
  1012. os.RemoveAll(tmpPath)
  1013. }()
  1014. snapc := 10
  1015. st := mockstore.NewRecorder()
  1016. p := mockstorage.NewStorageRecorderStream("")
  1017. r := newRaftNode(raftNodeConfig{
  1018. lg: zap.NewExample(),
  1019. Node: newNodeCommitter(),
  1020. raftStorage: raft.NewMemoryStorage(),
  1021. storage: p,
  1022. transport: rafthttp.NewNopTransporter(),
  1023. })
  1024. srv := &EtcdServer{
  1025. lgMu: new(sync.RWMutex),
  1026. lg: zap.NewExample(),
  1027. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapCount: uint64(snapc)},
  1028. r: *r,
  1029. v2store: st,
  1030. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1031. SyncTicker: &time.Ticker{},
  1032. }
  1033. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  1034. srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
  1035. srv.be = be
  1036. srv.start()
  1037. donec := make(chan struct{})
  1038. go func() {
  1039. wcnt := 2 + snapc
  1040. gaction, _ := p.Wait(wcnt)
  1041. // each operation is recorded as a Save
  1042. // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
  1043. if len(gaction) != wcnt {
  1044. t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
  1045. }
  1046. if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
  1047. t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
  1048. }
  1049. close(donec)
  1050. }()
  1051. for i := 0; i < snapc+1; i++ {
  1052. srv.Do(context.Background(), pb.Request{Method: "PUT"})
  1053. }
  1054. <-donec
  1055. srv.Stop()
  1056. }
  1057. // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
  1058. // proposals.
  1059. func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
  1060. n := newNopReadyNode()
  1061. st := v2store.New()
  1062. cl := membership.NewCluster(zap.NewExample(), "abc")
  1063. cl.SetStore(st)
  1064. testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
  1065. if err != nil {
  1066. t.Fatalf("Couldn't open tempdir (%v)", err)
  1067. }
  1068. defer os.RemoveAll(testdir)
  1069. if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
  1070. t.Fatalf("Couldn't make snap dir (%v)", err)
  1071. }
  1072. rs := raft.NewMemoryStorage()
  1073. tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
  1074. r := newRaftNode(raftNodeConfig{
  1075. lg: zap.NewExample(),
  1076. isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
  1077. Node: n,
  1078. transport: tr,
  1079. storage: mockstorage.NewStorageRecorder(testdir),
  1080. raftStorage: rs,
  1081. })
  1082. s := &EtcdServer{
  1083. lgMu: new(sync.RWMutex),
  1084. lg: zap.NewExample(),
  1085. Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
  1086. r: *r,
  1087. v2store: st,
  1088. snapshotter: raftsnap.New(zap.NewExample(), testdir),
  1089. cluster: cl,
  1090. SyncTicker: &time.Ticker{},
  1091. }
  1092. s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
  1093. be, tmpPath := backend.NewDefaultTmpBackend()
  1094. defer func() {
  1095. os.RemoveAll(tmpPath)
  1096. }()
  1097. s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
  1098. s.be = be
  1099. s.start()
  1100. defer s.Stop()
  1101. // submit applied entries and snap entries
  1102. idx := uint64(0)
  1103. outdated := 0
  1104. accepted := 0
  1105. for k := 1; k <= 101; k++ {
  1106. idx++
  1107. ch := s.w.Register(uint64(idx))
  1108. req := &pb.Request{Method: "QGET", ID: uint64(idx)}
  1109. ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)}
  1110. ready := raft.Ready{Entries: []raftpb.Entry{ent}}
  1111. n.readyc <- ready
  1112. ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
  1113. n.readyc <- ready
  1114. // "idx" applied
  1115. <-ch
  1116. // one snapshot for every two messages
  1117. if k%2 != 0 {
  1118. continue
  1119. }
  1120. n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
  1121. // get the snapshot sent by the transport
  1122. snapMsg := <-snapDoneC
  1123. // If the snapshot trails applied records, recovery will panic
  1124. // since there's no allocated snapshot at the place of the
  1125. // snapshot record. This only happens when the applier and the
  1126. // snapshot sender get out of sync.
  1127. if snapMsg.Snapshot.Metadata.Index == idx {
  1128. idx++
  1129. snapMsg.Snapshot.Metadata.Index = idx
  1130. ready = raft.Ready{Snapshot: snapMsg.Snapshot}
  1131. n.readyc <- ready
  1132. accepted++
  1133. } else {
  1134. outdated++
  1135. }
  1136. // don't wait for the snapshot to complete, move to next message
  1137. }
  1138. if accepted != 50 {
  1139. t.Errorf("accepted=%v, want 50", accepted)
  1140. }
  1141. if outdated != 0 {
  1142. t.Errorf("outdated=%v, want 0", outdated)
  1143. }
  1144. }
  1145. // TestAddMember tests AddMember can propose and perform node addition.
  1146. func TestAddMember(t *testing.T) {
  1147. n := newNodeConfChangeCommitterRecorder()
  1148. n.readyc <- raft.Ready{
  1149. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1150. }
  1151. cl := newTestCluster(nil)
  1152. st := v2store.New()
  1153. cl.SetStore(st)
  1154. r := newRaftNode(raftNodeConfig{
  1155. lg: zap.NewExample(),
  1156. Node: n,
  1157. raftStorage: raft.NewMemoryStorage(),
  1158. storage: mockstorage.NewStorageRecorder(""),
  1159. transport: rafthttp.NewNopTransporter(),
  1160. })
  1161. s := &EtcdServer{
  1162. lgMu: new(sync.RWMutex),
  1163. lg: zap.NewExample(),
  1164. r: *r,
  1165. v2store: st,
  1166. cluster: cl,
  1167. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1168. SyncTicker: &time.Ticker{},
  1169. }
  1170. s.start()
  1171. m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
  1172. _, err := s.AddMember(context.TODO(), m)
  1173. gaction := n.Action()
  1174. s.Stop()
  1175. if err != nil {
  1176. t.Fatalf("AddMember error: %v", err)
  1177. }
  1178. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}}
  1179. if !reflect.DeepEqual(gaction, wactions) {
  1180. t.Errorf("action = %v, want %v", gaction, wactions)
  1181. }
  1182. if cl.Member(1234) == nil {
  1183. t.Errorf("member with id 1234 is not added")
  1184. }
  1185. }
  1186. // TestRemoveMember tests RemoveMember can propose and perform node removal.
  1187. func TestRemoveMember(t *testing.T) {
  1188. n := newNodeConfChangeCommitterRecorder()
  1189. n.readyc <- raft.Ready{
  1190. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1191. }
  1192. cl := newTestCluster(nil)
  1193. st := v2store.New()
  1194. cl.SetStore(v2store.New())
  1195. cl.AddMember(&membership.Member{ID: 1234})
  1196. r := newRaftNode(raftNodeConfig{
  1197. lg: zap.NewExample(),
  1198. Node: n,
  1199. raftStorage: raft.NewMemoryStorage(),
  1200. storage: mockstorage.NewStorageRecorder(""),
  1201. transport: rafthttp.NewNopTransporter(),
  1202. })
  1203. s := &EtcdServer{
  1204. lgMu: new(sync.RWMutex),
  1205. lg: zap.NewExample(),
  1206. r: *r,
  1207. v2store: st,
  1208. cluster: cl,
  1209. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1210. SyncTicker: &time.Ticker{},
  1211. }
  1212. s.start()
  1213. _, err := s.RemoveMember(context.TODO(), 1234)
  1214. gaction := n.Action()
  1215. s.Stop()
  1216. if err != nil {
  1217. t.Fatalf("RemoveMember error: %v", err)
  1218. }
  1219. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}}
  1220. if !reflect.DeepEqual(gaction, wactions) {
  1221. t.Errorf("action = %v, want %v", gaction, wactions)
  1222. }
  1223. if cl.Member(1234) != nil {
  1224. t.Errorf("member with id 1234 is not removed")
  1225. }
  1226. }
  1227. // TestUpdateMember tests RemoveMember can propose and perform node update.
  1228. func TestUpdateMember(t *testing.T) {
  1229. n := newNodeConfChangeCommitterRecorder()
  1230. n.readyc <- raft.Ready{
  1231. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1232. }
  1233. cl := newTestCluster(nil)
  1234. st := v2store.New()
  1235. cl.SetStore(st)
  1236. cl.AddMember(&membership.Member{ID: 1234})
  1237. r := newRaftNode(raftNodeConfig{
  1238. lg: zap.NewExample(),
  1239. Node: n,
  1240. raftStorage: raft.NewMemoryStorage(),
  1241. storage: mockstorage.NewStorageRecorder(""),
  1242. transport: rafthttp.NewNopTransporter(),
  1243. })
  1244. s := &EtcdServer{
  1245. lgMu: new(sync.RWMutex),
  1246. lg: zap.NewExample(),
  1247. r: *r,
  1248. v2store: st,
  1249. cluster: cl,
  1250. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1251. SyncTicker: &time.Ticker{},
  1252. }
  1253. s.start()
  1254. wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
  1255. _, err := s.UpdateMember(context.TODO(), wm)
  1256. gaction := n.Action()
  1257. s.Stop()
  1258. if err != nil {
  1259. t.Fatalf("UpdateMember error: %v", err)
  1260. }
  1261. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}}
  1262. if !reflect.DeepEqual(gaction, wactions) {
  1263. t.Errorf("action = %v, want %v", gaction, wactions)
  1264. }
  1265. if !reflect.DeepEqual(cl.Member(1234), &wm) {
  1266. t.Errorf("member = %v, want %v", cl.Member(1234), &wm)
  1267. }
  1268. }
  1269. // TODO: test server could stop itself when being removed
  1270. func TestPublish(t *testing.T) {
  1271. n := newNodeRecorder()
  1272. ch := make(chan interface{}, 1)
  1273. // simulate that request has gone through consensus
  1274. ch <- Response{}
  1275. w := wait.NewWithResponse(ch)
  1276. ctx, cancel := context.WithCancel(context.TODO())
  1277. srv := &EtcdServer{
  1278. lgMu: new(sync.RWMutex),
  1279. lg: zap.NewExample(),
  1280. readych: make(chan struct{}),
  1281. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
  1282. id: 1,
  1283. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  1284. attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
  1285. cluster: &membership.RaftCluster{},
  1286. w: w,
  1287. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1288. SyncTicker: &time.Ticker{},
  1289. ctx: ctx,
  1290. cancel: cancel,
  1291. }
  1292. srv.publish(time.Hour)
  1293. action := n.Action()
  1294. if len(action) != 1 {
  1295. t.Fatalf("len(action) = %d, want 1", len(action))
  1296. }
  1297. if action[0].Name != "Propose" {
  1298. t.Fatalf("action = %s, want Propose", action[0].Name)
  1299. }
  1300. data := action[0].Params[0].([]byte)
  1301. var r pb.Request
  1302. if err := r.Unmarshal(data); err != nil {
  1303. t.Fatalf("unmarshal request error: %v", err)
  1304. }
  1305. if r.Method != "PUT" {
  1306. t.Errorf("method = %s, want PUT", r.Method)
  1307. }
  1308. wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
  1309. if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath {
  1310. t.Errorf("path = %s, want %s", r.Path, wpath)
  1311. }
  1312. var gattr membership.Attributes
  1313. if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
  1314. t.Fatalf("unmarshal val error: %v", err)
  1315. }
  1316. if !reflect.DeepEqual(gattr, wm.Attributes) {
  1317. t.Errorf("member = %v, want %v", gattr, wm.Attributes)
  1318. }
  1319. }
  1320. // TestPublishStopped tests that publish will be stopped if server is stopped.
  1321. func TestPublishStopped(t *testing.T) {
  1322. ctx, cancel := context.WithCancel(context.TODO())
  1323. r := newRaftNode(raftNodeConfig{
  1324. lg: zap.NewExample(),
  1325. Node: newNodeNop(),
  1326. transport: rafthttp.NewNopTransporter(),
  1327. })
  1328. srv := &EtcdServer{
  1329. lgMu: new(sync.RWMutex),
  1330. lg: zap.NewExample(),
  1331. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
  1332. r: *r,
  1333. cluster: &membership.RaftCluster{},
  1334. w: mockwait.NewNop(),
  1335. done: make(chan struct{}),
  1336. stopping: make(chan struct{}),
  1337. stop: make(chan struct{}),
  1338. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1339. SyncTicker: &time.Ticker{},
  1340. ctx: ctx,
  1341. cancel: cancel,
  1342. }
  1343. close(srv.stopping)
  1344. srv.publish(time.Hour)
  1345. }
// TestPublishRetry tests that publish keeps retrying: it expects at least two
// actions to be recorded by the raft node before the server is marked as
// stopping, after which publish must return.
func TestPublishRetry(t *testing.T) {
	ctx, cancel := context.WithCancel(context.TODO())
	n := newNodeRecorderStream()
	srv := &EtcdServer{
		lgMu:       new(sync.RWMutex),
		lg:         zap.NewExample(),
		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
		r:          *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
		w:          mockwait.NewNop(),
		stopping:   make(chan struct{}),
		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
		SyncTicker: &time.Ticker{},
		ctx:        ctx,
		cancel:     cancel,
	}
	// expect multiple proposals from retrying
	//
	// ch serves two purposes: the test goroutine sends on it once publish has
	// returned, and the helper goroutine closes it on exit so the final
	// receive below completes.
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		if action, err := n.Wait(2); err != nil {
			t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
		}
		// Two actions were seen (or Wait failed); ask publish to stop.
		close(srv.stopping)
		// drain remaining actions, if any, so publish can terminate
		for {
			select {
			case <-ch:
				// publish has returned; stop draining.
				return
			default:
				n.Action()
			}
		}
	}()
	// NOTE(review): 10ns is presumably the per-attempt timeout, chosen so
	// that attempts fail fast and are retried — confirm against publish.
	srv.publish(10 * time.Nanosecond)
	// Tell the helper goroutine that publish is done, then wait for it to
	// exit (signalled by the deferred close of ch).
	ch <- struct{}{}
	<-ch
}
  1384. func TestUpdateVersion(t *testing.T) {
  1385. n := newNodeRecorder()
  1386. ch := make(chan interface{}, 1)
  1387. // simulate that request has gone through consensus
  1388. ch <- Response{}
  1389. w := wait.NewWithResponse(ch)
  1390. ctx, cancel := context.WithCancel(context.TODO())
  1391. srv := &EtcdServer{
  1392. lgMu: new(sync.RWMutex),
  1393. lg: zap.NewExample(),
  1394. id: 1,
  1395. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
  1396. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  1397. attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
  1398. cluster: &membership.RaftCluster{},
  1399. w: w,
  1400. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1401. SyncTicker: &time.Ticker{},
  1402. ctx: ctx,
  1403. cancel: cancel,
  1404. }
  1405. srv.updateClusterVersion("2.0.0")
  1406. action := n.Action()
  1407. if len(action) != 1 {
  1408. t.Fatalf("len(action) = %d, want 1", len(action))
  1409. }
  1410. if action[0].Name != "Propose" {
  1411. t.Fatalf("action = %s, want Propose", action[0].Name)
  1412. }
  1413. data := action[0].Params[0].([]byte)
  1414. var r pb.Request
  1415. if err := r.Unmarshal(data); err != nil {
  1416. t.Fatalf("unmarshal request error: %v", err)
  1417. }
  1418. if r.Method != "PUT" {
  1419. t.Errorf("method = %s, want PUT", r.Method)
  1420. }
  1421. if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
  1422. t.Errorf("path = %s, want %s", r.Path, wpath)
  1423. }
  1424. if r.Val != "2.0.0" {
  1425. t.Errorf("val = %s, want %s", r.Val, "2.0.0")
  1426. }
  1427. }
  1428. func TestStopNotify(t *testing.T) {
  1429. s := &EtcdServer{
  1430. lgMu: new(sync.RWMutex),
  1431. lg: zap.NewExample(),
  1432. stop: make(chan struct{}),
  1433. done: make(chan struct{}),
  1434. }
  1435. go func() {
  1436. <-s.stop
  1437. close(s.done)
  1438. }()
  1439. notifier := s.StopNotify()
  1440. select {
  1441. case <-notifier:
  1442. t.Fatalf("received unexpected stop notification")
  1443. default:
  1444. }
  1445. s.Stop()
  1446. select {
  1447. case <-notifier:
  1448. default:
  1449. t.Fatalf("cannot receive stop notification")
  1450. }
  1451. }
  1452. func TestGetOtherPeerURLs(t *testing.T) {
  1453. tests := []struct {
  1454. membs []*membership.Member
  1455. wurls []string
  1456. }{
  1457. {
  1458. []*membership.Member{
  1459. membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
  1460. },
  1461. []string{},
  1462. },
  1463. {
  1464. []*membership.Member{
  1465. membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
  1466. membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
  1467. membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
  1468. },
  1469. []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
  1470. },
  1471. {
  1472. []*membership.Member{
  1473. membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
  1474. membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
  1475. membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
  1476. },
  1477. []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
  1478. },
  1479. }
  1480. for i, tt := range tests {
  1481. cl := membership.NewClusterFromMembers(zap.NewExample(), "", types.ID(0), tt.membs)
  1482. self := "1"
  1483. urls := getRemotePeerURLs(cl, self)
  1484. if !reflect.DeepEqual(urls, tt.wurls) {
  1485. t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
  1486. }
  1487. }
  1488. }
  1489. type nodeRecorder struct{ testutil.Recorder }
  1490. func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
  1491. func newNodeRecorderStream() *nodeRecorder { return &nodeRecorder{testutil.NewRecorderStream()} }
  1492. func newNodeNop() raft.Node { return newNodeRecorder() }
  1493. func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
  1494. func (n *nodeRecorder) Campaign(ctx context.Context) error {
  1495. n.Record(testutil.Action{Name: "Campaign"})
  1496. return nil
  1497. }
  1498. func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
  1499. n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
  1500. return nil
  1501. }
  1502. func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1503. n.Record(testutil.Action{Name: "ProposeConfChange"})
  1504. return nil
  1505. }
  1506. func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
  1507. n.Record(testutil.Action{Name: "Step"})
  1508. return nil
  1509. }
  1510. func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
  1511. func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
  1512. func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {}
  1513. func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
  1514. func (n *nodeRecorder) Advance() {}
  1515. func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1516. n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
  1517. return &raftpb.ConfState{}
  1518. }
  1519. func (n *nodeRecorder) Stop() {
  1520. n.Record(testutil.Action{Name: "Stop"})
  1521. }
  1522. func (n *nodeRecorder) ReportUnreachable(id uint64) {}
  1523. func (n *nodeRecorder) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
  1524. func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
  1525. n.Record(testutil.Action{Name: "Compact"})
  1526. }
  1527. type nodeProposalBlockerRecorder struct {
  1528. nodeRecorder
  1529. }
  1530. func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
  1531. return &nodeProposalBlockerRecorder{*newNodeRecorderStream()}
  1532. }
  1533. func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
  1534. <-ctx.Done()
  1535. n.Record(testutil.Action{Name: "Propose blocked"})
  1536. return nil
  1537. }
  1538. // readyNode is a nodeRecorder with a user-writeable ready channel
  1539. type readyNode struct {
  1540. nodeRecorder
  1541. readyc chan raft.Ready
  1542. }
  1543. func newReadyNode() *readyNode {
  1544. return &readyNode{
  1545. nodeRecorder{testutil.NewRecorderStream()},
  1546. make(chan raft.Ready, 1)}
  1547. }
  1548. func newNopReadyNode() *readyNode {
  1549. return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
  1550. }
  1551. func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
  1552. type nodeConfChangeCommitterRecorder struct {
  1553. readyNode
  1554. index uint64
  1555. }
  1556. func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
  1557. return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0}
  1558. }
  1559. func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder {
  1560. return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
  1561. }
  1562. func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1563. data, err := conf.Marshal()
  1564. if err != nil {
  1565. return err
  1566. }
  1567. n.index++
  1568. n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()})
  1569. n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
  1570. return nil
  1571. }
  1572. func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
  1573. return n.readyc
  1574. }
  1575. func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1576. n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()})
  1577. return &raftpb.ConfState{}
  1578. }
  1579. // nodeCommitter commits proposed data immediately.
  1580. type nodeCommitter struct {
  1581. readyNode
  1582. index uint64
  1583. }
  1584. func newNodeCommitter() raft.Node {
  1585. return &nodeCommitter{*newNopReadyNode(), 0}
  1586. }
  1587. func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
  1588. n.index++
  1589. ents := []raftpb.Entry{{Index: n.index, Data: data}}
  1590. n.readyc <- raft.Ready{
  1591. Entries: ents,
  1592. CommittedEntries: ents,
  1593. }
  1594. return nil
  1595. }
  1596. func newTestCluster(membs []*membership.Member) *membership.RaftCluster {
  1597. c := membership.NewCluster(zap.NewExample(), "")
  1598. for _, m := range membs {
  1599. c.AddMember(m)
  1600. }
  1601. return c
  1602. }