node_test.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "math"
  20. "reflect"
  21. "strings"
  22. "testing"
  23. "time"
  24. "go.etcd.io/etcd/pkg/testutil"
  25. "go.etcd.io/etcd/raft/raftpb"
  26. )
  27. // readyWithTimeout selects from n.Ready() with a 1-second timeout. It
  28. // panics on timeout, which is better than the indefinite wait that
  29. // would occur if this channel were read without being wrapped in a
  30. // select.
  31. func readyWithTimeout(n Node) Ready {
  32. select {
  33. case rd := <-n.Ready():
  34. return rd
  35. case <-time.After(time.Second):
  36. panic("timed out waiting for ready")
  37. }
  38. }
  39. // TestNodeStep ensures that node.Step sends msgProp to propc chan
  40. // and other kinds of messages to recvc chan.
  41. func TestNodeStep(t *testing.T) {
  42. for i, msgn := range raftpb.MessageType_name {
  43. n := &node{
  44. propc: make(chan msgWithResult, 1),
  45. recvc: make(chan raftpb.Message, 1),
  46. }
  47. msgt := raftpb.MessageType(i)
  48. n.Step(context.TODO(), raftpb.Message{Type: msgt})
  49. // Proposal goes to proc chan. Others go to recvc chan.
  50. if msgt == raftpb.MsgProp {
  51. select {
  52. case <-n.propc:
  53. default:
  54. t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
  55. }
  56. } else {
  57. if IsLocalMsg(msgt) {
  58. select {
  59. case <-n.recvc:
  60. t.Errorf("%d: step should ignore %s", msgt, msgn)
  61. default:
  62. }
  63. } else {
  64. select {
  65. case <-n.recvc:
  66. default:
  67. t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
  68. }
  69. }
  70. }
  71. }
  72. }
  73. // Cancel and Stop should unblock Step()
  74. func TestNodeStepUnblock(t *testing.T) {
  75. // a node without buffer to block step
  76. n := &node{
  77. propc: make(chan msgWithResult),
  78. done: make(chan struct{}),
  79. }
  80. ctx, cancel := context.WithCancel(context.Background())
  81. stopFunc := func() { close(n.done) }
  82. tests := []struct {
  83. unblock func()
  84. werr error
  85. }{
  86. {stopFunc, ErrStopped},
  87. {cancel, context.Canceled},
  88. }
  89. for i, tt := range tests {
  90. errc := make(chan error, 1)
  91. go func() {
  92. err := n.Step(ctx, raftpb.Message{Type: raftpb.MsgProp})
  93. errc <- err
  94. }()
  95. tt.unblock()
  96. select {
  97. case err := <-errc:
  98. if err != tt.werr {
  99. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  100. }
  101. //clean up side-effect
  102. if ctx.Err() != nil {
  103. ctx = context.TODO()
  104. }
  105. select {
  106. case <-n.done:
  107. n.done = make(chan struct{})
  108. default:
  109. }
  110. case <-time.After(1 * time.Second):
  111. t.Fatalf("#%d: failed to unblock step", i)
  112. }
  113. }
  114. }
  115. // TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
  116. func TestNodePropose(t *testing.T) {
  117. msgs := []raftpb.Message{}
  118. appendStep := func(r *raft, m raftpb.Message) error {
  119. msgs = append(msgs, m)
  120. return nil
  121. }
  122. s := NewMemoryStorage()
  123. rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
  124. n := newNode(rn)
  125. r := rn.raft
  126. go n.run()
  127. if err := n.Campaign(context.TODO()); err != nil {
  128. t.Fatal(err)
  129. }
  130. for {
  131. rd := <-n.Ready()
  132. s.Append(rd.Entries)
  133. // change the step function to appendStep until this raft becomes leader
  134. if rd.SoftState.Lead == r.id {
  135. r.step = appendStep
  136. n.Advance()
  137. break
  138. }
  139. n.Advance()
  140. }
  141. n.Propose(context.TODO(), []byte("somedata"))
  142. n.Stop()
  143. if len(msgs) != 1 {
  144. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  145. }
  146. if msgs[0].Type != raftpb.MsgProp {
  147. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
  148. }
  149. if !bytes.Equal(msgs[0].Entries[0].Data, []byte("somedata")) {
  150. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, []byte("somedata"))
  151. }
  152. }
  153. // TestNodeReadIndex ensures that node.ReadIndex sends the MsgReadIndex message to the underlying raft.
  154. // It also ensures that ReadState can be read out through ready chan.
  155. func TestNodeReadIndex(t *testing.T) {
  156. msgs := []raftpb.Message{}
  157. appendStep := func(r *raft, m raftpb.Message) error {
  158. msgs = append(msgs, m)
  159. return nil
  160. }
  161. wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
  162. s := NewMemoryStorage()
  163. rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
  164. n := newNode(rn)
  165. r := rn.raft
  166. r.readStates = wrs
  167. go n.run()
  168. n.Campaign(context.TODO())
  169. for {
  170. rd := <-n.Ready()
  171. if !reflect.DeepEqual(rd.ReadStates, wrs) {
  172. t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
  173. }
  174. s.Append(rd.Entries)
  175. if rd.SoftState.Lead == r.id {
  176. n.Advance()
  177. break
  178. }
  179. n.Advance()
  180. }
  181. r.step = appendStep
  182. wrequestCtx := []byte("somedata2")
  183. n.ReadIndex(context.TODO(), wrequestCtx)
  184. n.Stop()
  185. if len(msgs) != 1 {
  186. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  187. }
  188. if msgs[0].Type != raftpb.MsgReadIndex {
  189. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
  190. }
  191. if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
  192. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
  193. }
  194. }
  195. // TestDisableProposalForwarding ensures that proposals are not forwarded to
  196. // the leader when DisableProposalForwarding is true.
  197. func TestDisableProposalForwarding(t *testing.T) {
  198. r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  199. r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  200. cfg3 := newTestConfig(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  201. cfg3.DisableProposalForwarding = true
  202. r3 := newRaft(cfg3)
  203. nt := newNetwork(r1, r2, r3)
  204. // elect r1 as leader
  205. nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
  206. var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
  207. // send proposal to r2(follower) where DisableProposalForwarding is false
  208. r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntries})
  209. // verify r2(follower) does forward the proposal when DisableProposalForwarding is false
  210. if len(r2.msgs) != 1 {
  211. t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
  212. }
  213. // send proposal to r3(follower) where DisableProposalForwarding is true
  214. r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries})
  215. // verify r3(follower) does not forward the proposal when DisableProposalForwarding is true
  216. if len(r3.msgs) != 0 {
  217. t.Fatalf("len(r3.msgs) expected 0, got %d", len(r3.msgs))
  218. }
  219. }
  220. // TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader
  221. // gets forwarded to the new leader and 'send' method does not attach its term.
  222. func TestNodeReadIndexToOldLeader(t *testing.T) {
  223. r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  224. r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  225. r3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  226. nt := newNetwork(r1, r2, r3)
  227. // elect r1 as leader
  228. nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
  229. var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
  230. // send readindex request to r2(follower)
  231. r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgReadIndex, Entries: testEntries})
  232. // verify r2(follower) forwards this message to r1(leader) with term not set
  233. if len(r2.msgs) != 1 {
  234. t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
  235. }
  236. readIndxMsg1 := raftpb.Message{From: 2, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
  237. if !reflect.DeepEqual(r2.msgs[0], readIndxMsg1) {
  238. t.Fatalf("r2.msgs[0] expected %+v, got %+v", readIndxMsg1, r2.msgs[0])
  239. }
  240. // send readindex request to r3(follower)
  241. r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries})
  242. // verify r3(follower) forwards this message to r1(leader) with term not set as well.
  243. if len(r3.msgs) != 1 {
  244. t.Fatalf("len(r3.msgs) expected 1, got %d", len(r3.msgs))
  245. }
  246. readIndxMsg2 := raftpb.Message{From: 3, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
  247. if !reflect.DeepEqual(r3.msgs[0], readIndxMsg2) {
  248. t.Fatalf("r3.msgs[0] expected %+v, got %+v", readIndxMsg2, r3.msgs[0])
  249. }
  250. // now elect r3 as leader
  251. nt.send(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgHup})
  252. // let r1 steps the two messages previously we got from r2, r3
  253. r1.Step(readIndxMsg1)
  254. r1.Step(readIndxMsg2)
  255. // verify r1(follower) forwards these messages again to r3(new leader)
  256. if len(r1.msgs) != 2 {
  257. t.Fatalf("len(r1.msgs) expected 1, got %d", len(r1.msgs))
  258. }
  259. readIndxMsg3 := raftpb.Message{From: 1, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
  260. if !reflect.DeepEqual(r1.msgs[0], readIndxMsg3) {
  261. t.Fatalf("r1.msgs[0] expected %+v, got %+v", readIndxMsg3, r1.msgs[0])
  262. }
  263. if !reflect.DeepEqual(r1.msgs[1], readIndxMsg3) {
  264. t.Fatalf("r1.msgs[1] expected %+v, got %+v", readIndxMsg3, r1.msgs[1])
  265. }
  266. }
  267. // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal
  268. // to the underlying raft.
  269. func TestNodeProposeConfig(t *testing.T) {
  270. msgs := []raftpb.Message{}
  271. appendStep := func(r *raft, m raftpb.Message) error {
  272. msgs = append(msgs, m)
  273. return nil
  274. }
  275. s := NewMemoryStorage()
  276. rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
  277. n := newNode(rn)
  278. r := rn.raft
  279. go n.run()
  280. n.Campaign(context.TODO())
  281. for {
  282. rd := <-n.Ready()
  283. s.Append(rd.Entries)
  284. // change the step function to appendStep until this raft becomes leader
  285. if rd.SoftState.Lead == r.id {
  286. r.step = appendStep
  287. n.Advance()
  288. break
  289. }
  290. n.Advance()
  291. }
  292. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  293. ccdata, err := cc.Marshal()
  294. if err != nil {
  295. t.Fatal(err)
  296. }
  297. n.ProposeConfChange(context.TODO(), cc)
  298. n.Stop()
  299. if len(msgs) != 1 {
  300. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  301. }
  302. if msgs[0].Type != raftpb.MsgProp {
  303. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
  304. }
  305. if !bytes.Equal(msgs[0].Entries[0].Data, ccdata) {
  306. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, ccdata)
  307. }
  308. }
  309. // TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
  310. // not affect the later propose to add new node.
  311. func TestNodeProposeAddDuplicateNode(t *testing.T) {
  312. s := NewMemoryStorage()
  313. rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
  314. n := newNode(rn)
  315. go n.run()
  316. n.Campaign(context.TODO())
  317. rdyEntries := make([]raftpb.Entry, 0)
  318. ticker := time.NewTicker(time.Millisecond * 100)
  319. defer ticker.Stop()
  320. done := make(chan struct{})
  321. stop := make(chan struct{})
  322. applyConfChan := make(chan struct{})
  323. go func() {
  324. defer close(done)
  325. for {
  326. select {
  327. case <-stop:
  328. return
  329. case <-ticker.C:
  330. n.Tick()
  331. case rd := <-n.Ready():
  332. s.Append(rd.Entries)
  333. applied := false
  334. for _, e := range rd.Entries {
  335. rdyEntries = append(rdyEntries, e)
  336. switch e.Type {
  337. case raftpb.EntryNormal:
  338. case raftpb.EntryConfChange:
  339. var cc raftpb.ConfChange
  340. cc.Unmarshal(e.Data)
  341. n.ApplyConfChange(cc)
  342. applied = true
  343. }
  344. }
  345. n.Advance()
  346. if applied {
  347. applyConfChan <- struct{}{}
  348. }
  349. }
  350. }
  351. }()
  352. cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  353. ccdata1, _ := cc1.Marshal()
  354. n.ProposeConfChange(context.TODO(), cc1)
  355. <-applyConfChan
  356. // try add the same node again
  357. n.ProposeConfChange(context.TODO(), cc1)
  358. <-applyConfChan
  359. // the new node join should be ok
  360. cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
  361. ccdata2, _ := cc2.Marshal()
  362. n.ProposeConfChange(context.TODO(), cc2)
  363. <-applyConfChan
  364. close(stop)
  365. <-done
  366. if len(rdyEntries) != 4 {
  367. t.Errorf("len(entry) = %d, want %d, %v\n", len(rdyEntries), 4, rdyEntries)
  368. }
  369. if !bytes.Equal(rdyEntries[1].Data, ccdata1) {
  370. t.Errorf("data = %v, want %v", rdyEntries[1].Data, ccdata1)
  371. }
  372. if !bytes.Equal(rdyEntries[3].Data, ccdata2) {
  373. t.Errorf("data = %v, want %v", rdyEntries[3].Data, ccdata2)
  374. }
  375. n.Stop()
  376. }
  377. // TestBlockProposal ensures that node will block proposal when it does not
  378. // know who is the current leader; node will accept proposal when it knows
  379. // who is the current leader.
  380. func TestBlockProposal(t *testing.T) {
  381. rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
  382. n := newNode(rn)
  383. go n.run()
  384. defer n.Stop()
  385. errc := make(chan error, 1)
  386. go func() {
  387. errc <- n.Propose(context.TODO(), []byte("somedata"))
  388. }()
  389. testutil.WaitSchedule()
  390. select {
  391. case err := <-errc:
  392. t.Errorf("err = %v, want blocking", err)
  393. default:
  394. }
  395. n.Campaign(context.TODO())
  396. select {
  397. case err := <-errc:
  398. if err != nil {
  399. t.Errorf("err = %v, want %v", err, nil)
  400. }
  401. case <-time.After(10 * time.Second):
  402. t.Errorf("blocking proposal, want unblocking")
  403. }
  404. }
  405. func TestNodeProposeWaitDropped(t *testing.T) {
  406. msgs := []raftpb.Message{}
  407. droppingMsg := []byte("test_dropping")
  408. dropStep := func(r *raft, m raftpb.Message) error {
  409. if m.Type == raftpb.MsgProp && strings.Contains(m.String(), string(droppingMsg)) {
  410. t.Logf("dropping message: %v", m.String())
  411. return ErrProposalDropped
  412. }
  413. msgs = append(msgs, m)
  414. return nil
  415. }
  416. s := NewMemoryStorage()
  417. rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
  418. n := newNode(rn)
  419. r := rn.raft
  420. go n.run()
  421. n.Campaign(context.TODO())
  422. for {
  423. rd := <-n.Ready()
  424. s.Append(rd.Entries)
  425. // change the step function to dropStep until this raft becomes leader
  426. if rd.SoftState.Lead == r.id {
  427. r.step = dropStep
  428. n.Advance()
  429. break
  430. }
  431. n.Advance()
  432. }
  433. proposalTimeout := time.Millisecond * 100
  434. ctx, cancel := context.WithTimeout(context.Background(), proposalTimeout)
  435. // propose with cancel should be cancelled earyly if dropped
  436. err := n.Propose(ctx, droppingMsg)
  437. if err != ErrProposalDropped {
  438. t.Errorf("should drop proposal : %v", err)
  439. }
  440. cancel()
  441. n.Stop()
  442. if len(msgs) != 0 {
  443. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  444. }
  445. }
  446. // TestNodeTick ensures that node.Tick() will increase the
  447. // elapsed of the underlying raft state machine.
  448. func TestNodeTick(t *testing.T) {
  449. s := NewMemoryStorage()
  450. rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
  451. n := newNode(rn)
  452. r := rn.raft
  453. go n.run()
  454. elapsed := r.electionElapsed
  455. n.Tick()
  456. for len(n.tickc) != 0 {
  457. time.Sleep(100 * time.Millisecond)
  458. }
  459. n.Stop()
  460. if r.electionElapsed != elapsed+1 {
  461. t.Errorf("elapsed = %d, want %d", r.electionElapsed, elapsed+1)
  462. }
  463. }
  464. // TestNodeStop ensures that node.Stop() blocks until the node has stopped
  465. // processing, and that it is idempotent
  466. func TestNodeStop(t *testing.T) {
  467. rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
  468. n := newNode(rn)
  469. donec := make(chan struct{})
  470. go func() {
  471. n.run()
  472. close(donec)
  473. }()
  474. status := n.Status()
  475. n.Stop()
  476. select {
  477. case <-donec:
  478. case <-time.After(time.Second):
  479. t.Fatalf("timed out waiting for node to stop!")
  480. }
  481. emptyStatus := Status{}
  482. if reflect.DeepEqual(status, emptyStatus) {
  483. t.Errorf("status = %v, want not empty", status)
  484. }
  485. // Further status should return be empty, the node is stopped.
  486. status = n.Status()
  487. if !reflect.DeepEqual(status, emptyStatus) {
  488. t.Errorf("status = %v, want empty", status)
  489. }
  490. // Subsequent Stops should have no effect.
  491. n.Stop()
  492. }
  493. func TestReadyContainUpdates(t *testing.T) {
  494. tests := []struct {
  495. rd Ready
  496. wcontain bool
  497. }{
  498. {Ready{}, false},
  499. {Ready{SoftState: &SoftState{Lead: 1}}, true},
  500. {Ready{HardState: raftpb.HardState{Vote: 1}}, true},
  501. {Ready{Entries: make([]raftpb.Entry, 1)}, true},
  502. {Ready{CommittedEntries: make([]raftpb.Entry, 1)}, true},
  503. {Ready{Messages: make([]raftpb.Message, 1)}, true},
  504. {Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}, true},
  505. }
  506. for i, tt := range tests {
  507. if g := tt.rd.containsUpdates(); g != tt.wcontain {
  508. t.Errorf("#%d: containUpdates = %v, want %v", i, g, tt.wcontain)
  509. }
  510. }
  511. }
  512. // TestNodeStart ensures that a node can be started correctly. The node should
  513. // start with correct configuration change entries, and can accept and commit
  514. // proposals.
  515. func TestNodeStart(t *testing.T) {
  516. ctx, cancel := context.WithCancel(context.Background())
  517. defer cancel()
  518. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  519. ccdata, err := cc.Marshal()
  520. if err != nil {
  521. t.Fatalf("unexpected marshal error: %v", err)
  522. }
  523. wants := []Ready{
  524. {
  525. HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
  526. Entries: []raftpb.Entry{
  527. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  528. },
  529. CommittedEntries: []raftpb.Entry{
  530. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  531. },
  532. MustSync: true,
  533. },
  534. {
  535. HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
  536. Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  537. CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  538. MustSync: true,
  539. },
  540. }
  541. storage := NewMemoryStorage()
  542. c := &Config{
  543. ID: 1,
  544. ElectionTick: 10,
  545. HeartbeatTick: 1,
  546. Storage: storage,
  547. MaxSizePerMsg: noLimit,
  548. MaxInflightMsgs: 256,
  549. }
  550. n := StartNode(c, []Peer{{ID: 1}})
  551. defer n.Stop()
  552. g := <-n.Ready()
  553. if !reflect.DeepEqual(g, wants[0]) {
  554. t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
  555. } else {
  556. storage.Append(g.Entries)
  557. n.Advance()
  558. }
  559. if err := n.Campaign(ctx); err != nil {
  560. t.Fatal(err)
  561. }
  562. rd := <-n.Ready()
  563. storage.Append(rd.Entries)
  564. n.Advance()
  565. n.Propose(ctx, []byte("foo"))
  566. if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) {
  567. t.Errorf("#%d: g = %+v,\n w %+v", 2, g2, wants[1])
  568. } else {
  569. storage.Append(g2.Entries)
  570. n.Advance()
  571. }
  572. select {
  573. case rd := <-n.Ready():
  574. t.Errorf("unexpected Ready: %+v", rd)
  575. case <-time.After(time.Millisecond):
  576. }
  577. }
  578. func TestNodeRestart(t *testing.T) {
  579. entries := []raftpb.Entry{
  580. {Term: 1, Index: 1},
  581. {Term: 1, Index: 2, Data: []byte("foo")},
  582. }
  583. st := raftpb.HardState{Term: 1, Commit: 1}
  584. want := Ready{
  585. // No HardState is emitted because there was no change.
  586. HardState: raftpb.HardState{},
  587. // commit up to index commit index in st
  588. CommittedEntries: entries[:st.Commit],
  589. // MustSync is false because no HardState or new entries are provided.
  590. MustSync: false,
  591. }
  592. storage := NewMemoryStorage()
  593. storage.SetHardState(st)
  594. storage.Append(entries)
  595. c := &Config{
  596. ID: 1,
  597. ElectionTick: 10,
  598. HeartbeatTick: 1,
  599. Storage: storage,
  600. MaxSizePerMsg: noLimit,
  601. MaxInflightMsgs: 256,
  602. }
  603. n := RestartNode(c)
  604. defer n.Stop()
  605. if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
  606. t.Errorf("g = %+v,\n w %+v", g, want)
  607. }
  608. n.Advance()
  609. select {
  610. case rd := <-n.Ready():
  611. t.Errorf("unexpected Ready: %+v", rd)
  612. case <-time.After(time.Millisecond):
  613. }
  614. }
  615. func TestNodeRestartFromSnapshot(t *testing.T) {
  616. snap := raftpb.Snapshot{
  617. Metadata: raftpb.SnapshotMetadata{
  618. ConfState: raftpb.ConfState{Voters: []uint64{1, 2}},
  619. Index: 2,
  620. Term: 1,
  621. },
  622. }
  623. entries := []raftpb.Entry{
  624. {Term: 1, Index: 3, Data: []byte("foo")},
  625. }
  626. st := raftpb.HardState{Term: 1, Commit: 3}
  627. want := Ready{
  628. // No HardState is emitted because nothing changed relative to what is
  629. // already persisted.
  630. HardState: raftpb.HardState{},
  631. // commit up to index commit index in st
  632. CommittedEntries: entries,
  633. // MustSync is only true when there is a new HardState or new entries;
  634. // neither is the case here.
  635. MustSync: false,
  636. }
  637. s := NewMemoryStorage()
  638. s.SetHardState(st)
  639. s.ApplySnapshot(snap)
  640. s.Append(entries)
  641. c := &Config{
  642. ID: 1,
  643. ElectionTick: 10,
  644. HeartbeatTick: 1,
  645. Storage: s,
  646. MaxSizePerMsg: noLimit,
  647. MaxInflightMsgs: 256,
  648. }
  649. n := RestartNode(c)
  650. defer n.Stop()
  651. if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
  652. t.Errorf("g = %+v,\n w %+v", g, want)
  653. } else {
  654. n.Advance()
  655. }
  656. select {
  657. case rd := <-n.Ready():
  658. t.Errorf("unexpected Ready: %+v", rd)
  659. case <-time.After(time.Millisecond):
  660. }
  661. }
  662. func TestNodeAdvance(t *testing.T) {
  663. ctx, cancel := context.WithCancel(context.Background())
  664. defer cancel()
  665. storage := NewMemoryStorage()
  666. c := &Config{
  667. ID: 1,
  668. ElectionTick: 10,
  669. HeartbeatTick: 1,
  670. Storage: storage,
  671. MaxSizePerMsg: noLimit,
  672. MaxInflightMsgs: 256,
  673. }
  674. n := StartNode(c, []Peer{{ID: 1}})
  675. defer n.Stop()
  676. rd := <-n.Ready()
  677. storage.Append(rd.Entries)
  678. n.Advance()
  679. n.Campaign(ctx)
  680. <-n.Ready()
  681. n.Propose(ctx, []byte("foo"))
  682. select {
  683. case rd = <-n.Ready():
  684. t.Fatalf("unexpected Ready before Advance: %+v", rd)
  685. case <-time.After(time.Millisecond):
  686. }
  687. storage.Append(rd.Entries)
  688. n.Advance()
  689. select {
  690. case <-n.Ready():
  691. case <-time.After(100 * time.Millisecond):
  692. t.Errorf("expect Ready after Advance, but there is no Ready available")
  693. }
  694. }
  695. func TestSoftStateEqual(t *testing.T) {
  696. tests := []struct {
  697. st *SoftState
  698. we bool
  699. }{
  700. {&SoftState{}, true},
  701. {&SoftState{Lead: 1}, false},
  702. {&SoftState{RaftState: StateLeader}, false},
  703. }
  704. for i, tt := range tests {
  705. if g := tt.st.equal(&SoftState{}); g != tt.we {
  706. t.Errorf("#%d, equal = %v, want %v", i, g, tt.we)
  707. }
  708. }
  709. }
  710. func TestIsHardStateEqual(t *testing.T) {
  711. tests := []struct {
  712. st raftpb.HardState
  713. we bool
  714. }{
  715. {emptyState, true},
  716. {raftpb.HardState{Vote: 1}, false},
  717. {raftpb.HardState{Commit: 1}, false},
  718. {raftpb.HardState{Term: 1}, false},
  719. }
  720. for i, tt := range tests {
  721. if isHardStateEqual(tt.st, emptyState) != tt.we {
  722. t.Errorf("#%d, equal = %v, want %v", i, isHardStateEqual(tt.st, emptyState), tt.we)
  723. }
  724. }
  725. }
  726. func TestNodeProposeAddLearnerNode(t *testing.T) {
  727. ticker := time.NewTicker(time.Millisecond * 100)
  728. defer ticker.Stop()
  729. s := NewMemoryStorage()
  730. rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
  731. n := newNode(rn)
  732. go n.run()
  733. n.Campaign(context.TODO())
  734. stop := make(chan struct{})
  735. done := make(chan struct{})
  736. applyConfChan := make(chan struct{})
  737. go func() {
  738. defer close(done)
  739. for {
  740. select {
  741. case <-stop:
  742. return
  743. case <-ticker.C:
  744. n.Tick()
  745. case rd := <-n.Ready():
  746. s.Append(rd.Entries)
  747. t.Logf("raft: %v", rd.Entries)
  748. for _, ent := range rd.Entries {
  749. if ent.Type != raftpb.EntryConfChange {
  750. continue
  751. }
  752. var cc raftpb.ConfChange
  753. cc.Unmarshal(ent.Data)
  754. state := n.ApplyConfChange(cc)
  755. if len(state.Learners) == 0 ||
  756. state.Learners[0] != cc.NodeID ||
  757. cc.NodeID != 2 {
  758. t.Errorf("apply conf change should return new added learner: %v", state.String())
  759. }
  760. if len(state.Voters) != 1 {
  761. t.Errorf("add learner should not change the nodes: %v", state.String())
  762. }
  763. t.Logf("apply raft conf %v changed to: %v", cc, state.String())
  764. applyConfChan <- struct{}{}
  765. }
  766. n.Advance()
  767. }
  768. }
  769. }()
  770. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode, NodeID: 2}
  771. n.ProposeConfChange(context.TODO(), cc)
  772. <-applyConfChan
  773. close(stop)
  774. <-done
  775. }
  776. func TestAppendPagination(t *testing.T) {
  777. const maxSizePerMsg = 2048
  778. n := newNetworkWithConfig(func(c *Config) {
  779. c.MaxSizePerMsg = maxSizePerMsg
  780. }, nil, nil, nil)
  781. seenFullMessage := false
  782. // Inspect all messages to see that we never exceed the limit, but
  783. // we do see messages of larger than half the limit.
  784. n.msgHook = func(m raftpb.Message) bool {
  785. if m.Type == raftpb.MsgApp {
  786. size := 0
  787. for _, e := range m.Entries {
  788. size += len(e.Data)
  789. }
  790. if size > maxSizePerMsg {
  791. t.Errorf("sent MsgApp that is too large: %d bytes", size)
  792. }
  793. if size > maxSizePerMsg/2 {
  794. seenFullMessage = true
  795. }
  796. }
  797. return true
  798. }
  799. n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
  800. // Partition the network while we make our proposals. This forces
  801. // the entries to be batched into larger messages.
  802. n.isolate(1)
  803. blob := []byte(strings.Repeat("a", 1000))
  804. for i := 0; i < 5; i++ {
  805. n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
  806. }
  807. n.recover()
  808. // After the partition recovers, tick the clock to wake everything
  809. // back up and send the messages.
  810. n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
  811. if !seenFullMessage {
  812. t.Error("didn't see any messages more than half the max size; something is wrong with this test")
  813. }
  814. }
  815. func TestCommitPagination(t *testing.T) {
  816. s := NewMemoryStorage()
  817. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  818. cfg.MaxCommittedSizePerReady = 2048
  819. rn, err := NewRawNode(cfg)
  820. if err != nil {
  821. t.Fatal(err)
  822. }
  823. n := newNode(rn)
  824. go n.run()
  825. n.Campaign(context.TODO())
  826. rd := readyWithTimeout(&n)
  827. if len(rd.CommittedEntries) != 1 {
  828. t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
  829. }
  830. s.Append(rd.Entries)
  831. n.Advance()
  832. blob := []byte(strings.Repeat("a", 1000))
  833. for i := 0; i < 3; i++ {
  834. if err := n.Propose(context.TODO(), blob); err != nil {
  835. t.Fatal(err)
  836. }
  837. }
  838. // The 3 proposals will commit in two batches.
  839. rd = readyWithTimeout(&n)
  840. if len(rd.CommittedEntries) != 2 {
  841. t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries))
  842. }
  843. s.Append(rd.Entries)
  844. n.Advance()
  845. rd = readyWithTimeout(&n)
  846. if len(rd.CommittedEntries) != 1 {
  847. t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries))
  848. }
  849. s.Append(rd.Entries)
  850. n.Advance()
  851. }
  852. type ignoreSizeHintMemStorage struct {
  853. *MemoryStorage
  854. }
  855. func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
  856. return s.MemoryStorage.Entries(lo, hi, math.MaxUint64)
  857. }
  858. // TestNodeCommitPaginationAfterRestart regression tests a scenario in which the
  859. // Storage's Entries size limitation is slightly more permissive than Raft's
  860. // internal one. The original bug was the following:
  861. //
  862. // - node learns that index 11 (or 100, doesn't matter) is committed
  863. // - nextEnts returns index 1..10 in CommittedEntries due to size limiting. However,
  864. // index 10 already exceeds maxBytes, due to a user-provided impl of Entries.
  865. // - Commit index gets bumped to 10
  866. // - the node persists the HardState, but crashes before applying the entries
  867. // - upon restart, the storage returns the same entries, but `slice` takes a different code path
  868. // (since it is now called with an upper bound of 10) and removes the last entry.
  869. // - Raft emits a HardState with a regressing commit index.
  870. //
  871. // A simpler version of this test would have the storage return a lot less entries than dictated
  872. // by maxSize (for example, exactly one entry) after the restart, resulting in a larger regression.
  873. // This wouldn't need to exploit anything about Raft-internal code paths to fail.
  874. func TestNodeCommitPaginationAfterRestart(t *testing.T) {
  875. s := &ignoreSizeHintMemStorage{
  876. MemoryStorage: NewMemoryStorage(),
  877. }
  878. persistedHardState := raftpb.HardState{
  879. Term: 1,
  880. Vote: 1,
  881. Commit: 10,
  882. }
  883. s.hardState = persistedHardState
  884. s.ents = make([]raftpb.Entry, 10)
  885. var size uint64
  886. for i := range s.ents {
  887. ent := raftpb.Entry{
  888. Term: 1,
  889. Index: uint64(i + 1),
  890. Type: raftpb.EntryNormal,
  891. Data: []byte("a"),
  892. }
  893. s.ents[i] = ent
  894. size += uint64(ent.Size())
  895. }
  896. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  897. // Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
  898. // not be included in the initial rd.CommittedEntries. However, our storage will ignore
  899. // this and *will* return it (which is how the Commit index ended up being 10 initially).
  900. cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
  901. rn, err := NewRawNode(cfg)
  902. if err != nil {
  903. t.Fatal(err)
  904. }
  905. n := newNode(rn)
  906. go n.run()
  907. defer n.Stop()
  908. rd := readyWithTimeout(&n)
  909. if !IsEmptyHardState(rd.HardState) && rd.HardState.Commit < persistedHardState.Commit {
  910. t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n%+v",
  911. persistedHardState.Commit, rd.HardState.Commit,
  912. DescribeEntries(rd.CommittedEntries, func(data []byte) string { return fmt.Sprintf("%q", data) }),
  913. )
  914. }
  915. }