node_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "bytes"
  17. "context"
  18. "reflect"
  19. "strings"
  20. "testing"
  21. "time"
  22. "github.com/coreos/etcd/pkg/testutil"
  23. "github.com/coreos/etcd/raft/raftpb"
  24. )
  25. // TestNodeStep ensures that node.Step sends msgProp to propc chan
  26. // and other kinds of messages to recvc chan.
  27. func TestNodeStep(t *testing.T) {
  28. for i, msgn := range raftpb.MessageType_name {
  29. n := &node{
  30. propc: make(chan msgWithResult, 1),
  31. recvc: make(chan raftpb.Message, 1),
  32. }
  33. msgt := raftpb.MessageType(i)
  34. n.Step(context.TODO(), raftpb.Message{Type: msgt})
  35. // Proposal goes to proc chan. Others go to recvc chan.
  36. if msgt == raftpb.MsgProp {
  37. select {
  38. case <-n.propc:
  39. default:
  40. t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
  41. }
  42. } else {
  43. if IsLocalMsg(msgt) {
  44. select {
  45. case <-n.recvc:
  46. t.Errorf("%d: step should ignore %s", msgt, msgn)
  47. default:
  48. }
  49. } else {
  50. select {
  51. case <-n.recvc:
  52. default:
  53. t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
  54. }
  55. }
  56. }
  57. }
  58. }
  59. // Cancel and Stop should unblock Step()
  60. func TestNodeStepUnblock(t *testing.T) {
  61. // a node without buffer to block step
  62. n := &node{
  63. propc: make(chan msgWithResult),
  64. done: make(chan struct{}),
  65. }
  66. ctx, cancel := context.WithCancel(context.Background())
  67. stopFunc := func() { close(n.done) }
  68. tests := []struct {
  69. unblock func()
  70. werr error
  71. }{
  72. {stopFunc, ErrStopped},
  73. {cancel, context.Canceled},
  74. }
  75. for i, tt := range tests {
  76. errc := make(chan error, 1)
  77. go func() {
  78. err := n.Step(ctx, raftpb.Message{Type: raftpb.MsgProp})
  79. errc <- err
  80. }()
  81. tt.unblock()
  82. select {
  83. case err := <-errc:
  84. if err != tt.werr {
  85. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  86. }
  87. //clean up side-effect
  88. if ctx.Err() != nil {
  89. ctx = context.TODO()
  90. }
  91. select {
  92. case <-n.done:
  93. n.done = make(chan struct{})
  94. default:
  95. }
  96. case <-time.After(1 * time.Second):
  97. t.Fatalf("#%d: failed to unblock step", i)
  98. }
  99. }
  100. }
  101. // TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
  102. func TestNodePropose(t *testing.T) {
  103. msgs := []raftpb.Message{}
  104. appendStep := func(r *raft, m raftpb.Message) error {
  105. msgs = append(msgs, m)
  106. return nil
  107. }
  108. n := newNode()
  109. s := NewMemoryStorage()
  110. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  111. go n.run(r)
  112. n.Campaign(context.TODO())
  113. for {
  114. rd := <-n.Ready()
  115. s.Append(rd.Entries)
  116. // change the step function to appendStep until this raft becomes leader
  117. if rd.SoftState.Lead == r.id {
  118. r.step = appendStep
  119. n.Advance()
  120. break
  121. }
  122. n.Advance()
  123. }
  124. n.Propose(context.TODO(), []byte("somedata"))
  125. n.Stop()
  126. if len(msgs) != 1 {
  127. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  128. }
  129. if msgs[0].Type != raftpb.MsgProp {
  130. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
  131. }
  132. if !bytes.Equal(msgs[0].Entries[0].Data, []byte("somedata")) {
  133. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, []byte("somedata"))
  134. }
  135. }
  136. // TestNodeReadIndex ensures that node.ReadIndex sends the MsgReadIndex message to the underlying raft.
  137. // It also ensures that ReadState can be read out through ready chan.
  138. func TestNodeReadIndex(t *testing.T) {
  139. msgs := []raftpb.Message{}
  140. appendStep := func(r *raft, m raftpb.Message) error {
  141. msgs = append(msgs, m)
  142. return nil
  143. }
  144. wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
  145. n := newNode()
  146. s := NewMemoryStorage()
  147. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  148. r.readStates = wrs
  149. go n.run(r)
  150. n.Campaign(context.TODO())
  151. for {
  152. rd := <-n.Ready()
  153. if !reflect.DeepEqual(rd.ReadStates, wrs) {
  154. t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
  155. }
  156. s.Append(rd.Entries)
  157. if rd.SoftState.Lead == r.id {
  158. n.Advance()
  159. break
  160. }
  161. n.Advance()
  162. }
  163. r.step = appendStep
  164. wrequestCtx := []byte("somedata2")
  165. n.ReadIndex(context.TODO(), wrequestCtx)
  166. n.Stop()
  167. if len(msgs) != 1 {
  168. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  169. }
  170. if msgs[0].Type != raftpb.MsgReadIndex {
  171. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
  172. }
  173. if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
  174. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
  175. }
  176. }
  177. // TestDisableProposalForwarding ensures that proposals are not forwarded to
  178. // the leader when DisableProposalForwarding is true.
  179. func TestDisableProposalForwarding(t *testing.T) {
  180. r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  181. r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  182. cfg3 := newTestConfig(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  183. cfg3.DisableProposalForwarding = true
  184. r3 := newRaft(cfg3)
  185. nt := newNetwork(r1, r2, r3)
  186. // elect r1 as leader
  187. nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
  188. var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
  189. // send proposal to r2(follower) where DisableProposalForwarding is false
  190. r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntries})
  191. // verify r2(follower) does forward the proposal when DisableProposalForwarding is false
  192. if len(r2.msgs) != 1 {
  193. t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
  194. }
  195. // send proposal to r3(follower) where DisableProposalForwarding is true
  196. r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries})
  197. // verify r3(follower) does not forward the proposal when DisableProposalForwarding is true
  198. if len(r3.msgs) != 0 {
  199. t.Fatalf("len(r3.msgs) expected 0, got %d", len(r3.msgs))
  200. }
  201. }
  202. // TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader
  203. // gets forwarded to the new leader and 'send' method does not attach its term.
  204. func TestNodeReadIndexToOldLeader(t *testing.T) {
  205. r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  206. r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  207. r3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  208. nt := newNetwork(r1, r2, r3)
  209. // elect r1 as leader
  210. nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
  211. var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
  212. // send readindex request to r2(follower)
  213. r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgReadIndex, Entries: testEntries})
  214. // verify r2(follower) forwards this message to r1(leader) with term not set
  215. if len(r2.msgs) != 1 {
  216. t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
  217. }
  218. readIndxMsg1 := raftpb.Message{From: 2, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
  219. if !reflect.DeepEqual(r2.msgs[0], readIndxMsg1) {
  220. t.Fatalf("r2.msgs[0] expected %+v, got %+v", readIndxMsg1, r2.msgs[0])
  221. }
  222. // send readindex request to r3(follower)
  223. r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries})
  224. // verify r3(follower) forwards this message to r1(leader) with term not set as well.
  225. if len(r3.msgs) != 1 {
  226. t.Fatalf("len(r3.msgs) expected 1, got %d", len(r3.msgs))
  227. }
  228. readIndxMsg2 := raftpb.Message{From: 3, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
  229. if !reflect.DeepEqual(r3.msgs[0], readIndxMsg2) {
  230. t.Fatalf("r3.msgs[0] expected %+v, got %+v", readIndxMsg2, r3.msgs[0])
  231. }
  232. // now elect r3 as leader
  233. nt.send(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgHup})
  234. // let r1 steps the two messages previously we got from r2, r3
  235. r1.Step(readIndxMsg1)
  236. r1.Step(readIndxMsg2)
  237. // verify r1(follower) forwards these messages again to r3(new leader)
  238. if len(r1.msgs) != 2 {
  239. t.Fatalf("len(r1.msgs) expected 1, got %d", len(r1.msgs))
  240. }
  241. readIndxMsg3 := raftpb.Message{From: 1, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
  242. if !reflect.DeepEqual(r1.msgs[0], readIndxMsg3) {
  243. t.Fatalf("r1.msgs[0] expected %+v, got %+v", readIndxMsg3, r1.msgs[0])
  244. }
  245. if !reflect.DeepEqual(r1.msgs[1], readIndxMsg3) {
  246. t.Fatalf("r1.msgs[1] expected %+v, got %+v", readIndxMsg3, r1.msgs[1])
  247. }
  248. }
  249. // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal
  250. // to the underlying raft.
  251. func TestNodeProposeConfig(t *testing.T) {
  252. msgs := []raftpb.Message{}
  253. appendStep := func(r *raft, m raftpb.Message) error {
  254. msgs = append(msgs, m)
  255. return nil
  256. }
  257. n := newNode()
  258. s := NewMemoryStorage()
  259. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  260. go n.run(r)
  261. n.Campaign(context.TODO())
  262. for {
  263. rd := <-n.Ready()
  264. s.Append(rd.Entries)
  265. // change the step function to appendStep until this raft becomes leader
  266. if rd.SoftState.Lead == r.id {
  267. r.step = appendStep
  268. n.Advance()
  269. break
  270. }
  271. n.Advance()
  272. }
  273. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  274. ccdata, err := cc.Marshal()
  275. if err != nil {
  276. t.Fatal(err)
  277. }
  278. n.ProposeConfChange(context.TODO(), cc)
  279. n.Stop()
  280. if len(msgs) != 1 {
  281. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  282. }
  283. if msgs[0].Type != raftpb.MsgProp {
  284. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
  285. }
  286. if !bytes.Equal(msgs[0].Entries[0].Data, ccdata) {
  287. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, ccdata)
  288. }
  289. }
  290. // TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
  291. // not affect the later propose to add new node.
  292. func TestNodeProposeAddDuplicateNode(t *testing.T) {
  293. n := newNode()
  294. s := NewMemoryStorage()
  295. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  296. go n.run(r)
  297. n.Campaign(context.TODO())
  298. rdyEntries := make([]raftpb.Entry, 0)
  299. ticker := time.NewTicker(time.Millisecond * 100)
  300. defer ticker.Stop()
  301. done := make(chan struct{})
  302. stop := make(chan struct{})
  303. applyConfChan := make(chan struct{})
  304. go func() {
  305. defer close(done)
  306. for {
  307. select {
  308. case <-stop:
  309. return
  310. case <-ticker.C:
  311. n.Tick()
  312. case rd := <-n.Ready():
  313. s.Append(rd.Entries)
  314. applied := false
  315. for _, e := range rd.Entries {
  316. rdyEntries = append(rdyEntries, e)
  317. switch e.Type {
  318. case raftpb.EntryNormal:
  319. case raftpb.EntryConfChange:
  320. var cc raftpb.ConfChange
  321. cc.Unmarshal(e.Data)
  322. n.ApplyConfChange(cc)
  323. applied = true
  324. }
  325. }
  326. n.Advance()
  327. if applied {
  328. applyConfChan <- struct{}{}
  329. }
  330. }
  331. }
  332. }()
  333. cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  334. ccdata1, _ := cc1.Marshal()
  335. n.ProposeConfChange(context.TODO(), cc1)
  336. <-applyConfChan
  337. // try add the same node again
  338. n.ProposeConfChange(context.TODO(), cc1)
  339. <-applyConfChan
  340. // the new node join should be ok
  341. cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
  342. ccdata2, _ := cc2.Marshal()
  343. n.ProposeConfChange(context.TODO(), cc2)
  344. <-applyConfChan
  345. close(stop)
  346. <-done
  347. if len(rdyEntries) != 4 {
  348. t.Errorf("len(entry) = %d, want %d, %v\n", len(rdyEntries), 4, rdyEntries)
  349. }
  350. if !bytes.Equal(rdyEntries[1].Data, ccdata1) {
  351. t.Errorf("data = %v, want %v", rdyEntries[1].Data, ccdata1)
  352. }
  353. if !bytes.Equal(rdyEntries[3].Data, ccdata2) {
  354. t.Errorf("data = %v, want %v", rdyEntries[3].Data, ccdata2)
  355. }
  356. n.Stop()
  357. }
  358. // TestBlockProposal ensures that node will block proposal when it does not
  359. // know who is the current leader; node will accept proposal when it knows
  360. // who is the current leader.
  361. func TestBlockProposal(t *testing.T) {
  362. n := newNode()
  363. r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  364. go n.run(r)
  365. defer n.Stop()
  366. errc := make(chan error, 1)
  367. go func() {
  368. errc <- n.Propose(context.TODO(), []byte("somedata"))
  369. }()
  370. testutil.WaitSchedule()
  371. select {
  372. case err := <-errc:
  373. t.Errorf("err = %v, want blocking", err)
  374. default:
  375. }
  376. n.Campaign(context.TODO())
  377. select {
  378. case err := <-errc:
  379. if err != nil {
  380. t.Errorf("err = %v, want %v", err, nil)
  381. }
  382. case <-time.After(10 * time.Second):
  383. t.Errorf("blocking proposal, want unblocking")
  384. }
  385. }
  386. func TestNodeProposeWaitDropped(t *testing.T) {
  387. msgs := []raftpb.Message{}
  388. droppingMsg := []byte("test_dropping")
  389. dropStep := func(r *raft, m raftpb.Message) error {
  390. if m.Type == raftpb.MsgProp && strings.Contains(m.String(), string(droppingMsg)) {
  391. t.Logf("dropping message: %v", m.String())
  392. return ErrProposalDropped
  393. }
  394. msgs = append(msgs, m)
  395. return nil
  396. }
  397. n := newNode()
  398. s := NewMemoryStorage()
  399. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  400. go n.run(r)
  401. n.Campaign(context.TODO())
  402. for {
  403. rd := <-n.Ready()
  404. s.Append(rd.Entries)
  405. // change the step function to dropStep until this raft becomes leader
  406. if rd.SoftState.Lead == r.id {
  407. r.step = dropStep
  408. n.Advance()
  409. break
  410. }
  411. n.Advance()
  412. }
  413. proposalTimeout := time.Millisecond * 100
  414. ctx, cancel := context.WithTimeout(context.Background(), proposalTimeout)
  415. // propose with cancel should be cancelled earyly if dropped
  416. err := n.Propose(ctx, droppingMsg)
  417. if err != ErrProposalDropped {
  418. t.Errorf("should drop proposal : %v", err)
  419. }
  420. cancel()
  421. n.Stop()
  422. if len(msgs) != 0 {
  423. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  424. }
  425. }
  426. // TestNodeTick ensures that node.Tick() will increase the
  427. // elapsed of the underlying raft state machine.
  428. func TestNodeTick(t *testing.T) {
  429. n := newNode()
  430. s := NewMemoryStorage()
  431. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  432. go n.run(r)
  433. elapsed := r.electionElapsed
  434. n.Tick()
  435. for len(n.tickc) != 0 {
  436. time.Sleep(100 * time.Millisecond)
  437. }
  438. n.Stop()
  439. if r.electionElapsed != elapsed+1 {
  440. t.Errorf("elapsed = %d, want %d", r.electionElapsed, elapsed+1)
  441. }
  442. }
  443. // TestNodeStop ensures that node.Stop() blocks until the node has stopped
  444. // processing, and that it is idempotent
  445. func TestNodeStop(t *testing.T) {
  446. n := newNode()
  447. s := NewMemoryStorage()
  448. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  449. donec := make(chan struct{})
  450. go func() {
  451. n.run(r)
  452. close(donec)
  453. }()
  454. status := n.Status()
  455. n.Stop()
  456. select {
  457. case <-donec:
  458. case <-time.After(time.Second):
  459. t.Fatalf("timed out waiting for node to stop!")
  460. }
  461. emptyStatus := Status{}
  462. if reflect.DeepEqual(status, emptyStatus) {
  463. t.Errorf("status = %v, want not empty", status)
  464. }
  465. // Further status should return be empty, the node is stopped.
  466. status = n.Status()
  467. if !reflect.DeepEqual(status, emptyStatus) {
  468. t.Errorf("status = %v, want empty", status)
  469. }
  470. // Subsequent Stops should have no effect.
  471. n.Stop()
  472. }
  473. func TestReadyContainUpdates(t *testing.T) {
  474. tests := []struct {
  475. rd Ready
  476. wcontain bool
  477. }{
  478. {Ready{}, false},
  479. {Ready{SoftState: &SoftState{Lead: 1}}, true},
  480. {Ready{HardState: raftpb.HardState{Vote: 1}}, true},
  481. {Ready{Entries: make([]raftpb.Entry, 1)}, true},
  482. {Ready{CommittedEntries: make([]raftpb.Entry, 1)}, true},
  483. {Ready{Messages: make([]raftpb.Message, 1)}, true},
  484. {Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}, true},
  485. }
  486. for i, tt := range tests {
  487. if g := tt.rd.containsUpdates(); g != tt.wcontain {
  488. t.Errorf("#%d: containUpdates = %v, want %v", i, g, tt.wcontain)
  489. }
  490. }
  491. }
  492. // TestNodeStart ensures that a node can be started correctly. The node should
  493. // start with correct configuration change entries, and can accept and commit
  494. // proposals.
  495. func TestNodeStart(t *testing.T) {
  496. ctx, cancel := context.WithCancel(context.Background())
  497. defer cancel()
  498. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  499. ccdata, err := cc.Marshal()
  500. if err != nil {
  501. t.Fatalf("unexpected marshal error: %v", err)
  502. }
  503. wants := []Ready{
  504. {
  505. HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
  506. Entries: []raftpb.Entry{
  507. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  508. },
  509. CommittedEntries: []raftpb.Entry{
  510. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  511. },
  512. MustSync: true,
  513. },
  514. {
  515. HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
  516. Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  517. CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  518. MustSync: true,
  519. },
  520. }
  521. storage := NewMemoryStorage()
  522. c := &Config{
  523. ID: 1,
  524. ElectionTick: 10,
  525. HeartbeatTick: 1,
  526. Storage: storage,
  527. MaxSizePerMsg: noLimit,
  528. MaxInflightMsgs: 256,
  529. }
  530. n := StartNode(c, []Peer{{ID: 1}})
  531. defer n.Stop()
  532. g := <-n.Ready()
  533. if !reflect.DeepEqual(g, wants[0]) {
  534. t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
  535. } else {
  536. storage.Append(g.Entries)
  537. n.Advance()
  538. }
  539. n.Campaign(ctx)
  540. rd := <-n.Ready()
  541. storage.Append(rd.Entries)
  542. n.Advance()
  543. n.Propose(ctx, []byte("foo"))
  544. if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) {
  545. t.Errorf("#%d: g = %+v,\n w %+v", 2, g2, wants[1])
  546. } else {
  547. storage.Append(g2.Entries)
  548. n.Advance()
  549. }
  550. select {
  551. case rd := <-n.Ready():
  552. t.Errorf("unexpected Ready: %+v", rd)
  553. case <-time.After(time.Millisecond):
  554. }
  555. }
  556. func TestNodeRestart(t *testing.T) {
  557. entries := []raftpb.Entry{
  558. {Term: 1, Index: 1},
  559. {Term: 1, Index: 2, Data: []byte("foo")},
  560. }
  561. st := raftpb.HardState{Term: 1, Commit: 1}
  562. want := Ready{
  563. HardState: st,
  564. // commit up to index commit index in st
  565. CommittedEntries: entries[:st.Commit],
  566. MustSync: true,
  567. }
  568. storage := NewMemoryStorage()
  569. storage.SetHardState(st)
  570. storage.Append(entries)
  571. c := &Config{
  572. ID: 1,
  573. ElectionTick: 10,
  574. HeartbeatTick: 1,
  575. Storage: storage,
  576. MaxSizePerMsg: noLimit,
  577. MaxInflightMsgs: 256,
  578. }
  579. n := RestartNode(c)
  580. defer n.Stop()
  581. if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
  582. t.Errorf("g = %+v,\n w %+v", g, want)
  583. }
  584. n.Advance()
  585. select {
  586. case rd := <-n.Ready():
  587. t.Errorf("unexpected Ready: %+v", rd)
  588. case <-time.After(time.Millisecond):
  589. }
  590. }
  591. func TestNodeRestartFromSnapshot(t *testing.T) {
  592. snap := raftpb.Snapshot{
  593. Metadata: raftpb.SnapshotMetadata{
  594. ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
  595. Index: 2,
  596. Term: 1,
  597. },
  598. }
  599. entries := []raftpb.Entry{
  600. {Term: 1, Index: 3, Data: []byte("foo")},
  601. }
  602. st := raftpb.HardState{Term: 1, Commit: 3}
  603. want := Ready{
  604. HardState: st,
  605. // commit up to index commit index in st
  606. CommittedEntries: entries,
  607. MustSync: true,
  608. }
  609. s := NewMemoryStorage()
  610. s.SetHardState(st)
  611. s.ApplySnapshot(snap)
  612. s.Append(entries)
  613. c := &Config{
  614. ID: 1,
  615. ElectionTick: 10,
  616. HeartbeatTick: 1,
  617. Storage: s,
  618. MaxSizePerMsg: noLimit,
  619. MaxInflightMsgs: 256,
  620. }
  621. n := RestartNode(c)
  622. defer n.Stop()
  623. if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
  624. t.Errorf("g = %+v,\n w %+v", g, want)
  625. } else {
  626. n.Advance()
  627. }
  628. select {
  629. case rd := <-n.Ready():
  630. t.Errorf("unexpected Ready: %+v", rd)
  631. case <-time.After(time.Millisecond):
  632. }
  633. }
  634. func TestNodeAdvance(t *testing.T) {
  635. ctx, cancel := context.WithCancel(context.Background())
  636. defer cancel()
  637. storage := NewMemoryStorage()
  638. c := &Config{
  639. ID: 1,
  640. ElectionTick: 10,
  641. HeartbeatTick: 1,
  642. Storage: storage,
  643. MaxSizePerMsg: noLimit,
  644. MaxInflightMsgs: 256,
  645. }
  646. n := StartNode(c, []Peer{{ID: 1}})
  647. defer n.Stop()
  648. rd := <-n.Ready()
  649. storage.Append(rd.Entries)
  650. n.Advance()
  651. n.Campaign(ctx)
  652. <-n.Ready()
  653. n.Propose(ctx, []byte("foo"))
  654. select {
  655. case rd = <-n.Ready():
  656. t.Fatalf("unexpected Ready before Advance: %+v", rd)
  657. case <-time.After(time.Millisecond):
  658. }
  659. storage.Append(rd.Entries)
  660. n.Advance()
  661. select {
  662. case <-n.Ready():
  663. case <-time.After(100 * time.Millisecond):
  664. t.Errorf("expect Ready after Advance, but there is no Ready available")
  665. }
  666. }
  667. func TestSoftStateEqual(t *testing.T) {
  668. tests := []struct {
  669. st *SoftState
  670. we bool
  671. }{
  672. {&SoftState{}, true},
  673. {&SoftState{Lead: 1}, false},
  674. {&SoftState{RaftState: StateLeader}, false},
  675. }
  676. for i, tt := range tests {
  677. if g := tt.st.equal(&SoftState{}); g != tt.we {
  678. t.Errorf("#%d, equal = %v, want %v", i, g, tt.we)
  679. }
  680. }
  681. }
  682. func TestIsHardStateEqual(t *testing.T) {
  683. tests := []struct {
  684. st raftpb.HardState
  685. we bool
  686. }{
  687. {emptyState, true},
  688. {raftpb.HardState{Vote: 1}, false},
  689. {raftpb.HardState{Commit: 1}, false},
  690. {raftpb.HardState{Term: 1}, false},
  691. }
  692. for i, tt := range tests {
  693. if isHardStateEqual(tt.st, emptyState) != tt.we {
  694. t.Errorf("#%d, equal = %v, want %v", i, isHardStateEqual(tt.st, emptyState), tt.we)
  695. }
  696. }
  697. }
  698. func TestNodeProposeAddLearnerNode(t *testing.T) {
  699. ticker := time.NewTicker(time.Millisecond * 100)
  700. defer ticker.Stop()
  701. n := newNode()
  702. s := NewMemoryStorage()
  703. r := newTestRaft(1, []uint64{1}, 10, 1, s)
  704. go n.run(r)
  705. n.Campaign(context.TODO())
  706. stop := make(chan struct{})
  707. done := make(chan struct{})
  708. applyConfChan := make(chan struct{})
  709. go func() {
  710. defer close(done)
  711. for {
  712. select {
  713. case <-stop:
  714. return
  715. case <-ticker.C:
  716. n.Tick()
  717. case rd := <-n.Ready():
  718. s.Append(rd.Entries)
  719. t.Logf("raft: %v", rd.Entries)
  720. for _, ent := range rd.Entries {
  721. if ent.Type != raftpb.EntryConfChange {
  722. continue
  723. }
  724. var cc raftpb.ConfChange
  725. cc.Unmarshal(ent.Data)
  726. state := n.ApplyConfChange(cc)
  727. if len(state.Learners) == 0 ||
  728. state.Learners[0] != cc.NodeID ||
  729. cc.NodeID != 2 {
  730. t.Errorf("apply conf change should return new added learner: %v", state.String())
  731. }
  732. if len(state.Nodes) != 1 {
  733. t.Errorf("add learner should not change the nodes: %v", state.String())
  734. }
  735. t.Logf("apply raft conf %v changed to: %v", cc, state.String())
  736. applyConfChan <- struct{}{}
  737. }
  738. n.Advance()
  739. }
  740. }
  741. }()
  742. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode, NodeID: 2}
  743. n.ProposeConfChange(context.TODO(), cc)
  744. <-applyConfChan
  745. close(stop)
  746. <-done
  747. }
  748. func TestAppendPagination(t *testing.T) {
  749. const maxSizePerMsg = 2048
  750. n := newNetworkWithConfig(func(c *Config) {
  751. c.MaxSizePerMsg = maxSizePerMsg
  752. }, nil, nil, nil)
  753. seenFullMessage := false
  754. // Inspect all messages to see that we never exceed the limit, but
  755. // we do see messages of larger than half the limit.
  756. n.msgHook = func(m raftpb.Message) bool {
  757. if m.Type == raftpb.MsgApp {
  758. size := 0
  759. for _, e := range m.Entries {
  760. size += len(e.Data)
  761. }
  762. if size > maxSizePerMsg {
  763. t.Errorf("sent MsgApp that is too large: %d bytes", size)
  764. }
  765. if size > maxSizePerMsg/2 {
  766. seenFullMessage = true
  767. }
  768. }
  769. return true
  770. }
  771. n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
  772. // Partition the network while we make our proposals. This forces
  773. // the entries to be batched into larger messages.
  774. n.isolate(1)
  775. blob := []byte(strings.Repeat("a", 1000))
  776. for i := 0; i < 5; i++ {
  777. n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
  778. }
  779. n.recover()
  780. // After the partition recovers, tick the clock to wake everything
  781. // back up and send the messages.
  782. n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
  783. if !seenFullMessage {
  784. t.Error("didn't see any messages more than half the max size; something is wrong with this test")
  785. }
  786. }