rawnode_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "reflect"
  20. "testing"
  21. "go.etcd.io/etcd/raft/raftpb"
  22. "go.etcd.io/etcd/raft/tracker"
  23. )
  24. // rawNodeAdapter is essentially a lint that makes sure that RawNode implements
  25. // "most of" Node. The exceptions (some of which are easy to fix) are listed
  26. // below.
  27. type rawNodeAdapter struct {
  28. *RawNode
  29. }
  30. var _ Node = (*rawNodeAdapter)(nil)
  31. // Node specifies lead, which is pointless, can just be filled in.
  32. func (a *rawNodeAdapter) TransferLeadership(ctx context.Context, lead, transferee uint64) {
  33. a.RawNode.TransferLeader(transferee)
  34. }
  35. // Node has a goroutine, RawNode doesn't need this.
  36. func (a *rawNodeAdapter) Stop() {}
  37. // RawNode returns a *Status.
  38. func (a *rawNodeAdapter) Status() Status { return *a.RawNode.Status() }
  39. // RawNode takes a Ready. It doesn't really have to do that I think? It can hold on
  40. // to it internally. But maybe that approach is frail.
  41. func (a *rawNodeAdapter) Advance() { a.RawNode.Advance(Ready{}) }
  42. // RawNode returns a Ready, not a chan of one.
  43. func (a *rawNodeAdapter) Ready() <-chan Ready { return nil }
  44. // Node takes more contexts. Easy enough to fix.
  45. func (a *rawNodeAdapter) Campaign(context.Context) error { return a.RawNode.Campaign() }
  46. func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error {
  47. a.RawNode.ReadIndex(rctx)
  48. // RawNode swallowed the error in ReadIndex, it probably should not do that.
  49. return nil
  50. }
  51. func (a *rawNodeAdapter) Step(_ context.Context, m raftpb.Message) error { return a.RawNode.Step(m) }
  52. func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) }
  53. func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc raftpb.ConfChange) error {
  54. return a.RawNode.ProposeConfChange(cc)
  55. }
  56. // TestRawNodeStep ensures that RawNode.Step ignore local message.
  57. func TestRawNodeStep(t *testing.T) {
  58. for i, msgn := range raftpb.MessageType_name {
  59. s := NewMemoryStorage()
  60. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
  61. if err != nil {
  62. t.Fatal(err)
  63. }
  64. msgt := raftpb.MessageType(i)
  65. err = rawNode.Step(raftpb.Message{Type: msgt})
  66. // LocalMsg should be ignored.
  67. if IsLocalMsg(msgt) {
  68. if err != ErrStepLocalMsg {
  69. t.Errorf("%d: step should ignore %s", msgt, msgn)
  70. }
  71. }
  72. }
  73. }
  74. // TestNodeStepUnblock from node_test.go has no equivalent in rawNode because there is
  75. // no goroutine in RawNode.
  76. // TestRawNodeProposeAndConfChange ensures that RawNode.Propose and RawNode.ProposeConfChange
  77. // send the given proposal and ConfChange to the underlying raft.
  78. func TestRawNodeProposeAndConfChange(t *testing.T) {
  79. s := NewMemoryStorage()
  80. var err error
  81. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
  82. if err != nil {
  83. t.Fatal(err)
  84. }
  85. rd := rawNode.Ready()
  86. s.Append(rd.Entries)
  87. rawNode.Advance(rd)
  88. if d := rawNode.Ready(); d.MustSync || !IsEmptyHardState(d.HardState) || len(d.Entries) > 0 {
  89. t.Fatalf("expected empty hard state with must-sync=false: %#v", d)
  90. }
  91. rawNode.Campaign()
  92. proposed := false
  93. var (
  94. lastIndex uint64
  95. ccdata []byte
  96. )
  97. for {
  98. rd = rawNode.Ready()
  99. s.Append(rd.Entries)
  100. // Once we are the leader, propose a command and a ConfChange.
  101. if !proposed && rd.SoftState.Lead == rawNode.raft.id {
  102. rawNode.Propose([]byte("somedata"))
  103. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  104. ccdata, err = cc.Marshal()
  105. if err != nil {
  106. t.Fatal(err)
  107. }
  108. rawNode.ProposeConfChange(cc)
  109. proposed = true
  110. }
  111. rawNode.Advance(rd)
  112. // Exit when we have four entries: one ConfChange, one no-op for the election,
  113. // our proposed command and proposed ConfChange.
  114. lastIndex, err = s.LastIndex()
  115. if err != nil {
  116. t.Fatal(err)
  117. }
  118. if lastIndex >= 4 {
  119. break
  120. }
  121. }
  122. entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
  123. if err != nil {
  124. t.Fatal(err)
  125. }
  126. if len(entries) != 2 {
  127. t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
  128. }
  129. if !bytes.Equal(entries[0].Data, []byte("somedata")) {
  130. t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
  131. }
  132. if entries[1].Type != raftpb.EntryConfChange {
  133. t.Fatalf("type = %v, want %v", entries[1].Type, raftpb.EntryConfChange)
  134. }
  135. if !bytes.Equal(entries[1].Data, ccdata) {
  136. t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
  137. }
  138. }
  139. // TestRawNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
  140. // not affect the later propose to add new node.
  141. func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
  142. s := NewMemoryStorage()
  143. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
  144. if err != nil {
  145. t.Fatal(err)
  146. }
  147. rd := rawNode.Ready()
  148. s.Append(rd.Entries)
  149. rawNode.Advance(rd)
  150. rawNode.Campaign()
  151. for {
  152. rd = rawNode.Ready()
  153. s.Append(rd.Entries)
  154. if rd.SoftState.Lead == rawNode.raft.id {
  155. rawNode.Advance(rd)
  156. break
  157. }
  158. rawNode.Advance(rd)
  159. }
  160. proposeConfChangeAndApply := func(cc raftpb.ConfChange) {
  161. rawNode.ProposeConfChange(cc)
  162. rd = rawNode.Ready()
  163. s.Append(rd.Entries)
  164. for _, entry := range rd.CommittedEntries {
  165. if entry.Type == raftpb.EntryConfChange {
  166. var cc raftpb.ConfChange
  167. cc.Unmarshal(entry.Data)
  168. rawNode.ApplyConfChange(cc)
  169. }
  170. }
  171. rawNode.Advance(rd)
  172. }
  173. cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  174. ccdata1, err := cc1.Marshal()
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. proposeConfChangeAndApply(cc1)
  179. // try to add the same node again
  180. proposeConfChangeAndApply(cc1)
  181. // the new node join should be ok
  182. cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
  183. ccdata2, err := cc2.Marshal()
  184. if err != nil {
  185. t.Fatal(err)
  186. }
  187. proposeConfChangeAndApply(cc2)
  188. lastIndex, err := s.LastIndex()
  189. if err != nil {
  190. t.Fatal(err)
  191. }
  192. // the last three entries should be: ConfChange cc1, cc1, cc2
  193. entries, err := s.Entries(lastIndex-2, lastIndex+1, noLimit)
  194. if err != nil {
  195. t.Fatal(err)
  196. }
  197. if len(entries) != 3 {
  198. t.Fatalf("len(entries) = %d, want %d", len(entries), 3)
  199. }
  200. if !bytes.Equal(entries[0].Data, ccdata1) {
  201. t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, ccdata1)
  202. }
  203. if !bytes.Equal(entries[2].Data, ccdata2) {
  204. t.Errorf("entries[2].Data = %v, want %v", entries[2].Data, ccdata2)
  205. }
  206. }
  207. // TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message
  208. // to the underlying raft. It also ensures that ReadState can be read out.
  209. func TestRawNodeReadIndex(t *testing.T) {
  210. msgs := []raftpb.Message{}
  211. appendStep := func(r *raft, m raftpb.Message) error {
  212. msgs = append(msgs, m)
  213. return nil
  214. }
  215. wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
  216. s := NewMemoryStorage()
  217. c := newTestConfig(1, nil, 10, 1, s)
  218. rawNode, err := NewRawNode(c, []Peer{{ID: 1}})
  219. if err != nil {
  220. t.Fatal(err)
  221. }
  222. rawNode.raft.readStates = wrs
  223. // ensure the ReadStates can be read out
  224. hasReady := rawNode.HasReady()
  225. if !hasReady {
  226. t.Errorf("HasReady() returns %t, want %t", hasReady, true)
  227. }
  228. rd := rawNode.Ready()
  229. if !reflect.DeepEqual(rd.ReadStates, wrs) {
  230. t.Errorf("ReadStates = %d, want %d", rd.ReadStates, wrs)
  231. }
  232. s.Append(rd.Entries)
  233. rawNode.Advance(rd)
  234. // ensure raft.readStates is reset after advance
  235. if rawNode.raft.readStates != nil {
  236. t.Errorf("readStates = %v, want %v", rawNode.raft.readStates, nil)
  237. }
  238. wrequestCtx := []byte("somedata2")
  239. rawNode.Campaign()
  240. for {
  241. rd = rawNode.Ready()
  242. s.Append(rd.Entries)
  243. if rd.SoftState.Lead == rawNode.raft.id {
  244. rawNode.Advance(rd)
  245. // Once we are the leader, issue a ReadIndex request
  246. rawNode.raft.step = appendStep
  247. rawNode.ReadIndex(wrequestCtx)
  248. break
  249. }
  250. rawNode.Advance(rd)
  251. }
  252. // ensure that MsgReadIndex message is sent to the underlying raft
  253. if len(msgs) != 1 {
  254. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  255. }
  256. if msgs[0].Type != raftpb.MsgReadIndex {
  257. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
  258. }
  259. if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
  260. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
  261. }
  262. }
  263. // TestBlockProposal from node_test.go has no equivalent in rawNode because there is
  264. // no leader check in RawNode.
  265. // TestNodeTick from node_test.go has no equivalent in rawNode because
  266. // it reaches into the raft object which is not exposed.
  267. // TestNodeStop from node_test.go has no equivalent in rawNode because there is
  268. // no goroutine in RawNode.
  269. // TestRawNodeStart ensures that a node can be started correctly. The node should
  270. // start with correct configuration change entries, and can accept and commit
  271. // proposals.
  272. func TestRawNodeStart(t *testing.T) {
  273. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  274. ccdata, err := cc.Marshal()
  275. if err != nil {
  276. t.Fatalf("unexpected marshal error: %v", err)
  277. }
  278. wants := []Ready{
  279. {
  280. HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
  281. Entries: []raftpb.Entry{
  282. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  283. },
  284. CommittedEntries: []raftpb.Entry{
  285. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  286. },
  287. MustSync: true,
  288. },
  289. {
  290. HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
  291. Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  292. CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  293. MustSync: true,
  294. },
  295. }
  296. storage := NewMemoryStorage()
  297. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
  298. if err != nil {
  299. t.Fatal(err)
  300. }
  301. rd := rawNode.Ready()
  302. t.Logf("rd %v", rd)
  303. if !reflect.DeepEqual(rd, wants[0]) {
  304. t.Fatalf("#%d: g = %+v,\n w %+v", 1, rd, wants[0])
  305. } else {
  306. storage.Append(rd.Entries)
  307. rawNode.Advance(rd)
  308. }
  309. storage.Append(rd.Entries)
  310. rawNode.Advance(rd)
  311. rawNode.Campaign()
  312. rd = rawNode.Ready()
  313. storage.Append(rd.Entries)
  314. rawNode.Advance(rd)
  315. rawNode.Propose([]byte("foo"))
  316. if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) {
  317. t.Errorf("#%d: g = %+v,\n w %+v", 2, rd, wants[1])
  318. } else {
  319. storage.Append(rd.Entries)
  320. rawNode.Advance(rd)
  321. }
  322. if rawNode.HasReady() {
  323. t.Errorf("unexpected Ready: %+v", rawNode.Ready())
  324. }
  325. }
  326. func TestRawNodeRestart(t *testing.T) {
  327. entries := []raftpb.Entry{
  328. {Term: 1, Index: 1},
  329. {Term: 1, Index: 2, Data: []byte("foo")},
  330. }
  331. st := raftpb.HardState{Term: 1, Commit: 1}
  332. want := Ready{
  333. HardState: emptyState,
  334. // commit up to commit index in st
  335. CommittedEntries: entries[:st.Commit],
  336. MustSync: false,
  337. }
  338. storage := NewMemoryStorage()
  339. storage.SetHardState(st)
  340. storage.Append(entries)
  341. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), nil)
  342. if err != nil {
  343. t.Fatal(err)
  344. }
  345. rd := rawNode.Ready()
  346. if !reflect.DeepEqual(rd, want) {
  347. t.Errorf("g = %+v,\n w %+v", rd, want)
  348. }
  349. rawNode.Advance(rd)
  350. if rawNode.HasReady() {
  351. t.Errorf("unexpected Ready: %+v", rawNode.Ready())
  352. }
  353. }
  354. func TestRawNodeRestartFromSnapshot(t *testing.T) {
  355. snap := raftpb.Snapshot{
  356. Metadata: raftpb.SnapshotMetadata{
  357. ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
  358. Index: 2,
  359. Term: 1,
  360. },
  361. }
  362. entries := []raftpb.Entry{
  363. {Term: 1, Index: 3, Data: []byte("foo")},
  364. }
  365. st := raftpb.HardState{Term: 1, Commit: 3}
  366. want := Ready{
  367. HardState: emptyState,
  368. // commit up to commit index in st
  369. CommittedEntries: entries,
  370. MustSync: false,
  371. }
  372. s := NewMemoryStorage()
  373. s.SetHardState(st)
  374. s.ApplySnapshot(snap)
  375. s.Append(entries)
  376. rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), nil)
  377. if err != nil {
  378. t.Fatal(err)
  379. }
  380. if rd := rawNode.Ready(); !reflect.DeepEqual(rd, want) {
  381. t.Errorf("g = %+v,\n w %+v", rd, want)
  382. } else {
  383. rawNode.Advance(rd)
  384. }
  385. if rawNode.HasReady() {
  386. t.Errorf("unexpected Ready: %+v", rawNode.HasReady())
  387. }
  388. }
  389. // TestNodeAdvance from node_test.go has no equivalent in rawNode because there is
  390. // no dependency check between Ready() and Advance()
  391. func TestRawNodeStatus(t *testing.T) {
  392. s := NewMemoryStorage()
  393. rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s), nil)
  394. if err != nil {
  395. t.Fatal(err)
  396. }
  397. if status := rn.Status(); status.Progress != nil {
  398. t.Fatalf("expected no Progress because not leader: %+v", status.Progress)
  399. }
  400. if err := rn.Campaign(); err != nil {
  401. t.Fatal(err)
  402. }
  403. status := rn.Status()
  404. if status.Lead != 1 {
  405. t.Fatal("not lead")
  406. }
  407. if status.RaftState != StateLeader {
  408. t.Fatal("not leader")
  409. }
  410. if exp, act := *rn.raft.prs.Progress[1], status.Progress[1]; !reflect.DeepEqual(exp, act) {
  411. t.Fatalf("want: %+v\ngot: %+v", exp, act)
  412. }
  413. }
  414. // TestRawNodeCommitPaginationAfterRestart is the RawNode version of
  415. // TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the
  416. // Raft group would forget to apply entries:
  417. //
  418. // - node learns that index 11 is committed
  419. // - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
  420. // exceeds maxBytes), which isn't noticed internally by Raft
  421. // - Commit index gets bumped to 10
  422. // - the node persists the HardState, but crashes before applying the entries
  423. // - upon restart, the storage returns the same entries, but `slice` takes a
  424. // different code path and removes the last entry.
  425. // - Raft does not emit a HardState, but when the app calls Advance(), it bumps
  426. // its internal applied index cursor to 10 (when it should be 9)
  427. // - the next Ready asks the app to apply index 11 (omitting index 10), losing a
  428. // write.
  429. func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
  430. s := &ignoreSizeHintMemStorage{
  431. MemoryStorage: NewMemoryStorage(),
  432. }
  433. persistedHardState := raftpb.HardState{
  434. Term: 1,
  435. Vote: 1,
  436. Commit: 10,
  437. }
  438. s.hardState = persistedHardState
  439. s.ents = make([]raftpb.Entry, 10)
  440. var size uint64
  441. for i := range s.ents {
  442. ent := raftpb.Entry{
  443. Term: 1,
  444. Index: uint64(i + 1),
  445. Type: raftpb.EntryNormal,
  446. Data: []byte("a"),
  447. }
  448. s.ents[i] = ent
  449. size += uint64(ent.Size())
  450. }
  451. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  452. // Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
  453. // not be included in the initial rd.CommittedEntries. However, our storage will ignore
  454. // this and *will* return it (which is how the Commit index ended up being 10 initially).
  455. cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
  456. s.ents = append(s.ents, raftpb.Entry{
  457. Term: 1,
  458. Index: uint64(11),
  459. Type: raftpb.EntryNormal,
  460. Data: []byte("boom"),
  461. })
  462. rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
  463. if err != nil {
  464. t.Fatal(err)
  465. }
  466. for highestApplied := uint64(0); highestApplied != 11; {
  467. rd := rawNode.Ready()
  468. n := len(rd.CommittedEntries)
  469. if n == 0 {
  470. t.Fatalf("stopped applying entries at index %d", highestApplied)
  471. }
  472. if next := rd.CommittedEntries[0].Index; highestApplied != 0 && highestApplied+1 != next {
  473. t.Fatalf("attempting to apply index %d after index %d, leaving a gap", next, highestApplied)
  474. }
  475. highestApplied = rd.CommittedEntries[n-1].Index
  476. rawNode.Advance(rd)
  477. rawNode.Step(raftpb.Message{
  478. Type: raftpb.MsgHeartbeat,
  479. To: 1,
  480. From: 1, // illegal, but we get away with it
  481. Term: 1,
  482. Commit: 11,
  483. })
  484. }
  485. }
  486. // TestRawNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
  487. // partitioned from a quorum of nodes. It verifies that the leader's log is
  488. // protected from unbounded growth even as new entries continue to be proposed.
  489. // This protection is provided by the MaxUncommittedEntriesSize configuration.
  490. func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
  491. const maxEntries = 16
  492. data := []byte("testdata")
  493. testEntry := raftpb.Entry{Data: data}
  494. maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
  495. s := NewMemoryStorage()
  496. cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
  497. cfg.MaxUncommittedEntriesSize = maxEntrySize
  498. rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
  499. if err != nil {
  500. t.Fatal(err)
  501. }
  502. rd := rawNode.Ready()
  503. s.Append(rd.Entries)
  504. rawNode.Advance(rd)
  505. // Become the leader.
  506. rawNode.Campaign()
  507. for {
  508. rd = rawNode.Ready()
  509. s.Append(rd.Entries)
  510. if rd.SoftState.Lead == rawNode.raft.id {
  511. rawNode.Advance(rd)
  512. break
  513. }
  514. rawNode.Advance(rd)
  515. }
  516. // Simulate a network partition while we make our proposals by never
  517. // committing anything. These proposals should not cause the leader's
  518. // log to grow indefinitely.
  519. for i := 0; i < 1024; i++ {
  520. rawNode.Propose(data)
  521. }
  522. // Check the size of leader's uncommitted log tail. It should not exceed the
  523. // MaxUncommittedEntriesSize limit.
  524. checkUncommitted := func(exp uint64) {
  525. t.Helper()
  526. if a := rawNode.raft.uncommittedSize; exp != a {
  527. t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
  528. }
  529. }
  530. checkUncommitted(maxEntrySize)
  531. // Recover from the partition. The uncommitted tail of the Raft log should
  532. // disappear as entries are committed.
  533. rd = rawNode.Ready()
  534. if len(rd.CommittedEntries) != maxEntries {
  535. t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
  536. }
  537. s.Append(rd.Entries)
  538. rawNode.Advance(rd)
  539. checkUncommitted(0)
  540. }
  541. func BenchmarkStatusProgress(b *testing.B) {
  542. setup := func(members int) *RawNode {
  543. peers := make([]uint64, members)
  544. for i := range peers {
  545. peers[i] = uint64(i + 1)
  546. }
  547. cfg := newTestConfig(1, peers, 3, 1, NewMemoryStorage())
  548. cfg.Logger = discardLogger
  549. r := newRaft(cfg)
  550. r.becomeFollower(1, 1)
  551. r.becomeCandidate()
  552. r.becomeLeader()
  553. return &RawNode{raft: r}
  554. }
  555. for _, members := range []int{1, 3, 5, 100} {
  556. b.Run(fmt.Sprintf("members=%d", members), func(b *testing.B) {
  557. // NB: call getStatus through rn.Status because that incurs an additional
  558. // allocation.
  559. rn := setup(members)
  560. b.Run("Status", func(b *testing.B) {
  561. b.ReportAllocs()
  562. for i := 0; i < b.N; i++ {
  563. _ = rn.Status()
  564. }
  565. })
  566. b.Run("Status-example", func(b *testing.B) {
  567. b.ReportAllocs()
  568. for i := 0; i < b.N; i++ {
  569. s := rn.Status()
  570. var n uint64
  571. for _, pr := range s.Progress {
  572. n += pr.Match
  573. }
  574. _ = n
  575. }
  576. })
  577. b.Run("StatusWithoutProgress", func(b *testing.B) {
  578. b.ReportAllocs()
  579. for i := 0; i < b.N; i++ {
  580. _ = rn.StatusWithoutProgress()
  581. }
  582. })
  583. b.Run("WithProgress", func(b *testing.B) {
  584. b.ReportAllocs()
  585. visit := func(uint64, ProgressType, tracker.Progress) {}
  586. for i := 0; i < b.N; i++ {
  587. rn.WithProgress(visit)
  588. }
  589. })
  590. b.Run("WithProgress-example", func(b *testing.B) {
  591. b.ReportAllocs()
  592. for i := 0; i < b.N; i++ {
  593. var n uint64
  594. visit := func(_ uint64, _ ProgressType, pr tracker.Progress) {
  595. n += pr.Match
  596. }
  597. rn.WithProgress(visit)
  598. _ = n
  599. }
  600. })
  601. })
  602. }
  603. }