rawnode_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "reflect"
  20. "testing"
  21. "go.etcd.io/etcd/raft/quorum"
  22. "go.etcd.io/etcd/raft/raftpb"
  23. "go.etcd.io/etcd/raft/tracker"
  24. )
  25. // rawNodeAdapter is essentially a lint that makes sure that RawNode implements
  26. // "most of" Node. The exceptions (some of which are easy to fix) are listed
  27. // below.
  28. type rawNodeAdapter struct {
  29. *RawNode
  30. }
  31. var _ Node = (*rawNodeAdapter)(nil)
  32. // Node specifies lead, which is pointless, can just be filled in.
  33. func (a *rawNodeAdapter) TransferLeadership(ctx context.Context, lead, transferee uint64) {
  34. a.RawNode.TransferLeader(transferee)
  35. }
  36. // Node has a goroutine, RawNode doesn't need this.
  37. func (a *rawNodeAdapter) Stop() {}
  38. // RawNode returns a *Status.
  39. func (a *rawNodeAdapter) Status() Status { return *a.RawNode.Status() }
  40. // RawNode takes a Ready. It doesn't really have to do that I think? It can hold on
  41. // to it internally. But maybe that approach is frail.
  42. func (a *rawNodeAdapter) Advance() { a.RawNode.Advance(Ready{}) }
  43. // RawNode returns a Ready, not a chan of one.
  44. func (a *rawNodeAdapter) Ready() <-chan Ready { return nil }
  45. // Node takes more contexts. Easy enough to fix.
  46. func (a *rawNodeAdapter) Campaign(context.Context) error { return a.RawNode.Campaign() }
  47. func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error {
  48. a.RawNode.ReadIndex(rctx)
  49. // RawNode swallowed the error in ReadIndex, it probably should not do that.
  50. return nil
  51. }
  52. func (a *rawNodeAdapter) Step(_ context.Context, m raftpb.Message) error { return a.RawNode.Step(m) }
  53. func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) }
  54. func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc raftpb.ConfChange) error {
  55. return a.RawNode.ProposeConfChange(cc)
  56. }
  57. // TestRawNodeStep ensures that RawNode.Step ignore local message.
  58. func TestRawNodeStep(t *testing.T) {
  59. for i, msgn := range raftpb.MessageType_name {
  60. s := NewMemoryStorage()
  61. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. msgt := raftpb.MessageType(i)
  66. err = rawNode.Step(raftpb.Message{Type: msgt})
  67. // LocalMsg should be ignored.
  68. if IsLocalMsg(msgt) {
  69. if err != ErrStepLocalMsg {
  70. t.Errorf("%d: step should ignore %s", msgt, msgn)
  71. }
  72. }
  73. }
  74. }
  75. // TestNodeStepUnblock from node_test.go has no equivalent in rawNode because there is
  76. // no goroutine in RawNode.
  77. // TestRawNodeProposeAndConfChange ensures that RawNode.Propose and RawNode.ProposeConfChange
  78. // send the given proposal and ConfChange to the underlying raft.
  79. func TestRawNodeProposeAndConfChange(t *testing.T) {
  80. s := NewMemoryStorage()
  81. var err error
  82. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
  83. if err != nil {
  84. t.Fatal(err)
  85. }
  86. rd := rawNode.Ready()
  87. s.Append(rd.Entries)
  88. rawNode.Advance(rd)
  89. if d := rawNode.Ready(); d.MustSync || !IsEmptyHardState(d.HardState) || len(d.Entries) > 0 {
  90. t.Fatalf("expected empty hard state with must-sync=false: %#v", d)
  91. }
  92. rawNode.Campaign()
  93. proposed := false
  94. var (
  95. lastIndex uint64
  96. ccdata []byte
  97. )
  98. for {
  99. rd = rawNode.Ready()
  100. s.Append(rd.Entries)
  101. // Once we are the leader, propose a command and a ConfChange.
  102. if !proposed && rd.SoftState.Lead == rawNode.raft.id {
  103. rawNode.Propose([]byte("somedata"))
  104. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  105. ccdata, err = cc.Marshal()
  106. if err != nil {
  107. t.Fatal(err)
  108. }
  109. rawNode.ProposeConfChange(cc)
  110. proposed = true
  111. }
  112. rawNode.Advance(rd)
  113. // Exit when we have four entries: one ConfChange, one no-op for the election,
  114. // our proposed command and proposed ConfChange.
  115. lastIndex, err = s.LastIndex()
  116. if err != nil {
  117. t.Fatal(err)
  118. }
  119. if lastIndex >= 4 {
  120. break
  121. }
  122. }
  123. entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
  124. if err != nil {
  125. t.Fatal(err)
  126. }
  127. if len(entries) != 2 {
  128. t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
  129. }
  130. if !bytes.Equal(entries[0].Data, []byte("somedata")) {
  131. t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
  132. }
  133. if entries[1].Type != raftpb.EntryConfChange {
  134. t.Fatalf("type = %v, want %v", entries[1].Type, raftpb.EntryConfChange)
  135. }
  136. if !bytes.Equal(entries[1].Data, ccdata) {
  137. t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
  138. }
  139. }
  140. // TestRawNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
  141. // not affect the later propose to add new node.
  142. func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
  143. s := NewMemoryStorage()
  144. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
  145. if err != nil {
  146. t.Fatal(err)
  147. }
  148. rd := rawNode.Ready()
  149. s.Append(rd.Entries)
  150. rawNode.Advance(rd)
  151. rawNode.Campaign()
  152. for {
  153. rd = rawNode.Ready()
  154. s.Append(rd.Entries)
  155. if rd.SoftState.Lead == rawNode.raft.id {
  156. rawNode.Advance(rd)
  157. break
  158. }
  159. rawNode.Advance(rd)
  160. }
  161. proposeConfChangeAndApply := func(cc raftpb.ConfChange) {
  162. rawNode.ProposeConfChange(cc)
  163. rd = rawNode.Ready()
  164. s.Append(rd.Entries)
  165. for _, entry := range rd.CommittedEntries {
  166. if entry.Type == raftpb.EntryConfChange {
  167. var cc raftpb.ConfChange
  168. cc.Unmarshal(entry.Data)
  169. rawNode.ApplyConfChange(cc)
  170. }
  171. }
  172. rawNode.Advance(rd)
  173. }
  174. cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  175. ccdata1, err := cc1.Marshal()
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. proposeConfChangeAndApply(cc1)
  180. // try to add the same node again
  181. proposeConfChangeAndApply(cc1)
  182. // the new node join should be ok
  183. cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
  184. ccdata2, err := cc2.Marshal()
  185. if err != nil {
  186. t.Fatal(err)
  187. }
  188. proposeConfChangeAndApply(cc2)
  189. lastIndex, err := s.LastIndex()
  190. if err != nil {
  191. t.Fatal(err)
  192. }
  193. // the last three entries should be: ConfChange cc1, cc1, cc2
  194. entries, err := s.Entries(lastIndex-2, lastIndex+1, noLimit)
  195. if err != nil {
  196. t.Fatal(err)
  197. }
  198. if len(entries) != 3 {
  199. t.Fatalf("len(entries) = %d, want %d", len(entries), 3)
  200. }
  201. if !bytes.Equal(entries[0].Data, ccdata1) {
  202. t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, ccdata1)
  203. }
  204. if !bytes.Equal(entries[2].Data, ccdata2) {
  205. t.Errorf("entries[2].Data = %v, want %v", entries[2].Data, ccdata2)
  206. }
  207. }
  208. // TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message
  209. // to the underlying raft. It also ensures that ReadState can be read out.
  210. func TestRawNodeReadIndex(t *testing.T) {
  211. msgs := []raftpb.Message{}
  212. appendStep := func(r *raft, m raftpb.Message) error {
  213. msgs = append(msgs, m)
  214. return nil
  215. }
  216. wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
  217. s := NewMemoryStorage()
  218. c := newTestConfig(1, nil, 10, 1, s)
  219. rawNode, err := NewRawNode(c, []Peer{{ID: 1}})
  220. if err != nil {
  221. t.Fatal(err)
  222. }
  223. rawNode.raft.readStates = wrs
  224. // ensure the ReadStates can be read out
  225. hasReady := rawNode.HasReady()
  226. if !hasReady {
  227. t.Errorf("HasReady() returns %t, want %t", hasReady, true)
  228. }
  229. rd := rawNode.Ready()
  230. if !reflect.DeepEqual(rd.ReadStates, wrs) {
  231. t.Errorf("ReadStates = %d, want %d", rd.ReadStates, wrs)
  232. }
  233. s.Append(rd.Entries)
  234. rawNode.Advance(rd)
  235. // ensure raft.readStates is reset after advance
  236. if rawNode.raft.readStates != nil {
  237. t.Errorf("readStates = %v, want %v", rawNode.raft.readStates, nil)
  238. }
  239. wrequestCtx := []byte("somedata2")
  240. rawNode.Campaign()
  241. for {
  242. rd = rawNode.Ready()
  243. s.Append(rd.Entries)
  244. if rd.SoftState.Lead == rawNode.raft.id {
  245. rawNode.Advance(rd)
  246. // Once we are the leader, issue a ReadIndex request
  247. rawNode.raft.step = appendStep
  248. rawNode.ReadIndex(wrequestCtx)
  249. break
  250. }
  251. rawNode.Advance(rd)
  252. }
  253. // ensure that MsgReadIndex message is sent to the underlying raft
  254. if len(msgs) != 1 {
  255. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  256. }
  257. if msgs[0].Type != raftpb.MsgReadIndex {
  258. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
  259. }
  260. if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
  261. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
  262. }
  263. }
  264. // TestBlockProposal from node_test.go has no equivalent in rawNode because there is
  265. // no leader check in RawNode.
  266. // TestNodeTick from node_test.go has no equivalent in rawNode because
  267. // it reaches into the raft object which is not exposed.
  268. // TestNodeStop from node_test.go has no equivalent in rawNode because there is
  269. // no goroutine in RawNode.
  270. // TestRawNodeStart ensures that a node can be started correctly. The node should
  271. // start with correct configuration change entries, and can accept and commit
  272. // proposals.
  273. func TestRawNodeStart(t *testing.T) {
  274. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  275. ccdata, err := cc.Marshal()
  276. if err != nil {
  277. t.Fatalf("unexpected marshal error: %v", err)
  278. }
  279. wants := []Ready{
  280. {
  281. HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
  282. Entries: []raftpb.Entry{
  283. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  284. },
  285. CommittedEntries: []raftpb.Entry{
  286. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  287. },
  288. MustSync: true,
  289. },
  290. {
  291. HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
  292. Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  293. CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  294. MustSync: true,
  295. },
  296. }
  297. storage := NewMemoryStorage()
  298. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
  299. if err != nil {
  300. t.Fatal(err)
  301. }
  302. rd := rawNode.Ready()
  303. t.Logf("rd %v", rd)
  304. if !reflect.DeepEqual(rd, wants[0]) {
  305. t.Fatalf("#%d: g = %+v,\n w %+v", 1, rd, wants[0])
  306. } else {
  307. storage.Append(rd.Entries)
  308. rawNode.Advance(rd)
  309. }
  310. storage.Append(rd.Entries)
  311. rawNode.Advance(rd)
  312. rawNode.Campaign()
  313. rd = rawNode.Ready()
  314. storage.Append(rd.Entries)
  315. rawNode.Advance(rd)
  316. rawNode.Propose([]byte("foo"))
  317. if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) {
  318. t.Errorf("#%d: g = %+v,\n w %+v", 2, rd, wants[1])
  319. } else {
  320. storage.Append(rd.Entries)
  321. rawNode.Advance(rd)
  322. }
  323. if rawNode.HasReady() {
  324. t.Errorf("unexpected Ready: %+v", rawNode.Ready())
  325. }
  326. }
  327. func TestRawNodeRestart(t *testing.T) {
  328. entries := []raftpb.Entry{
  329. {Term: 1, Index: 1},
  330. {Term: 1, Index: 2, Data: []byte("foo")},
  331. }
  332. st := raftpb.HardState{Term: 1, Commit: 1}
  333. want := Ready{
  334. HardState: emptyState,
  335. // commit up to commit index in st
  336. CommittedEntries: entries[:st.Commit],
  337. MustSync: false,
  338. }
  339. storage := NewMemoryStorage()
  340. storage.SetHardState(st)
  341. storage.Append(entries)
  342. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), nil)
  343. if err != nil {
  344. t.Fatal(err)
  345. }
  346. rd := rawNode.Ready()
  347. if !reflect.DeepEqual(rd, want) {
  348. t.Errorf("g = %+v,\n w %+v", rd, want)
  349. }
  350. rawNode.Advance(rd)
  351. if rawNode.HasReady() {
  352. t.Errorf("unexpected Ready: %+v", rawNode.Ready())
  353. }
  354. }
  355. func TestRawNodeRestartFromSnapshot(t *testing.T) {
  356. snap := raftpb.Snapshot{
  357. Metadata: raftpb.SnapshotMetadata{
  358. ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
  359. Index: 2,
  360. Term: 1,
  361. },
  362. }
  363. entries := []raftpb.Entry{
  364. {Term: 1, Index: 3, Data: []byte("foo")},
  365. }
  366. st := raftpb.HardState{Term: 1, Commit: 3}
  367. want := Ready{
  368. HardState: emptyState,
  369. // commit up to commit index in st
  370. CommittedEntries: entries,
  371. MustSync: false,
  372. }
  373. s := NewMemoryStorage()
  374. s.SetHardState(st)
  375. s.ApplySnapshot(snap)
  376. s.Append(entries)
  377. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), nil)
  378. if err != nil {
  379. t.Fatal(err)
  380. }
  381. if rd := rawNode.Ready(); !reflect.DeepEqual(rd, want) {
  382. t.Errorf("g = %+v,\n w %+v", rd, want)
  383. } else {
  384. rawNode.Advance(rd)
  385. }
  386. if rawNode.HasReady() {
  387. t.Errorf("unexpected Ready: %+v", rawNode.HasReady())
  388. }
  389. }
  390. // TestNodeAdvance from node_test.go has no equivalent in rawNode because there is
  391. // no dependency check between Ready() and Advance()
  392. func TestRawNodeStatus(t *testing.T) {
  393. s := NewMemoryStorage()
  394. rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s), nil)
  395. if err != nil {
  396. t.Fatal(err)
  397. }
  398. if status := rn.Status(); status.Progress != nil {
  399. t.Fatalf("expected no Progress because not leader: %+v", status.Progress)
  400. }
  401. if err := rn.Campaign(); err != nil {
  402. t.Fatal(err)
  403. }
  404. status := rn.Status()
  405. if status.Lead != 1 {
  406. t.Fatal("not lead")
  407. }
  408. if status.RaftState != StateLeader {
  409. t.Fatal("not leader")
  410. }
  411. if exp, act := *rn.raft.prs.Progress[1], status.Progress[1]; !reflect.DeepEqual(exp, act) {
  412. t.Fatalf("want: %+v\ngot: %+v", exp, act)
  413. }
  414. expCfg := tracker.Config{Voters: quorum.JointConfig{
  415. quorum.MajorityConfig{1: {}},
  416. nil,
  417. }}
  418. if !reflect.DeepEqual(expCfg, status.Config) {
  419. t.Fatalf("want: %+v\ngot: %+v", expCfg, status.Config)
  420. }
  421. }
  422. // TestRawNodeCommitPaginationAfterRestart is the RawNode version of
  423. // TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the
  424. // Raft group would forget to apply entries:
  425. //
  426. // - node learns that index 11 is committed
  427. // - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
  428. // exceeds maxBytes), which isn't noticed internally by Raft
  429. // - Commit index gets bumped to 10
  430. // - the node persists the HardState, but crashes before applying the entries
  431. // - upon restart, the storage returns the same entries, but `slice` takes a
  432. // different code path and removes the last entry.
  433. // - Raft does not emit a HardState, but when the app calls Advance(), it bumps
  434. // its internal applied index cursor to 10 (when it should be 9)
  435. // - the next Ready asks the app to apply index 11 (omitting index 10), losing a
  436. // write.
  437. func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
  438. s := &ignoreSizeHintMemStorage{
  439. MemoryStorage: NewMemoryStorage(),
  440. }
  441. persistedHardState := raftpb.HardState{
  442. Term: 1,
  443. Vote: 1,
  444. Commit: 10,
  445. }
  446. s.hardState = persistedHardState
  447. s.ents = make([]raftpb.Entry, 10)
  448. var size uint64
  449. for i := range s.ents {
  450. ent := raftpb.Entry{
  451. Term: 1,
  452. Index: uint64(i + 1),
  453. Type: raftpb.EntryNormal,
  454. Data: []byte("a"),
  455. }
  456. s.ents[i] = ent
  457. size += uint64(ent.Size())
  458. }
  459. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  460. // Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
  461. // not be included in the initial rd.CommittedEntries. However, our storage will ignore
  462. // this and *will* return it (which is how the Commit index ended up being 10 initially).
  463. cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
  464. s.ents = append(s.ents, raftpb.Entry{
  465. Term: 1,
  466. Index: uint64(11),
  467. Type: raftpb.EntryNormal,
  468. Data: []byte("boom"),
  469. })
  470. rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
  471. if err != nil {
  472. t.Fatal(err)
  473. }
  474. for highestApplied := uint64(0); highestApplied != 11; {
  475. rd := rawNode.Ready()
  476. n := len(rd.CommittedEntries)
  477. if n == 0 {
  478. t.Fatalf("stopped applying entries at index %d", highestApplied)
  479. }
  480. if next := rd.CommittedEntries[0].Index; highestApplied != 0 && highestApplied+1 != next {
  481. t.Fatalf("attempting to apply index %d after index %d, leaving a gap", next, highestApplied)
  482. }
  483. highestApplied = rd.CommittedEntries[n-1].Index
  484. rawNode.Advance(rd)
  485. rawNode.Step(raftpb.Message{
  486. Type: raftpb.MsgHeartbeat,
  487. To: 1,
  488. From: 1, // illegal, but we get away with it
  489. Term: 1,
  490. Commit: 11,
  491. })
  492. }
  493. }
  494. // TestRawNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
  495. // partitioned from a quorum of nodes. It verifies that the leader's log is
  496. // protected from unbounded growth even as new entries continue to be proposed.
  497. // This protection is provided by the MaxUncommittedEntriesSize configuration.
  498. func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
  499. const maxEntries = 16
  500. data := []byte("testdata")
  501. testEntry := raftpb.Entry{Data: data}
  502. maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
  503. s := NewMemoryStorage()
  504. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  505. cfg.MaxUncommittedEntriesSize = maxEntrySize
  506. rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
  507. if err != nil {
  508. t.Fatal(err)
  509. }
  510. rd := rawNode.Ready()
  511. s.Append(rd.Entries)
  512. rawNode.Advance(rd)
  513. // Become the leader.
  514. rawNode.Campaign()
  515. for {
  516. rd = rawNode.Ready()
  517. s.Append(rd.Entries)
  518. if rd.SoftState.Lead == rawNode.raft.id {
  519. rawNode.Advance(rd)
  520. break
  521. }
  522. rawNode.Advance(rd)
  523. }
  524. // Simulate a network partition while we make our proposals by never
  525. // committing anything. These proposals should not cause the leader's
  526. // log to grow indefinitely.
  527. for i := 0; i < 1024; i++ {
  528. rawNode.Propose(data)
  529. }
  530. // Check the size of leader's uncommitted log tail. It should not exceed the
  531. // MaxUncommittedEntriesSize limit.
  532. checkUncommitted := func(exp uint64) {
  533. t.Helper()
  534. if a := rawNode.raft.uncommittedSize; exp != a {
  535. t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
  536. }
  537. }
  538. checkUncommitted(maxEntrySize)
  539. // Recover from the partition. The uncommitted tail of the Raft log should
  540. // disappear as entries are committed.
  541. rd = rawNode.Ready()
  542. if len(rd.CommittedEntries) != maxEntries {
  543. t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
  544. }
  545. s.Append(rd.Entries)
  546. rawNode.Advance(rd)
  547. checkUncommitted(0)
  548. }
  549. func BenchmarkStatusProgress(b *testing.B) {
  550. setup := func(members int) *RawNode {
  551. peers := make([]uint64, members)
  552. for i := range peers {
  553. peers[i] = uint64(i + 1)
  554. }
  555. cfg := newTestConfig(1, peers, 3, 1, NewMemoryStorage())
  556. cfg.Logger = discardLogger
  557. r := newRaft(cfg)
  558. r.becomeFollower(1, 1)
  559. r.becomeCandidate()
  560. r.becomeLeader()
  561. return &RawNode{raft: r}
  562. }
  563. for _, members := range []int{1, 3, 5, 100} {
  564. b.Run(fmt.Sprintf("members=%d", members), func(b *testing.B) {
  565. // NB: call getStatus through rn.Status because that incurs an additional
  566. // allocation.
  567. rn := setup(members)
  568. b.Run("Status", func(b *testing.B) {
  569. b.ReportAllocs()
  570. for i := 0; i < b.N; i++ {
  571. _ = rn.Status()
  572. }
  573. })
  574. b.Run("Status-example", func(b *testing.B) {
  575. b.ReportAllocs()
  576. for i := 0; i < b.N; i++ {
  577. s := rn.Status()
  578. var n uint64
  579. for _, pr := range s.Progress {
  580. n += pr.Match
  581. }
  582. _ = n
  583. }
  584. })
  585. b.Run("StatusWithoutProgress", func(b *testing.B) {
  586. b.ReportAllocs()
  587. for i := 0; i < b.N; i++ {
  588. _ = rn.StatusWithoutProgress()
  589. }
  590. })
  591. b.Run("WithProgress", func(b *testing.B) {
  592. b.ReportAllocs()
  593. visit := func(uint64, ProgressType, tracker.Progress) {}
  594. for i := 0; i < b.N; i++ {
  595. rn.WithProgress(visit)
  596. }
  597. })
  598. b.Run("WithProgress-example", func(b *testing.B) {
  599. b.ReportAllocs()
  600. for i := 0; i < b.N; i++ {
  601. var n uint64
  602. visit := func(_ uint64, _ ProgressType, pr tracker.Progress) {
  603. n += pr.Match
  604. }
  605. rn.WithProgress(visit)
  606. _ = n
  607. }
  608. })
  609. })
  610. }
  611. }