rawnode_test.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "math"
  20. "reflect"
  21. "testing"
  22. "go.etcd.io/etcd/raft/quorum"
  23. pb "go.etcd.io/etcd/raft/raftpb"
  24. "go.etcd.io/etcd/raft/tracker"
  25. )
  26. // rawNodeAdapter is essentially a lint that makes sure that RawNode implements
  27. // "most of" Node. The exceptions (some of which are easy to fix) are listed
  28. // below.
  29. type rawNodeAdapter struct {
  30. *RawNode
  31. }
  32. var _ Node = (*rawNodeAdapter)(nil)
  33. // Node specifies lead, which is pointless, can just be filled in.
  34. func (a *rawNodeAdapter) TransferLeadership(ctx context.Context, lead, transferee uint64) {
  35. a.RawNode.TransferLeader(transferee)
  36. }
  37. // Node has a goroutine, RawNode doesn't need this.
  38. func (a *rawNodeAdapter) Stop() {}
  39. // RawNode returns a *Status.
  40. func (a *rawNodeAdapter) Status() Status { return a.RawNode.Status() }
  41. // RawNode takes a Ready. It doesn't really have to do that I think? It can hold on
  42. // to it internally. But maybe that approach is frail.
  43. func (a *rawNodeAdapter) Advance() { a.RawNode.Advance(Ready{}) }
  44. // RawNode returns a Ready, not a chan of one.
  45. func (a *rawNodeAdapter) Ready() <-chan Ready { return nil }
  46. // Node takes more contexts. Easy enough to fix.
  47. func (a *rawNodeAdapter) Campaign(context.Context) error { return a.RawNode.Campaign() }
  48. func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error {
  49. a.RawNode.ReadIndex(rctx)
  50. // RawNode swallowed the error in ReadIndex, it probably should not do that.
  51. return nil
  52. }
  53. func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) }
  54. func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) }
  55. func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChangeI) error {
  56. return a.RawNode.ProposeConfChange(cc)
  57. }
  58. // TestRawNodeStep ensures that RawNode.Step ignore local message.
  59. func TestRawNodeStep(t *testing.T) {
  60. for i, msgn := range pb.MessageType_name {
  61. t.Run(msgn, func(t *testing.T) {
  62. s := NewMemoryStorage()
  63. s.SetHardState(pb.HardState{Term: 1, Commit: 1})
  64. s.Append([]pb.Entry{{Term: 1, Index: 1}})
  65. if err := s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
  66. ConfState: pb.ConfState{
  67. Voters: []uint64{1},
  68. },
  69. Index: 1,
  70. Term: 1,
  71. }}); err != nil {
  72. t.Fatal(err)
  73. }
  74. // Append an empty entry to make sure the non-local messages (like
  75. // vote requests) are ignored and don't trigger assertions.
  76. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s))
  77. if err != nil {
  78. t.Fatal(err)
  79. }
  80. msgt := pb.MessageType(i)
  81. err = rawNode.Step(pb.Message{Type: msgt})
  82. // LocalMsg should be ignored.
  83. if IsLocalMsg(msgt) {
  84. if err != ErrStepLocalMsg {
  85. t.Errorf("%d: step should ignore %s", msgt, msgn)
  86. }
  87. }
  88. })
  89. }
  90. }
  91. // TestNodeStepUnblock from node_test.go has no equivalent in rawNode because there is
  92. // no goroutine in RawNode.
  93. // TestRawNodeProposeAndConfChange tests the configuration change mechanism. Each
  94. // test case sends a configuration change which is either simple or joint, verifies
  95. // that it applies and that the resulting ConfState matches expectations, and for
  96. // joint configurations makes sure that they are exited successfully.
  97. func TestRawNodeProposeAndConfChange(t *testing.T) {
  98. testCases := []struct {
  99. cc pb.ConfChangeI
  100. exp pb.ConfState
  101. exp2 *pb.ConfState
  102. }{
  103. // V1 config change.
  104. {
  105. pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2},
  106. pb.ConfState{Voters: []uint64{1, 2}},
  107. nil,
  108. },
  109. // Proposing the same as a V2 change works just the same, without entering
  110. // a joint config.
  111. {
  112. pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
  113. {Type: pb.ConfChangeAddNode, NodeID: 2},
  114. },
  115. },
  116. pb.ConfState{Voters: []uint64{1, 2}},
  117. nil,
  118. },
  119. // Ditto if we add it as a learner instead.
  120. {
  121. pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
  122. {Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
  123. },
  124. },
  125. pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
  126. nil,
  127. },
  128. // We can ask explicitly for joint consensus if we want it.
  129. {
  130. pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
  131. {Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
  132. },
  133. Transition: pb.ConfChangeTransitionJointExplicit,
  134. },
  135. pb.ConfState{Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2}},
  136. &pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
  137. },
  138. // Ditto, but with implicit transition (the harness checks this).
  139. {
  140. pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
  141. {Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
  142. },
  143. Transition: pb.ConfChangeTransitionJointImplicit,
  144. },
  145. pb.ConfState{
  146. Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2},
  147. AutoLeave: true,
  148. },
  149. &pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
  150. },
  151. // Add a new node and demote n1. This exercises the interesting case in
  152. // which we really need joint config changes and also need LearnersNext.
  153. {
  154. pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
  155. {NodeID: 2, Type: pb.ConfChangeAddNode},
  156. {NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
  157. {NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
  158. },
  159. },
  160. pb.ConfState{
  161. Voters: []uint64{2},
  162. VotersOutgoing: []uint64{1},
  163. Learners: []uint64{3},
  164. LearnersNext: []uint64{1},
  165. AutoLeave: true,
  166. },
  167. &pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}},
  168. },
  169. // Ditto explicit.
  170. {
  171. pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
  172. {NodeID: 2, Type: pb.ConfChangeAddNode},
  173. {NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
  174. {NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
  175. },
  176. Transition: pb.ConfChangeTransitionJointExplicit,
  177. },
  178. pb.ConfState{
  179. Voters: []uint64{2},
  180. VotersOutgoing: []uint64{1},
  181. Learners: []uint64{3},
  182. LearnersNext: []uint64{1},
  183. },
  184. &pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}},
  185. },
  186. // Ditto implicit.
  187. {
  188. pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
  189. {NodeID: 2, Type: pb.ConfChangeAddNode},
  190. {NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
  191. {NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
  192. },
  193. Transition: pb.ConfChangeTransitionJointImplicit,
  194. },
  195. pb.ConfState{
  196. Voters: []uint64{2},
  197. VotersOutgoing: []uint64{1},
  198. Learners: []uint64{3},
  199. LearnersNext: []uint64{1},
  200. AutoLeave: true,
  201. },
  202. &pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}},
  203. },
  204. }
  205. for _, tc := range testCases {
  206. t.Run("", func(t *testing.T) {
  207. s := NewMemoryStorage()
  208. rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
  209. if err != nil {
  210. t.Fatal(err)
  211. }
  212. rawNode.Campaign()
  213. proposed := false
  214. var (
  215. lastIndex uint64
  216. ccdata []byte
  217. )
  218. // Propose the ConfChange, wait until it applies, save the resulting
  219. // ConfState.
  220. var cs *pb.ConfState
  221. for cs == nil {
  222. rd := rawNode.Ready()
  223. s.Append(rd.Entries)
  224. for _, ent := range rd.CommittedEntries {
  225. var cc pb.ConfChangeI
  226. if ent.Type == pb.EntryConfChange {
  227. var ccc pb.ConfChange
  228. if err = ccc.Unmarshal(ent.Data); err != nil {
  229. t.Fatal(err)
  230. }
  231. cc = ccc
  232. } else if ent.Type == pb.EntryConfChangeV2 {
  233. var ccc pb.ConfChangeV2
  234. if err = ccc.Unmarshal(ent.Data); err != nil {
  235. t.Fatal(err)
  236. }
  237. cc = ccc
  238. }
  239. if cc != nil {
  240. cs = rawNode.ApplyConfChange(cc)
  241. }
  242. }
  243. rawNode.Advance(rd)
  244. // Once we are the leader, propose a command and a ConfChange.
  245. if !proposed && rd.SoftState.Lead == rawNode.raft.id {
  246. if err = rawNode.Propose([]byte("somedata")); err != nil {
  247. t.Fatal(err)
  248. }
  249. if ccv1, ok := tc.cc.AsV1(); ok {
  250. ccdata, err = ccv1.Marshal()
  251. if err != nil {
  252. t.Fatal(err)
  253. }
  254. rawNode.ProposeConfChange(ccv1)
  255. } else {
  256. ccv2 := tc.cc.AsV2()
  257. ccdata, err = ccv2.Marshal()
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. rawNode.ProposeConfChange(ccv2)
  262. }
  263. proposed = true
  264. }
  265. }
  266. // Check that the last index is exactly the conf change we put in,
  267. // down to the bits.
  268. lastIndex, err = s.LastIndex()
  269. if err != nil {
  270. t.Fatal(err)
  271. }
  272. entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
  273. if err != nil {
  274. t.Fatal(err)
  275. }
  276. if len(entries) != 2 {
  277. t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
  278. }
  279. if !bytes.Equal(entries[0].Data, []byte("somedata")) {
  280. t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
  281. }
  282. typ := pb.EntryConfChange
  283. if _, ok := tc.cc.AsV1(); !ok {
  284. typ = pb.EntryConfChangeV2
  285. }
  286. if entries[1].Type != typ {
  287. t.Fatalf("type = %v, want %v", entries[1].Type, typ)
  288. }
  289. if !bytes.Equal(entries[1].Data, ccdata) {
  290. t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
  291. }
  292. if exp := &tc.exp; !reflect.DeepEqual(exp, cs) {
  293. t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
  294. }
  295. if exp, act := lastIndex, rawNode.raft.pendingConfIndex; exp != act {
  296. t.Fatalf("pendingConfIndex: expected %d, got %d", exp, act)
  297. }
  298. // Move the RawNode along. If the ConfChange was simple, nothing else
  299. // should happen. Otherwise, we're in a joint state, which is either
  300. // left automatically or not. If not, we add the proposal that leaves
  301. // it manually.
  302. rd := rawNode.Ready()
  303. var context []byte
  304. if !tc.exp.AutoLeave {
  305. if len(rd.Entries) > 0 {
  306. t.Fatal("expected no more entries")
  307. }
  308. if tc.exp2 == nil {
  309. return
  310. }
  311. context = []byte("manual")
  312. t.Log("leaving joint state manually")
  313. if err := rawNode.ProposeConfChange(pb.ConfChangeV2{Context: context}); err != nil {
  314. t.Fatal(err)
  315. }
  316. rd = rawNode.Ready()
  317. }
  318. // Check that the right ConfChange comes out.
  319. if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 {
  320. t.Fatalf("expected exactly one more entry, got %+v", rd)
  321. }
  322. var cc pb.ConfChangeV2
  323. if err := cc.Unmarshal(rd.Entries[0].Data); err != nil {
  324. t.Fatal(err)
  325. }
  326. if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: context}) {
  327. t.Fatalf("expected zero ConfChangeV2, got %+v", cc)
  328. }
  329. // Lie and pretend the ConfChange applied. It won't do so because now
  330. // we require the joint quorum and we're only running one node.
  331. cs = rawNode.ApplyConfChange(cc)
  332. if exp := tc.exp2; !reflect.DeepEqual(exp, cs) {
  333. t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
  334. }
  335. })
  336. }
  337. }
  338. // TestRawNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
  339. // not affect the later propose to add new node.
  340. func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
  341. s := NewMemoryStorage()
  342. rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
  343. if err != nil {
  344. t.Fatal(err)
  345. }
  346. rd := rawNode.Ready()
  347. s.Append(rd.Entries)
  348. rawNode.Advance(rd)
  349. rawNode.Campaign()
  350. for {
  351. rd = rawNode.Ready()
  352. s.Append(rd.Entries)
  353. if rd.SoftState.Lead == rawNode.raft.id {
  354. rawNode.Advance(rd)
  355. break
  356. }
  357. rawNode.Advance(rd)
  358. }
  359. proposeConfChangeAndApply := func(cc pb.ConfChange) {
  360. rawNode.ProposeConfChange(cc)
  361. rd = rawNode.Ready()
  362. s.Append(rd.Entries)
  363. for _, entry := range rd.CommittedEntries {
  364. if entry.Type == pb.EntryConfChange {
  365. var cc pb.ConfChange
  366. cc.Unmarshal(entry.Data)
  367. rawNode.ApplyConfChange(cc)
  368. }
  369. }
  370. rawNode.Advance(rd)
  371. }
  372. cc1 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1}
  373. ccdata1, err := cc1.Marshal()
  374. if err != nil {
  375. t.Fatal(err)
  376. }
  377. proposeConfChangeAndApply(cc1)
  378. // try to add the same node again
  379. proposeConfChangeAndApply(cc1)
  380. // the new node join should be ok
  381. cc2 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2}
  382. ccdata2, err := cc2.Marshal()
  383. if err != nil {
  384. t.Fatal(err)
  385. }
  386. proposeConfChangeAndApply(cc2)
  387. lastIndex, err := s.LastIndex()
  388. if err != nil {
  389. t.Fatal(err)
  390. }
  391. // the last three entries should be: ConfChange cc1, cc1, cc2
  392. entries, err := s.Entries(lastIndex-2, lastIndex+1, noLimit)
  393. if err != nil {
  394. t.Fatal(err)
  395. }
  396. if len(entries) != 3 {
  397. t.Fatalf("len(entries) = %d, want %d", len(entries), 3)
  398. }
  399. if !bytes.Equal(entries[0].Data, ccdata1) {
  400. t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, ccdata1)
  401. }
  402. if !bytes.Equal(entries[2].Data, ccdata2) {
  403. t.Errorf("entries[2].Data = %v, want %v", entries[2].Data, ccdata2)
  404. }
  405. }
  406. // TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message
  407. // to the underlying raft. It also ensures that ReadState can be read out.
  408. func TestRawNodeReadIndex(t *testing.T) {
  409. msgs := []pb.Message{}
  410. appendStep := func(r *raft, m pb.Message) error {
  411. msgs = append(msgs, m)
  412. return nil
  413. }
  414. wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
  415. s := NewMemoryStorage()
  416. c := newTestConfig(1, []uint64{1}, 10, 1, s)
  417. rawNode, err := NewRawNode(c)
  418. if err != nil {
  419. t.Fatal(err)
  420. }
  421. rawNode.raft.readStates = wrs
  422. // ensure the ReadStates can be read out
  423. hasReady := rawNode.HasReady()
  424. if !hasReady {
  425. t.Errorf("HasReady() returns %t, want %t", hasReady, true)
  426. }
  427. rd := rawNode.Ready()
  428. if !reflect.DeepEqual(rd.ReadStates, wrs) {
  429. t.Errorf("ReadStates = %d, want %d", rd.ReadStates, wrs)
  430. }
  431. s.Append(rd.Entries)
  432. rawNode.Advance(rd)
  433. // ensure raft.readStates is reset after advance
  434. if rawNode.raft.readStates != nil {
  435. t.Errorf("readStates = %v, want %v", rawNode.raft.readStates, nil)
  436. }
  437. wrequestCtx := []byte("somedata2")
  438. rawNode.Campaign()
  439. for {
  440. rd = rawNode.Ready()
  441. s.Append(rd.Entries)
  442. if rd.SoftState.Lead == rawNode.raft.id {
  443. rawNode.Advance(rd)
  444. // Once we are the leader, issue a ReadIndex request
  445. rawNode.raft.step = appendStep
  446. rawNode.ReadIndex(wrequestCtx)
  447. break
  448. }
  449. rawNode.Advance(rd)
  450. }
  451. // ensure that MsgReadIndex message is sent to the underlying raft
  452. if len(msgs) != 1 {
  453. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  454. }
  455. if msgs[0].Type != pb.MsgReadIndex {
  456. t.Errorf("msg type = %d, want %d", msgs[0].Type, pb.MsgReadIndex)
  457. }
  458. if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
  459. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
  460. }
  461. }
  462. // TestBlockProposal from node_test.go has no equivalent in rawNode because there is
  463. // no leader check in RawNode.
  464. // TestNodeTick from node_test.go has no equivalent in rawNode because
  465. // it reaches into the raft object which is not exposed.
  466. // TestNodeStop from node_test.go has no equivalent in rawNode because there is
  467. // no goroutine in RawNode.
  468. // TestRawNodeStart ensures that a node can be started correctly. Note that RawNode
  469. // requires the application to bootstrap the state, i.e. it does not accept peers
  470. // and will not create faux configuration change entries.
  471. func TestRawNodeStart(t *testing.T) {
  472. want := Ready{
  473. SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
  474. HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1},
  475. Entries: []pb.Entry{
  476. {Term: 1, Index: 2, Data: nil}, // empty entry
  477. {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
  478. },
  479. CommittedEntries: []pb.Entry{
  480. {Term: 1, Index: 2, Data: nil}, // empty entry
  481. {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
  482. },
  483. MustSync: true,
  484. }
  485. storage := NewMemoryStorage()
  486. storage.ents[0].Index = 1
  487. // TODO(tbg): this is a first prototype of what bootstrapping could look
  488. // like (without the annoying faux ConfChanges). We want to persist a
  489. // ConfState at some index and make sure that this index can't be reached
  490. // from log position 1, so that followers are forced to pick up the
  491. // ConfState in order to move away from log position 1 (unless they got
  492. // bootstrapped in the same way already). Failing to do so would mean that
  493. // followers diverge from the bootstrapped nodes and don't learn about the
  494. // initial config.
  495. //
  496. // NB: this is exactly what CockroachDB does. The Raft log really begins at
  497. // index 10, so empty followers (at index 1) always need a snapshot first.
  498. type appenderStorage interface {
  499. Storage
  500. ApplySnapshot(pb.Snapshot) error
  501. }
  502. bootstrap := func(storage appenderStorage, cs pb.ConfState) error {
  503. if len(cs.Voters) == 0 {
  504. return fmt.Errorf("no voters specified")
  505. }
  506. fi, err := storage.FirstIndex()
  507. if err != nil {
  508. return err
  509. }
  510. if fi < 2 {
  511. return fmt.Errorf("FirstIndex >= 2 is prerequisite for bootstrap")
  512. }
  513. if _, err = storage.Entries(fi, fi, math.MaxUint64); err == nil {
  514. // TODO(tbg): match exact error
  515. return fmt.Errorf("should not have been able to load first index")
  516. }
  517. li, err := storage.LastIndex()
  518. if err != nil {
  519. return err
  520. }
  521. if _, err = storage.Entries(li, li, math.MaxUint64); err == nil {
  522. return fmt.Errorf("should not have been able to load last index")
  523. }
  524. hs, ics, err := storage.InitialState()
  525. if err != nil {
  526. return err
  527. }
  528. if !IsEmptyHardState(hs) {
  529. return fmt.Errorf("HardState not empty")
  530. }
  531. if len(ics.Voters) != 0 {
  532. return fmt.Errorf("ConfState not empty")
  533. }
  534. meta := pb.SnapshotMetadata{
  535. Index: 1,
  536. Term: 0,
  537. ConfState: cs,
  538. }
  539. snap := pb.Snapshot{Metadata: meta}
  540. return storage.ApplySnapshot(snap)
  541. }
  542. if err := bootstrap(storage, pb.ConfState{Voters: []uint64{1}}); err != nil {
  543. t.Fatal(err)
  544. }
  545. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage))
  546. if err != nil {
  547. t.Fatal(err)
  548. }
  549. if rawNode.HasReady() {
  550. t.Fatalf("unexpected ready: %+v", rawNode.Ready())
  551. }
  552. rawNode.Campaign()
  553. rawNode.Propose([]byte("foo"))
  554. if !rawNode.HasReady() {
  555. t.Fatal("expected a Ready")
  556. }
  557. rd := rawNode.Ready()
  558. storage.Append(rd.Entries)
  559. rawNode.Advance(rd)
  560. rd.SoftState, want.SoftState = nil, nil
  561. if !reflect.DeepEqual(rd, want) {
  562. t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, want)
  563. }
  564. if rawNode.HasReady() {
  565. t.Errorf("unexpected Ready: %+v", rawNode.Ready())
  566. }
  567. }
  568. func TestRawNodeRestart(t *testing.T) {
  569. entries := []pb.Entry{
  570. {Term: 1, Index: 1},
  571. {Term: 1, Index: 2, Data: []byte("foo")},
  572. }
  573. st := pb.HardState{Term: 1, Commit: 1}
  574. want := Ready{
  575. HardState: emptyState,
  576. // commit up to commit index in st
  577. CommittedEntries: entries[:st.Commit],
  578. MustSync: false,
  579. }
  580. storage := NewMemoryStorage()
  581. storage.SetHardState(st)
  582. storage.Append(entries)
  583. rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, storage))
  584. if err != nil {
  585. t.Fatal(err)
  586. }
  587. rd := rawNode.Ready()
  588. if !reflect.DeepEqual(rd, want) {
  589. t.Errorf("g = %+v,\n w %+v", rd, want)
  590. }
  591. rawNode.Advance(rd)
  592. if rawNode.HasReady() {
  593. t.Errorf("unexpected Ready: %+v", rawNode.Ready())
  594. }
  595. }
  596. func TestRawNodeRestartFromSnapshot(t *testing.T) {
  597. snap := pb.Snapshot{
  598. Metadata: pb.SnapshotMetadata{
  599. ConfState: pb.ConfState{Voters: []uint64{1, 2}},
  600. Index: 2,
  601. Term: 1,
  602. },
  603. }
  604. entries := []pb.Entry{
  605. {Term: 1, Index: 3, Data: []byte("foo")},
  606. }
  607. st := pb.HardState{Term: 1, Commit: 3}
  608. want := Ready{
  609. HardState: emptyState,
  610. // commit up to commit index in st
  611. CommittedEntries: entries,
  612. MustSync: false,
  613. }
  614. s := NewMemoryStorage()
  615. s.SetHardState(st)
  616. s.ApplySnapshot(snap)
  617. s.Append(entries)
  618. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s))
  619. if err != nil {
  620. t.Fatal(err)
  621. }
  622. if rd := rawNode.Ready(); !reflect.DeepEqual(rd, want) {
  623. t.Errorf("g = %+v,\n w %+v", rd, want)
  624. } else {
  625. rawNode.Advance(rd)
  626. }
  627. if rawNode.HasReady() {
  628. t.Errorf("unexpected Ready: %+v", rawNode.HasReady())
  629. }
  630. }
  631. // TestNodeAdvance from node_test.go has no equivalent in rawNode because there is
  632. // no dependency check between Ready() and Advance()
  633. func TestRawNodeStatus(t *testing.T) {
  634. s := NewMemoryStorage()
  635. rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
  636. if err != nil {
  637. t.Fatal(err)
  638. }
  639. if status := rn.Status(); status.Progress != nil {
  640. t.Fatalf("expected no Progress because not leader: %+v", status.Progress)
  641. }
  642. if err := rn.Campaign(); err != nil {
  643. t.Fatal(err)
  644. }
  645. status := rn.Status()
  646. if status.Lead != 1 {
  647. t.Fatal("not lead")
  648. }
  649. if status.RaftState != StateLeader {
  650. t.Fatal("not leader")
  651. }
  652. if exp, act := *rn.raft.prs.Progress[1], status.Progress[1]; !reflect.DeepEqual(exp, act) {
  653. t.Fatalf("want: %+v\ngot: %+v", exp, act)
  654. }
  655. expCfg := tracker.Config{Voters: quorum.JointConfig{
  656. quorum.MajorityConfig{1: {}},
  657. nil,
  658. }}
  659. if !reflect.DeepEqual(expCfg, status.Config) {
  660. t.Fatalf("want: %+v\ngot: %+v", expCfg, status.Config)
  661. }
  662. }
  663. // TestRawNodeCommitPaginationAfterRestart is the RawNode version of
  664. // TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the
  665. // Raft group would forget to apply entries:
  666. //
  667. // - node learns that index 11 is committed
  668. // - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
  669. // exceeds maxBytes), which isn't noticed internally by Raft
  670. // - Commit index gets bumped to 10
  671. // - the node persists the HardState, but crashes before applying the entries
  672. // - upon restart, the storage returns the same entries, but `slice` takes a
  673. // different code path and removes the last entry.
  674. // - Raft does not emit a HardState, but when the app calls Advance(), it bumps
  675. // its internal applied index cursor to 10 (when it should be 9)
  676. // - the next Ready asks the app to apply index 11 (omitting index 10), losing a
  677. // write.
  678. func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
  679. s := &ignoreSizeHintMemStorage{
  680. MemoryStorage: NewMemoryStorage(),
  681. }
  682. persistedHardState := pb.HardState{
  683. Term: 1,
  684. Vote: 1,
  685. Commit: 10,
  686. }
  687. s.hardState = persistedHardState
  688. s.ents = make([]pb.Entry, 10)
  689. var size uint64
  690. for i := range s.ents {
  691. ent := pb.Entry{
  692. Term: 1,
  693. Index: uint64(i + 1),
  694. Type: pb.EntryNormal,
  695. Data: []byte("a"),
  696. }
  697. s.ents[i] = ent
  698. size += uint64(ent.Size())
  699. }
  700. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  701. // Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
  702. // not be included in the initial rd.CommittedEntries. However, our storage will ignore
  703. // this and *will* return it (which is how the Commit index ended up being 10 initially).
  704. cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
  705. s.ents = append(s.ents, pb.Entry{
  706. Term: 1,
  707. Index: uint64(11),
  708. Type: pb.EntryNormal,
  709. Data: []byte("boom"),
  710. })
  711. rawNode, err := NewRawNode(cfg)
  712. if err != nil {
  713. t.Fatal(err)
  714. }
  715. for highestApplied := uint64(0); highestApplied != 11; {
  716. rd := rawNode.Ready()
  717. n := len(rd.CommittedEntries)
  718. if n == 0 {
  719. t.Fatalf("stopped applying entries at index %d", highestApplied)
  720. }
  721. if next := rd.CommittedEntries[0].Index; highestApplied != 0 && highestApplied+1 != next {
  722. t.Fatalf("attempting to apply index %d after index %d, leaving a gap", next, highestApplied)
  723. }
  724. highestApplied = rd.CommittedEntries[n-1].Index
  725. rawNode.Advance(rd)
  726. rawNode.Step(pb.Message{
  727. Type: pb.MsgHeartbeat,
  728. To: 1,
  729. From: 1, // illegal, but we get away with it
  730. Term: 1,
  731. Commit: 11,
  732. })
  733. }
  734. }
  735. // TestRawNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
  736. // partitioned from a quorum of nodes. It verifies that the leader's log is
  737. // protected from unbounded growth even as new entries continue to be proposed.
  738. // This protection is provided by the MaxUncommittedEntriesSize configuration.
  739. func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
  740. const maxEntries = 16
  741. data := []byte("testdata")
  742. testEntry := pb.Entry{Data: data}
  743. maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
  744. s := NewMemoryStorage()
  745. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  746. cfg.MaxUncommittedEntriesSize = maxEntrySize
  747. rawNode, err := NewRawNode(cfg)
  748. if err != nil {
  749. t.Fatal(err)
  750. }
  751. rd := rawNode.Ready()
  752. s.Append(rd.Entries)
  753. rawNode.Advance(rd)
  754. // Become the leader.
  755. rawNode.Campaign()
  756. for {
  757. rd = rawNode.Ready()
  758. s.Append(rd.Entries)
  759. if rd.SoftState.Lead == rawNode.raft.id {
  760. rawNode.Advance(rd)
  761. break
  762. }
  763. rawNode.Advance(rd)
  764. }
  765. // Simulate a network partition while we make our proposals by never
  766. // committing anything. These proposals should not cause the leader's
  767. // log to grow indefinitely.
  768. for i := 0; i < 1024; i++ {
  769. rawNode.Propose(data)
  770. }
  771. // Check the size of leader's uncommitted log tail. It should not exceed the
  772. // MaxUncommittedEntriesSize limit.
  773. checkUncommitted := func(exp uint64) {
  774. t.Helper()
  775. if a := rawNode.raft.uncommittedSize; exp != a {
  776. t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
  777. }
  778. }
  779. checkUncommitted(maxEntrySize)
  780. // Recover from the partition. The uncommitted tail of the Raft log should
  781. // disappear as entries are committed.
  782. rd = rawNode.Ready()
  783. if len(rd.CommittedEntries) != maxEntries {
  784. t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
  785. }
  786. s.Append(rd.Entries)
  787. rawNode.Advance(rd)
  788. checkUncommitted(0)
  789. }
  790. func BenchmarkStatus(b *testing.B) {
  791. setup := func(members int) *RawNode {
  792. peers := make([]uint64, members)
  793. for i := range peers {
  794. peers[i] = uint64(i + 1)
  795. }
  796. cfg := newTestConfig(1, peers, 3, 1, NewMemoryStorage())
  797. cfg.Logger = discardLogger
  798. r := newRaft(cfg)
  799. r.becomeFollower(1, 1)
  800. r.becomeCandidate()
  801. r.becomeLeader()
  802. return &RawNode{raft: r}
  803. }
  804. for _, members := range []int{1, 3, 5, 100} {
  805. b.Run(fmt.Sprintf("members=%d", members), func(b *testing.B) {
  806. rn := setup(members)
  807. b.Run("Status", func(b *testing.B) {
  808. b.ReportAllocs()
  809. for i := 0; i < b.N; i++ {
  810. _ = rn.Status()
  811. }
  812. })
  813. b.Run("Status-example", func(b *testing.B) {
  814. b.ReportAllocs()
  815. for i := 0; i < b.N; i++ {
  816. s := rn.Status()
  817. var n uint64
  818. for _, pr := range s.Progress {
  819. n += pr.Match
  820. }
  821. _ = n
  822. }
  823. })
  824. b.Run("BasicStatus", func(b *testing.B) {
  825. b.ReportAllocs()
  826. for i := 0; i < b.N; i++ {
  827. _ = rn.BasicStatus()
  828. }
  829. })
  830. b.Run("WithProgress", func(b *testing.B) {
  831. b.ReportAllocs()
  832. visit := func(uint64, ProgressType, tracker.Progress) {}
  833. for i := 0; i < b.N; i++ {
  834. rn.WithProgress(visit)
  835. }
  836. })
  837. b.Run("WithProgress-example", func(b *testing.B) {
  838. b.ReportAllocs()
  839. for i := 0; i < b.N; i++ {
  840. var n uint64
  841. visit := func(_ uint64, _ ProgressType, pr tracker.Progress) {
  842. n += pr.Match
  843. }
  844. rn.WithProgress(visit)
  845. _ = n
  846. }
  847. })
  848. })
  849. }
  850. }
  851. func TestRawNodeConsumeReady(t *testing.T) {
  852. // Check that readyWithoutAccept() does not call acceptReady (which resets
  853. // the messages) but Ready() does.
  854. s := NewMemoryStorage()
  855. rn := newTestRawNode(1, []uint64{1}, 3, 1, s)
  856. m1 := pb.Message{Context: []byte("foo")}
  857. m2 := pb.Message{Context: []byte("bar")}
  858. // Inject first message, make sure it's visible via readyWithoutAccept.
  859. rn.raft.msgs = append(rn.raft.msgs, m1)
  860. rd := rn.readyWithoutAccept()
  861. if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
  862. t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
  863. }
  864. if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m1) {
  865. t.Fatalf("expected only m1 in raft.msgs, got %+v", rn.raft.msgs)
  866. }
  867. // Now call Ready() which should move the message into the Ready (as opposed
  868. // to leaving it in both places).
  869. rd = rn.Ready()
  870. if len(rn.raft.msgs) > 0 {
  871. t.Fatalf("messages not reset: %+v", rn.raft.msgs)
  872. }
  873. if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
  874. t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
  875. }
  876. // Add a message to raft to make sure that Advance() doesn't drop it.
  877. rn.raft.msgs = append(rn.raft.msgs, m2)
  878. rn.Advance(rd)
  879. if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) {
  880. t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs)
  881. }
  882. }