node_test.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "math"
  20. "reflect"
  21. "strings"
  22. "testing"
  23. "time"
  24. "go.etcd.io/etcd/pkg/testutil"
  25. "go.etcd.io/etcd/raft/raftpb"
  26. )
  27. // readyWithTimeout selects from n.Ready() with a 1-second timeout. It
  28. // panics on timeout, which is better than the indefinite wait that
  29. // would occur if this channel were read without being wrapped in a
  30. // select.
  31. func readyWithTimeout(n Node) Ready {
  32. select {
  33. case rd := <-n.Ready():
  34. return rd
  35. case <-time.After(time.Second):
  36. panic("timed out waiting for ready")
  37. }
  38. }
  39. // TestNodeStep ensures that node.Step sends msgProp to propc chan
  40. // and other kinds of messages to recvc chan.
  41. func TestNodeStep(t *testing.T) {
  42. for i, msgn := range raftpb.MessageType_name {
  43. n := &node{
  44. propc: make(chan msgWithResult, 1),
  45. recvc: make(chan raftpb.Message, 1),
  46. }
  47. msgt := raftpb.MessageType(i)
  48. n.Step(context.TODO(), raftpb.Message{Type: msgt})
  49. // Proposal goes to proc chan. Others go to recvc chan.
  50. if msgt == raftpb.MsgProp {
  51. select {
  52. case <-n.propc:
  53. default:
  54. t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
  55. }
  56. } else {
  57. if IsLocalMsg(msgt) {
  58. select {
  59. case <-n.recvc:
  60. t.Errorf("%d: step should ignore %s", msgt, msgn)
  61. default:
  62. }
  63. } else {
  64. select {
  65. case <-n.recvc:
  66. default:
  67. t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
  68. }
  69. }
  70. }
  71. }
  72. }
  73. // Cancel and Stop should unblock Step()
  74. func TestNodeStepUnblock(t *testing.T) {
  75. // a node without buffer to block step
  76. n := &node{
  77. propc: make(chan msgWithResult),
  78. done: make(chan struct{}),
  79. }
  80. ctx, cancel := context.WithCancel(context.Background())
  81. stopFunc := func() { close(n.done) }
  82. tests := []struct {
  83. unblock func()
  84. werr error
  85. }{
  86. {stopFunc, ErrStopped},
  87. {cancel, context.Canceled},
  88. }
  89. for i, tt := range tests {
  90. errc := make(chan error, 1)
  91. go func() {
  92. err := n.Step(ctx, raftpb.Message{Type: raftpb.MsgProp})
  93. errc <- err
  94. }()
  95. tt.unblock()
  96. select {
  97. case err := <-errc:
  98. if err != tt.werr {
  99. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  100. }
  101. //clean up side-effect
  102. if ctx.Err() != nil {
  103. ctx = context.TODO()
  104. }
  105. select {
  106. case <-n.done:
  107. n.done = make(chan struct{})
  108. default:
  109. }
  110. case <-time.After(1 * time.Second):
  111. t.Fatalf("#%d: failed to unblock step", i)
  112. }
  113. }
  114. }
  115. // TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
  116. func TestNodePropose(t *testing.T) {
  117. msgs := []raftpb.Message{}
  118. appendStep := func(r *raft, m raftpb.Message) error {
  119. msgs = append(msgs, m)
  120. return nil
  121. }
  122. n := newNode()
  123. s := NewMemoryStorage()
  124. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  125. go n.run(r)
  126. n.Campaign(context.TODO())
  127. for {
  128. rd := <-n.Ready()
  129. s.Append(rd.Entries)
  130. // change the step function to appendStep until this raft becomes leader
  131. if rd.SoftState.Lead == r.id {
  132. r.step = appendStep
  133. n.Advance()
  134. break
  135. }
  136. n.Advance()
  137. }
  138. n.Propose(context.TODO(), []byte("somedata"))
  139. n.Stop()
  140. if len(msgs) != 1 {
  141. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  142. }
  143. if msgs[0].Type != raftpb.MsgProp {
  144. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
  145. }
  146. if !bytes.Equal(msgs[0].Entries[0].Data, []byte("somedata")) {
  147. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, []byte("somedata"))
  148. }
  149. }
  150. // TestNodeReadIndex ensures that node.ReadIndex sends the MsgReadIndex message to the underlying raft.
  151. // It also ensures that ReadState can be read out through ready chan.
  152. func TestNodeReadIndex(t *testing.T) {
  153. msgs := []raftpb.Message{}
  154. appendStep := func(r *raft, m raftpb.Message) error {
  155. msgs = append(msgs, m)
  156. return nil
  157. }
  158. wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
  159. n := newNode()
  160. s := NewMemoryStorage()
  161. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  162. r.readStates = wrs
  163. go n.run(r)
  164. n.Campaign(context.TODO())
  165. for {
  166. rd := <-n.Ready()
  167. if !reflect.DeepEqual(rd.ReadStates, wrs) {
  168. t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
  169. }
  170. s.Append(rd.Entries)
  171. if rd.SoftState.Lead == r.id {
  172. n.Advance()
  173. break
  174. }
  175. n.Advance()
  176. }
  177. r.step = appendStep
  178. wrequestCtx := []byte("somedata2")
  179. n.ReadIndex(context.TODO(), wrequestCtx)
  180. n.Stop()
  181. if len(msgs) != 1 {
  182. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  183. }
  184. if msgs[0].Type != raftpb.MsgReadIndex {
  185. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
  186. }
  187. if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
  188. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
  189. }
  190. }
  191. // TestDisableProposalForwarding ensures that proposals are not forwarded to
  192. // the leader when DisableProposalForwarding is true.
  193. func TestDisableProposalForwarding(t *testing.T) {
  194. r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  195. r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  196. cfg3 := newTestConfig(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  197. cfg3.DisableProposalForwarding = true
  198. r3 := newRaft(cfg3)
  199. nt := newNetwork(r1, r2, r3)
  200. // elect r1 as leader
  201. nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
  202. var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
  203. // send proposal to r2(follower) where DisableProposalForwarding is false
  204. r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntries})
  205. // verify r2(follower) does forward the proposal when DisableProposalForwarding is false
  206. if len(r2.msgs) != 1 {
  207. t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
  208. }
  209. // send proposal to r3(follower) where DisableProposalForwarding is true
  210. r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries})
  211. // verify r3(follower) does not forward the proposal when DisableProposalForwarding is true
  212. if len(r3.msgs) != 0 {
  213. t.Fatalf("len(r3.msgs) expected 0, got %d", len(r3.msgs))
  214. }
  215. }
  216. // TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader
  217. // gets forwarded to the new leader and 'send' method does not attach its term.
  218. func TestNodeReadIndexToOldLeader(t *testing.T) {
  219. r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  220. r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  221. r3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  222. nt := newNetwork(r1, r2, r3)
  223. // elect r1 as leader
  224. nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
  225. var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
  226. // send readindex request to r2(follower)
  227. r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgReadIndex, Entries: testEntries})
  228. // verify r2(follower) forwards this message to r1(leader) with term not set
  229. if len(r2.msgs) != 1 {
  230. t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
  231. }
  232. readIndxMsg1 := raftpb.Message{From: 2, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
  233. if !reflect.DeepEqual(r2.msgs[0], readIndxMsg1) {
  234. t.Fatalf("r2.msgs[0] expected %+v, got %+v", readIndxMsg1, r2.msgs[0])
  235. }
  236. // send readindex request to r3(follower)
  237. r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries})
  238. // verify r3(follower) forwards this message to r1(leader) with term not set as well.
  239. if len(r3.msgs) != 1 {
  240. t.Fatalf("len(r3.msgs) expected 1, got %d", len(r3.msgs))
  241. }
  242. readIndxMsg2 := raftpb.Message{From: 3, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
  243. if !reflect.DeepEqual(r3.msgs[0], readIndxMsg2) {
  244. t.Fatalf("r3.msgs[0] expected %+v, got %+v", readIndxMsg2, r3.msgs[0])
  245. }
  246. // now elect r3 as leader
  247. nt.send(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgHup})
  248. // let r1 steps the two messages previously we got from r2, r3
  249. r1.Step(readIndxMsg1)
  250. r1.Step(readIndxMsg2)
  251. // verify r1(follower) forwards these messages again to r3(new leader)
  252. if len(r1.msgs) != 2 {
  253. t.Fatalf("len(r1.msgs) expected 1, got %d", len(r1.msgs))
  254. }
  255. readIndxMsg3 := raftpb.Message{From: 1, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
  256. if !reflect.DeepEqual(r1.msgs[0], readIndxMsg3) {
  257. t.Fatalf("r1.msgs[0] expected %+v, got %+v", readIndxMsg3, r1.msgs[0])
  258. }
  259. if !reflect.DeepEqual(r1.msgs[1], readIndxMsg3) {
  260. t.Fatalf("r1.msgs[1] expected %+v, got %+v", readIndxMsg3, r1.msgs[1])
  261. }
  262. }
  263. // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal
  264. // to the underlying raft.
  265. func TestNodeProposeConfig(t *testing.T) {
  266. msgs := []raftpb.Message{}
  267. appendStep := func(r *raft, m raftpb.Message) error {
  268. msgs = append(msgs, m)
  269. return nil
  270. }
  271. n := newNode()
  272. s := NewMemoryStorage()
  273. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  274. go n.run(r)
  275. n.Campaign(context.TODO())
  276. for {
  277. rd := <-n.Ready()
  278. s.Append(rd.Entries)
  279. // change the step function to appendStep until this raft becomes leader
  280. if rd.SoftState.Lead == r.id {
  281. r.step = appendStep
  282. n.Advance()
  283. break
  284. }
  285. n.Advance()
  286. }
  287. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  288. ccdata, err := cc.Marshal()
  289. if err != nil {
  290. t.Fatal(err)
  291. }
  292. n.ProposeConfChange(context.TODO(), cc)
  293. n.Stop()
  294. if len(msgs) != 1 {
  295. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  296. }
  297. if msgs[0].Type != raftpb.MsgProp {
  298. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
  299. }
  300. if !bytes.Equal(msgs[0].Entries[0].Data, ccdata) {
  301. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, ccdata)
  302. }
  303. }
  304. // TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
  305. // not affect the later propose to add new node.
  306. func TestNodeProposeAddDuplicateNode(t *testing.T) {
  307. n := newNode()
  308. s := NewMemoryStorage()
  309. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  310. go n.run(r)
  311. n.Campaign(context.TODO())
  312. rdyEntries := make([]raftpb.Entry, 0)
  313. ticker := time.NewTicker(time.Millisecond * 100)
  314. defer ticker.Stop()
  315. done := make(chan struct{})
  316. stop := make(chan struct{})
  317. applyConfChan := make(chan struct{})
  318. go func() {
  319. defer close(done)
  320. for {
  321. select {
  322. case <-stop:
  323. return
  324. case <-ticker.C:
  325. n.Tick()
  326. case rd := <-n.Ready():
  327. s.Append(rd.Entries)
  328. applied := false
  329. for _, e := range rd.Entries {
  330. rdyEntries = append(rdyEntries, e)
  331. switch e.Type {
  332. case raftpb.EntryNormal:
  333. case raftpb.EntryConfChange:
  334. var cc raftpb.ConfChange
  335. cc.Unmarshal(e.Data)
  336. n.ApplyConfChange(cc)
  337. applied = true
  338. }
  339. }
  340. n.Advance()
  341. if applied {
  342. applyConfChan <- struct{}{}
  343. }
  344. }
  345. }
  346. }()
  347. cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  348. ccdata1, _ := cc1.Marshal()
  349. n.ProposeConfChange(context.TODO(), cc1)
  350. <-applyConfChan
  351. // try add the same node again
  352. n.ProposeConfChange(context.TODO(), cc1)
  353. <-applyConfChan
  354. // the new node join should be ok
  355. cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
  356. ccdata2, _ := cc2.Marshal()
  357. n.ProposeConfChange(context.TODO(), cc2)
  358. <-applyConfChan
  359. close(stop)
  360. <-done
  361. if len(rdyEntries) != 4 {
  362. t.Errorf("len(entry) = %d, want %d, %v\n", len(rdyEntries), 4, rdyEntries)
  363. }
  364. if !bytes.Equal(rdyEntries[1].Data, ccdata1) {
  365. t.Errorf("data = %v, want %v", rdyEntries[1].Data, ccdata1)
  366. }
  367. if !bytes.Equal(rdyEntries[3].Data, ccdata2) {
  368. t.Errorf("data = %v, want %v", rdyEntries[3].Data, ccdata2)
  369. }
  370. n.Stop()
  371. }
  372. // TestBlockProposal ensures that node will block proposal when it does not
  373. // know who is the current leader; node will accept proposal when it knows
  374. // who is the current leader.
  375. func TestBlockProposal(t *testing.T) {
  376. n := newNode()
  377. r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  378. go n.run(r)
  379. defer n.Stop()
  380. errc := make(chan error, 1)
  381. go func() {
  382. errc <- n.Propose(context.TODO(), []byte("somedata"))
  383. }()
  384. testutil.WaitSchedule()
  385. select {
  386. case err := <-errc:
  387. t.Errorf("err = %v, want blocking", err)
  388. default:
  389. }
  390. n.Campaign(context.TODO())
  391. select {
  392. case err := <-errc:
  393. if err != nil {
  394. t.Errorf("err = %v, want %v", err, nil)
  395. }
  396. case <-time.After(10 * time.Second):
  397. t.Errorf("blocking proposal, want unblocking")
  398. }
  399. }
  400. func TestNodeProposeWaitDropped(t *testing.T) {
  401. msgs := []raftpb.Message{}
  402. droppingMsg := []byte("test_dropping")
  403. dropStep := func(r *raft, m raftpb.Message) error {
  404. if m.Type == raftpb.MsgProp && strings.Contains(m.String(), string(droppingMsg)) {
  405. t.Logf("dropping message: %v", m.String())
  406. return ErrProposalDropped
  407. }
  408. msgs = append(msgs, m)
  409. return nil
  410. }
  411. n := newNode()
  412. s := NewMemoryStorage()
  413. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  414. go n.run(r)
  415. n.Campaign(context.TODO())
  416. for {
  417. rd := <-n.Ready()
  418. s.Append(rd.Entries)
  419. // change the step function to dropStep until this raft becomes leader
  420. if rd.SoftState.Lead == r.id {
  421. r.step = dropStep
  422. n.Advance()
  423. break
  424. }
  425. n.Advance()
  426. }
  427. proposalTimeout := time.Millisecond * 100
  428. ctx, cancel := context.WithTimeout(context.Background(), proposalTimeout)
  429. // propose with cancel should be cancelled earyly if dropped
  430. err := n.Propose(ctx, droppingMsg)
  431. if err != ErrProposalDropped {
  432. t.Errorf("should drop proposal : %v", err)
  433. }
  434. cancel()
  435. n.Stop()
  436. if len(msgs) != 0 {
  437. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  438. }
  439. }
  440. // TestNodeTick ensures that node.Tick() will increase the
  441. // elapsed of the underlying raft state machine.
  442. func TestNodeTick(t *testing.T) {
  443. n := newNode()
  444. s := NewMemoryStorage()
  445. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  446. go n.run(r)
  447. elapsed := r.electionElapsed
  448. n.Tick()
  449. for len(n.tickc) != 0 {
  450. time.Sleep(100 * time.Millisecond)
  451. }
  452. n.Stop()
  453. if r.electionElapsed != elapsed+1 {
  454. t.Errorf("elapsed = %d, want %d", r.electionElapsed, elapsed+1)
  455. }
  456. }
  457. // TestNodeStop ensures that node.Stop() blocks until the node has stopped
  458. // processing, and that it is idempotent
  459. func TestNodeStop(t *testing.T) {
  460. n := newNode()
  461. s := NewMemoryStorage()
  462. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  463. donec := make(chan struct{})
  464. go func() {
  465. n.run(r)
  466. close(donec)
  467. }()
  468. status := n.Status()
  469. n.Stop()
  470. select {
  471. case <-donec:
  472. case <-time.After(time.Second):
  473. t.Fatalf("timed out waiting for node to stop!")
  474. }
  475. emptyStatus := Status{}
  476. if reflect.DeepEqual(status, emptyStatus) {
  477. t.Errorf("status = %v, want not empty", status)
  478. }
  479. // Further status should return be empty, the node is stopped.
  480. status = n.Status()
  481. if !reflect.DeepEqual(status, emptyStatus) {
  482. t.Errorf("status = %v, want empty", status)
  483. }
  484. // Subsequent Stops should have no effect.
  485. n.Stop()
  486. }
  487. func TestReadyContainUpdates(t *testing.T) {
  488. tests := []struct {
  489. rd Ready
  490. wcontain bool
  491. }{
  492. {Ready{}, false},
  493. {Ready{SoftState: &SoftState{Lead: 1}}, true},
  494. {Ready{HardState: raftpb.HardState{Vote: 1}}, true},
  495. {Ready{Entries: make([]raftpb.Entry, 1)}, true},
  496. {Ready{CommittedEntries: make([]raftpb.Entry, 1)}, true},
  497. {Ready{Messages: make([]raftpb.Message, 1)}, true},
  498. {Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}, true},
  499. }
  500. for i, tt := range tests {
  501. if g := tt.rd.containsUpdates(); g != tt.wcontain {
  502. t.Errorf("#%d: containUpdates = %v, want %v", i, g, tt.wcontain)
  503. }
  504. }
  505. }
  506. // TestNodeStart ensures that a node can be started correctly. The node should
  507. // start with correct configuration change entries, and can accept and commit
  508. // proposals.
  509. func TestNodeStart(t *testing.T) {
  510. ctx, cancel := context.WithCancel(context.Background())
  511. defer cancel()
  512. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  513. ccdata, err := cc.Marshal()
  514. if err != nil {
  515. t.Fatalf("unexpected marshal error: %v", err)
  516. }
  517. wants := []Ready{
  518. {
  519. HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
  520. Entries: []raftpb.Entry{
  521. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  522. },
  523. CommittedEntries: []raftpb.Entry{
  524. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  525. },
  526. MustSync: true,
  527. },
  528. {
  529. HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
  530. Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  531. CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  532. MustSync: true,
  533. },
  534. }
  535. storage := NewMemoryStorage()
  536. c := &Config{
  537. ID: 1,
  538. ElectionTick: 10,
  539. HeartbeatTick: 1,
  540. Storage: storage,
  541. MaxSizePerMsg: noLimit,
  542. MaxInflightMsgs: 256,
  543. }
  544. n := StartNode(c, []Peer{{ID: 1}})
  545. defer n.Stop()
  546. g := <-n.Ready()
  547. if !reflect.DeepEqual(g, wants[0]) {
  548. t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
  549. } else {
  550. storage.Append(g.Entries)
  551. n.Advance()
  552. }
  553. n.Campaign(ctx)
  554. rd := <-n.Ready()
  555. storage.Append(rd.Entries)
  556. n.Advance()
  557. n.Propose(ctx, []byte("foo"))
  558. if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) {
  559. t.Errorf("#%d: g = %+v,\n w %+v", 2, g2, wants[1])
  560. } else {
  561. storage.Append(g2.Entries)
  562. n.Advance()
  563. }
  564. select {
  565. case rd := <-n.Ready():
  566. t.Errorf("unexpected Ready: %+v", rd)
  567. case <-time.After(time.Millisecond):
  568. }
  569. }
  570. func TestNodeRestart(t *testing.T) {
  571. entries := []raftpb.Entry{
  572. {Term: 1, Index: 1},
  573. {Term: 1, Index: 2, Data: []byte("foo")},
  574. }
  575. st := raftpb.HardState{Term: 1, Commit: 1}
  576. want := Ready{
  577. HardState: st,
  578. // commit up to index commit index in st
  579. CommittedEntries: entries[:st.Commit],
  580. MustSync: true,
  581. }
  582. storage := NewMemoryStorage()
  583. storage.SetHardState(st)
  584. storage.Append(entries)
  585. c := &Config{
  586. ID: 1,
  587. ElectionTick: 10,
  588. HeartbeatTick: 1,
  589. Storage: storage,
  590. MaxSizePerMsg: noLimit,
  591. MaxInflightMsgs: 256,
  592. }
  593. n := RestartNode(c)
  594. defer n.Stop()
  595. if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
  596. t.Errorf("g = %+v,\n w %+v", g, want)
  597. }
  598. n.Advance()
  599. select {
  600. case rd := <-n.Ready():
  601. t.Errorf("unexpected Ready: %+v", rd)
  602. case <-time.After(time.Millisecond):
  603. }
  604. }
  605. func TestNodeRestartFromSnapshot(t *testing.T) {
  606. snap := raftpb.Snapshot{
  607. Metadata: raftpb.SnapshotMetadata{
  608. ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
  609. Index: 2,
  610. Term: 1,
  611. },
  612. }
  613. entries := []raftpb.Entry{
  614. {Term: 1, Index: 3, Data: []byte("foo")},
  615. }
  616. st := raftpb.HardState{Term: 1, Commit: 3}
  617. want := Ready{
  618. HardState: st,
  619. // commit up to index commit index in st
  620. CommittedEntries: entries,
  621. MustSync: true,
  622. }
  623. s := NewMemoryStorage()
  624. s.SetHardState(st)
  625. s.ApplySnapshot(snap)
  626. s.Append(entries)
  627. c := &Config{
  628. ID: 1,
  629. ElectionTick: 10,
  630. HeartbeatTick: 1,
  631. Storage: s,
  632. MaxSizePerMsg: noLimit,
  633. MaxInflightMsgs: 256,
  634. }
  635. n := RestartNode(c)
  636. defer n.Stop()
  637. if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
  638. t.Errorf("g = %+v,\n w %+v", g, want)
  639. } else {
  640. n.Advance()
  641. }
  642. select {
  643. case rd := <-n.Ready():
  644. t.Errorf("unexpected Ready: %+v", rd)
  645. case <-time.After(time.Millisecond):
  646. }
  647. }
  648. func TestNodeAdvance(t *testing.T) {
  649. ctx, cancel := context.WithCancel(context.Background())
  650. defer cancel()
  651. storage := NewMemoryStorage()
  652. c := &Config{
  653. ID: 1,
  654. ElectionTick: 10,
  655. HeartbeatTick: 1,
  656. Storage: storage,
  657. MaxSizePerMsg: noLimit,
  658. MaxInflightMsgs: 256,
  659. }
  660. n := StartNode(c, []Peer{{ID: 1}})
  661. defer n.Stop()
  662. rd := <-n.Ready()
  663. storage.Append(rd.Entries)
  664. n.Advance()
  665. n.Campaign(ctx)
  666. <-n.Ready()
  667. n.Propose(ctx, []byte("foo"))
  668. select {
  669. case rd = <-n.Ready():
  670. t.Fatalf("unexpected Ready before Advance: %+v", rd)
  671. case <-time.After(time.Millisecond):
  672. }
  673. storage.Append(rd.Entries)
  674. n.Advance()
  675. select {
  676. case <-n.Ready():
  677. case <-time.After(100 * time.Millisecond):
  678. t.Errorf("expect Ready after Advance, but there is no Ready available")
  679. }
  680. }
  681. func TestSoftStateEqual(t *testing.T) {
  682. tests := []struct {
  683. st *SoftState
  684. we bool
  685. }{
  686. {&SoftState{}, true},
  687. {&SoftState{Lead: 1}, false},
  688. {&SoftState{RaftState: StateLeader}, false},
  689. }
  690. for i, tt := range tests {
  691. if g := tt.st.equal(&SoftState{}); g != tt.we {
  692. t.Errorf("#%d, equal = %v, want %v", i, g, tt.we)
  693. }
  694. }
  695. }
  696. func TestIsHardStateEqual(t *testing.T) {
  697. tests := []struct {
  698. st raftpb.HardState
  699. we bool
  700. }{
  701. {emptyState, true},
  702. {raftpb.HardState{Vote: 1}, false},
  703. {raftpb.HardState{Commit: 1}, false},
  704. {raftpb.HardState{Term: 1}, false},
  705. }
  706. for i, tt := range tests {
  707. if isHardStateEqual(tt.st, emptyState) != tt.we {
  708. t.Errorf("#%d, equal = %v, want %v", i, isHardStateEqual(tt.st, emptyState), tt.we)
  709. }
  710. }
  711. }
  712. func TestNodeProposeAddLearnerNode(t *testing.T) {
  713. ticker := time.NewTicker(time.Millisecond * 100)
  714. defer ticker.Stop()
  715. n := newNode()
  716. s := NewMemoryStorage()
  717. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  718. go n.run(r)
  719. n.Campaign(context.TODO())
  720. stop := make(chan struct{})
  721. done := make(chan struct{})
  722. applyConfChan := make(chan struct{})
  723. go func() {
  724. defer close(done)
  725. for {
  726. select {
  727. case <-stop:
  728. return
  729. case <-ticker.C:
  730. n.Tick()
  731. case rd := <-n.Ready():
  732. s.Append(rd.Entries)
  733. t.Logf("raft: %v", rd.Entries)
  734. for _, ent := range rd.Entries {
  735. if ent.Type != raftpb.EntryConfChange {
  736. continue
  737. }
  738. var cc raftpb.ConfChange
  739. cc.Unmarshal(ent.Data)
  740. state := n.ApplyConfChange(cc)
  741. if len(state.Learners) == 0 ||
  742. state.Learners[0] != cc.NodeID ||
  743. cc.NodeID != 2 {
  744. t.Errorf("apply conf change should return new added learner: %v", state.String())
  745. }
  746. if len(state.Nodes) != 1 {
  747. t.Errorf("add learner should not change the nodes: %v", state.String())
  748. }
  749. t.Logf("apply raft conf %v changed to: %v", cc, state.String())
  750. applyConfChan <- struct{}{}
  751. }
  752. n.Advance()
  753. }
  754. }
  755. }()
  756. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode, NodeID: 2}
  757. n.ProposeConfChange(context.TODO(), cc)
  758. <-applyConfChan
  759. close(stop)
  760. <-done
  761. }
  762. func TestAppendPagination(t *testing.T) {
  763. const maxSizePerMsg = 2048
  764. n := newNetworkWithConfig(func(c *Config) {
  765. c.MaxSizePerMsg = maxSizePerMsg
  766. }, nil, nil, nil)
  767. seenFullMessage := false
  768. // Inspect all messages to see that we never exceed the limit, but
  769. // we do see messages of larger than half the limit.
  770. n.msgHook = func(m raftpb.Message) bool {
  771. if m.Type == raftpb.MsgApp {
  772. size := 0
  773. for _, e := range m.Entries {
  774. size += len(e.Data)
  775. }
  776. if size > maxSizePerMsg {
  777. t.Errorf("sent MsgApp that is too large: %d bytes", size)
  778. }
  779. if size > maxSizePerMsg/2 {
  780. seenFullMessage = true
  781. }
  782. }
  783. return true
  784. }
  785. n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
  786. // Partition the network while we make our proposals. This forces
  787. // the entries to be batched into larger messages.
  788. n.isolate(1)
  789. blob := []byte(strings.Repeat("a", 1000))
  790. for i := 0; i < 5; i++ {
  791. n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
  792. }
  793. n.recover()
  794. // After the partition recovers, tick the clock to wake everything
  795. // back up and send the messages.
  796. n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
  797. if !seenFullMessage {
  798. t.Error("didn't see any messages more than half the max size; something is wrong with this test")
  799. }
  800. }
  801. func TestCommitPagination(t *testing.T) {
  802. s := NewMemoryStorage()
  803. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  804. cfg.MaxSizePerMsg = 2048
  805. r := newRaft(cfg)
  806. n := newNode()
  807. go n.run(r)
  808. n.Campaign(context.TODO())
  809. rd := readyWithTimeout(&n)
  810. if len(rd.CommittedEntries) != 1 {
  811. t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
  812. }
  813. s.Append(rd.Entries)
  814. n.Advance()
  815. blob := []byte(strings.Repeat("a", 1000))
  816. for i := 0; i < 3; i++ {
  817. if err := n.Propose(context.TODO(), blob); err != nil {
  818. t.Fatal(err)
  819. }
  820. }
  821. // The 3 proposals will commit in two batches.
  822. rd = readyWithTimeout(&n)
  823. if len(rd.CommittedEntries) != 2 {
  824. t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries))
  825. }
  826. s.Append(rd.Entries)
  827. n.Advance()
  828. rd = readyWithTimeout(&n)
  829. if len(rd.CommittedEntries) != 1 {
  830. t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries))
  831. }
  832. s.Append(rd.Entries)
  833. n.Advance()
  834. }
  835. type ignoreSizeHintMemStorage struct {
  836. *MemoryStorage
  837. }
  838. func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
  839. return s.MemoryStorage.Entries(lo, hi, math.MaxUint64)
  840. }
  841. // TestNodeCommitPaginationAfterRestart regression tests a scenario in which the
  842. // Storage's Entries size limitation is slightly more permissive than Raft's
  843. // internal one. The original bug was the following:
  844. //
  845. // - node learns that index 11 (or 100, doesn't matter) is committed
  846. // - nextEnts returns index 1..10 in CommittedEntries due to size limiting. However,
  847. // index 10 already exceeds maxBytes, due to a user-provided impl of Entries.
  848. // - Commit index gets bumped to 10
  849. // - the node persists the HardState, but crashes before applying the entries
  850. // - upon restart, the storage returns the same entries, but `slice` takes a different code path
  851. // (since it is now called with an upper bound of 10) and removes the last entry.
  852. // - Raft emits a HardState with a regressing commit index.
  853. //
  854. // A simpler version of this test would have the storage return a lot less entries than dictated
  855. // by maxSize (for example, exactly one entry) after the restart, resulting in a larger regression.
  856. // This wouldn't need to exploit anything about Raft-internal code paths to fail.
  857. func TestNodeCommitPaginationAfterRestart(t *testing.T) {
  858. s := &ignoreSizeHintMemStorage{
  859. MemoryStorage: NewMemoryStorage(),
  860. }
  861. persistedHardState := raftpb.HardState{
  862. Term: 1,
  863. Vote: 1,
  864. Commit: 10,
  865. }
  866. s.hardState = persistedHardState
  867. s.ents = make([]raftpb.Entry, 10)
  868. var size uint64
  869. for i := range s.ents {
  870. ent := raftpb.Entry{
  871. Term: 1,
  872. Index: uint64(i + 1),
  873. Type: raftpb.EntryNormal,
  874. Data: []byte("a"),
  875. }
  876. s.ents[i] = ent
  877. size += uint64(ent.Size())
  878. }
  879. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  880. // Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
  881. // not be included in the initial rd.CommittedEntries. However, our storage will ignore
  882. // this and *will* return it (which is how the Commit index ended up being 10 initially).
  883. cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
  884. r := newRaft(cfg)
  885. n := newNode()
  886. go n.run(r)
  887. defer n.Stop()
  888. rd := readyWithTimeout(&n)
  889. if !IsEmptyHardState(rd.HardState) && rd.HardState.Commit < persistedHardState.Commit {
  890. t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n%+v",
  891. persistedHardState.Commit, rd.HardState.Commit,
  892. DescribeEntries(rd.CommittedEntries, func(data []byte) string { return fmt.Sprintf("%q", data) }),
  893. )
  894. }
  895. }
  896. // TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
  897. // partitioned from a quorum of nodes. It verifies that the leader's log is
  898. // protected from unbounded growth even as new entries continue to be proposed.
  899. // This protection is provided by the MaxUncommittedEntriesSize configuration.
  900. func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
  901. const maxEntries = 16
  902. data := []byte("testdata")
  903. testEntry := raftpb.Entry{Data: data}
  904. maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
  905. s := NewMemoryStorage()
  906. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  907. cfg.MaxUncommittedEntriesSize = maxEntrySize
  908. r := newRaft(cfg)
  909. n := newNode()
  910. go n.run(r)
  911. defer n.Stop()
  912. n.Campaign(context.TODO())
  913. rd := readyWithTimeout(&n)
  914. if len(rd.CommittedEntries) != 1 {
  915. t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
  916. }
  917. s.Append(rd.Entries)
  918. n.Advance()
  919. // Simulate a network partition while we make our proposals by never
  920. // committing anything. These proposals should not cause the leader's
  921. // log to grow indefinitely.
  922. for i := 0; i < 1024; i++ {
  923. n.Propose(context.TODO(), data)
  924. }
  925. // Check the size of leader's uncommitted log tail. It should not exceed the
  926. // MaxUncommittedEntriesSize limit.
  927. checkUncommitted := func(exp uint64) {
  928. t.Helper()
  929. if a := r.uncommittedSize; exp != a {
  930. t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
  931. }
  932. }
  933. checkUncommitted(maxEntrySize)
  934. // Recover from the partition. The uncommitted tail of the Raft log should
  935. // disappear as entries are committed.
  936. rd = readyWithTimeout(&n)
  937. if len(rd.CommittedEntries) != maxEntries {
  938. t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
  939. }
  940. s.Append(rd.Entries)
  941. n.Advance()
  942. checkUncommitted(0)
  943. }