raft_test.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "bytes"
  16. "fmt"
  17. "math"
  18. "math/rand"
  19. "reflect"
  20. "sort"
  21. "testing"
  22. pb "github.com/coreos/etcd/raft/raftpb"
  23. )
  24. // nextEnts returns the appliable entries and updates the applied index
  25. func nextEnts(r *raft) (ents []pb.Entry) {
  26. ents = r.raftLog.nextEnts()
  27. r.raftLog.appliedTo(r.raftLog.committed)
  28. return ents
  29. }
  30. type Interface interface {
  31. Step(m pb.Message) error
  32. readMessages() []pb.Message
  33. }
  34. func (r *raft) readMessages() []pb.Message {
  35. msgs := r.msgs
  36. r.msgs = make([]pb.Message, 0)
  37. return msgs
  38. }
  39. func TestProgressMaybeDecr(t *testing.T) {
  40. tests := []struct {
  41. m uint64
  42. n uint64
  43. to uint64
  44. w bool
  45. wn uint64
  46. }{
  47. {
  48. // match != 0 is always false
  49. 1, 0, 0, false, 0,
  50. },
  51. {
  52. // match != 0 is always false
  53. 5, 10, 9, false, 10,
  54. },
  55. {
  56. // next-1 != to is always false
  57. 0, 0, 0, false, 0,
  58. },
  59. {
  60. // next-1 != to is always false
  61. 0, 10, 5, false, 10,
  62. },
  63. {
  64. // next>1 = decremented by 1
  65. 0, 10, 9, true, 9,
  66. },
  67. {
  68. // next>1 = decremented by 1
  69. 0, 2, 1, true, 1,
  70. },
  71. {
  72. // next<=1 = reset to 1
  73. 0, 1, 0, true, 1,
  74. },
  75. }
  76. for i, tt := range tests {
  77. p := &progress{
  78. match: tt.m,
  79. next: tt.n,
  80. }
  81. if g := p.maybeDecrTo(tt.to); g != tt.w {
  82. t.Errorf("#%d: maybeDecrTo=%t, want %t", i, g, tt.w)
  83. }
  84. if gm := p.match; gm != tt.m {
  85. t.Errorf("#%d: match=%d, want %d", i, gm, tt.m)
  86. }
  87. if gn := p.next; gn != tt.wn {
  88. t.Errorf("#%d: next=%d, want %d", i, gn, tt.wn)
  89. }
  90. }
  91. }
  92. func TestLeaderElection(t *testing.T) {
  93. tests := []struct {
  94. *network
  95. state StateType
  96. }{
  97. {newNetwork(nil, nil, nil), StateLeader},
  98. {newNetwork(nil, nil, nopStepper), StateLeader},
  99. {newNetwork(nil, nopStepper, nopStepper), StateCandidate},
  100. {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate},
  101. {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader},
  102. // three logs further along than 0
  103. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower},
  104. // logs converge
  105. {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader},
  106. }
  107. for i, tt := range tests {
  108. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  109. sm := tt.network.peers[1].(*raft)
  110. if sm.state != tt.state {
  111. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  112. }
  113. if g := sm.Term; g != 1 {
  114. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  115. }
  116. }
  117. }
  118. func TestLogReplication(t *testing.T) {
  119. tests := []struct {
  120. *network
  121. msgs []pb.Message
  122. wcommitted uint64
  123. }{
  124. {
  125. newNetwork(nil, nil, nil),
  126. []pb.Message{
  127. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  128. },
  129. 2,
  130. },
  131. {
  132. newNetwork(nil, nil, nil),
  133. []pb.Message{
  134. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  135. {From: 1, To: 2, Type: pb.MsgHup},
  136. {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  137. },
  138. 4,
  139. },
  140. }
  141. for i, tt := range tests {
  142. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  143. for _, m := range tt.msgs {
  144. tt.send(m)
  145. }
  146. for j, x := range tt.network.peers {
  147. sm := x.(*raft)
  148. if sm.raftLog.committed != tt.wcommitted {
  149. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
  150. }
  151. ents := []pb.Entry{}
  152. for _, e := range nextEnts(sm) {
  153. if e.Data != nil {
  154. ents = append(ents, e)
  155. }
  156. }
  157. props := []pb.Message{}
  158. for _, m := range tt.msgs {
  159. if m.Type == pb.MsgProp {
  160. props = append(props, m)
  161. }
  162. }
  163. for k, m := range props {
  164. if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
  165. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
  166. }
  167. }
  168. }
  169. }
  170. }
  171. func TestSingleNodeCommit(t *testing.T) {
  172. tt := newNetwork(nil)
  173. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  174. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  175. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  176. sm := tt.peers[1].(*raft)
  177. if sm.raftLog.committed != 3 {
  178. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
  179. }
  180. }
  181. // TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed
  182. // when leader changes, no new proposal comes in and ChangeTerm proposal is
  183. // filtered.
  184. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  185. tt := newNetwork(nil, nil, nil, nil, nil)
  186. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  187. // 0 cannot reach 2,3,4
  188. tt.cut(1, 3)
  189. tt.cut(1, 4)
  190. tt.cut(1, 5)
  191. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  192. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  193. sm := tt.peers[1].(*raft)
  194. if sm.raftLog.committed != 1 {
  195. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  196. }
  197. // network recovery
  198. tt.recover()
  199. // avoid committing ChangeTerm proposal
  200. tt.ignore(pb.MsgApp)
  201. // elect 1 as the new leader with term 2
  202. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  203. // no log entries from previous term should be committed
  204. sm = tt.peers[2].(*raft)
  205. if sm.raftLog.committed != 1 {
  206. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  207. }
  208. tt.recover()
  209. // still be able to append a entry
  210. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  211. if sm.raftLog.committed != 5 {
  212. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
  213. }
  214. }
  215. // TestCommitWithoutNewTermEntry tests the entries could be committed
  216. // when leader changes, no new proposal comes in.
  217. func TestCommitWithoutNewTermEntry(t *testing.T) {
  218. tt := newNetwork(nil, nil, nil, nil, nil)
  219. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  220. // 0 cannot reach 2,3,4
  221. tt.cut(1, 3)
  222. tt.cut(1, 4)
  223. tt.cut(1, 5)
  224. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  225. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  226. sm := tt.peers[1].(*raft)
  227. if sm.raftLog.committed != 1 {
  228. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  229. }
  230. // network recovery
  231. tt.recover()
  232. // elect 1 as the new leader with term 2
  233. // after append a ChangeTerm entry from the current term, all entries
  234. // should be committed
  235. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  236. if sm.raftLog.committed != 4 {
  237. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
  238. }
  239. }
  240. func TestDuelingCandidates(t *testing.T) {
  241. a := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  242. b := newRaft(2, []uint64{1, 2, 3}, 10, 1)
  243. c := newRaft(3, []uint64{1, 2, 3}, 10, 1)
  244. nt := newNetwork(a, b, c)
  245. nt.cut(1, 3)
  246. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  247. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  248. nt.recover()
  249. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  250. wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1}
  251. tests := []struct {
  252. sm *raft
  253. state StateType
  254. term uint64
  255. raftLog *raftLog
  256. }{
  257. {a, StateFollower, 2, wlog},
  258. {b, StateFollower, 2, wlog},
  259. {c, StateFollower, 2, newLog()},
  260. }
  261. for i, tt := range tests {
  262. if g := tt.sm.state; g != tt.state {
  263. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  264. }
  265. if g := tt.sm.Term; g != tt.term {
  266. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  267. }
  268. base := ltoa(tt.raftLog)
  269. if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
  270. l := ltoa(sm.raftLog)
  271. if g := diffu(base, l); g != "" {
  272. t.Errorf("#%d: diff:\n%s", i, g)
  273. }
  274. } else {
  275. t.Logf("#%d: empty log", i)
  276. }
  277. }
  278. }
  279. func TestCandidateConcede(t *testing.T) {
  280. tt := newNetwork(nil, nil, nil)
  281. tt.isolate(1)
  282. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  283. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  284. // heal the partition
  285. tt.recover()
  286. data := []byte("force follower")
  287. // send a proposal to 2 to flush out a MsgApp to 0
  288. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  289. a := tt.peers[1].(*raft)
  290. if g := a.state; g != StateFollower {
  291. t.Errorf("state = %s, want %s", g, StateFollower)
  292. }
  293. if g := a.Term; g != 1 {
  294. t.Errorf("term = %d, want %d", g, 1)
  295. }
  296. wantLog := ltoa(&raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
  297. for i, p := range tt.peers {
  298. if sm, ok := p.(*raft); ok {
  299. l := ltoa(sm.raftLog)
  300. if g := diffu(wantLog, l); g != "" {
  301. t.Errorf("#%d: diff:\n%s", i, g)
  302. }
  303. } else {
  304. t.Logf("#%d: empty log", i)
  305. }
  306. }
  307. }
  308. func TestSingleNodeCandidate(t *testing.T) {
  309. tt := newNetwork(nil)
  310. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  311. sm := tt.peers[1].(*raft)
  312. if sm.state != StateLeader {
  313. t.Errorf("state = %d, want %d", sm.state, StateLeader)
  314. }
  315. }
  316. func TestOldMessages(t *testing.T) {
  317. tt := newNetwork(nil, nil, nil)
  318. // make 0 leader @ term 3
  319. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  320. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  321. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  322. // pretend we're an old leader trying to make progress
  323. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
  324. l := &raftLog{
  325. ents: []pb.Entry{
  326. {}, {Data: nil, Term: 1, Index: 1},
  327. {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
  328. },
  329. committed: 3,
  330. }
  331. base := ltoa(l)
  332. for i, p := range tt.peers {
  333. if sm, ok := p.(*raft); ok {
  334. l := ltoa(sm.raftLog)
  335. if g := diffu(base, l); g != "" {
  336. t.Errorf("#%d: diff:\n%s", i, g)
  337. }
  338. } else {
  339. t.Logf("#%d: empty log", i)
  340. }
  341. }
  342. }
  343. // TestOldMessagesReply - optimization - reply with new term.
  344. func TestProposal(t *testing.T) {
  345. tests := []struct {
  346. *network
  347. success bool
  348. }{
  349. {newNetwork(nil, nil, nil), true},
  350. {newNetwork(nil, nil, nopStepper), true},
  351. {newNetwork(nil, nopStepper, nopStepper), false},
  352. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  353. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  354. }
  355. for i, tt := range tests {
  356. send := func(m pb.Message) {
  357. defer func() {
  358. // only recover is we expect it to panic so
  359. // panics we don't expect go up.
  360. if !tt.success {
  361. e := recover()
  362. if e != nil {
  363. t.Logf("#%d: err: %s", i, e)
  364. }
  365. }
  366. }()
  367. tt.send(m)
  368. }
  369. data := []byte("somedata")
  370. // promote 0 the leader
  371. send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  372. send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  373. wantLog := newLog()
  374. if tt.success {
  375. wantLog = &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}
  376. }
  377. base := ltoa(wantLog)
  378. for i, p := range tt.peers {
  379. if sm, ok := p.(*raft); ok {
  380. l := ltoa(sm.raftLog)
  381. if g := diffu(base, l); g != "" {
  382. t.Errorf("#%d: diff:\n%s", i, g)
  383. }
  384. } else {
  385. t.Logf("#%d: empty log", i)
  386. }
  387. }
  388. sm := tt.network.peers[1].(*raft)
  389. if g := sm.Term; g != 1 {
  390. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  391. }
  392. }
  393. }
  394. func TestProposalByProxy(t *testing.T) {
  395. data := []byte("somedata")
  396. tests := []*network{
  397. newNetwork(nil, nil, nil),
  398. newNetwork(nil, nil, nopStepper),
  399. }
  400. for i, tt := range tests {
  401. // promote 0 the leader
  402. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  403. // propose via follower
  404. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  405. wantLog := &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
  406. base := ltoa(wantLog)
  407. for i, p := range tt.peers {
  408. if sm, ok := p.(*raft); ok {
  409. l := ltoa(sm.raftLog)
  410. if g := diffu(base, l); g != "" {
  411. t.Errorf("#%d: diff:\n%s", i, g)
  412. }
  413. } else {
  414. t.Logf("#%d: empty log", i)
  415. }
  416. }
  417. sm := tt.peers[1].(*raft)
  418. if g := sm.Term; g != 1 {
  419. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  420. }
  421. }
  422. }
  423. func TestCompact(t *testing.T) {
  424. tests := []struct {
  425. compacti uint64
  426. nodes []uint64
  427. snapd []byte
  428. wpanic bool
  429. }{
  430. {1, []uint64{1, 2, 3}, []byte("some data"), false},
  431. {2, []uint64{1, 2, 3}, []byte("some data"), false},
  432. {4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range
  433. }
  434. for i, tt := range tests {
  435. func() {
  436. defer func() {
  437. if r := recover(); r != nil {
  438. if tt.wpanic != true {
  439. t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic)
  440. }
  441. }
  442. }()
  443. sm := &raft{
  444. state: StateLeader,
  445. raftLog: &raftLog{
  446. committed: 2,
  447. applied: 2,
  448. ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
  449. },
  450. }
  451. sm.compact(tt.compacti, tt.nodes, tt.snapd)
  452. sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
  453. if sm.raftLog.offset != tt.compacti {
  454. t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
  455. }
  456. if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
  457. t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
  458. }
  459. if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
  460. t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
  461. }
  462. }()
  463. }
  464. }
  465. func TestCommit(t *testing.T) {
  466. tests := []struct {
  467. matches []uint64
  468. logs []pb.Entry
  469. smTerm uint64
  470. w uint64
  471. }{
  472. // single
  473. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
  474. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
  475. {[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  476. {[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
  477. // odd
  478. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  479. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  480. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  481. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  482. // even
  483. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  484. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  485. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  486. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  487. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  488. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  489. }
  490. for i, tt := range tests {
  491. prs := make(map[uint64]*progress)
  492. for j := 0; j < len(tt.matches); j++ {
  493. prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
  494. }
  495. sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}}
  496. sm.maybeCommit()
  497. if g := sm.raftLog.committed; g != tt.w {
  498. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  499. }
  500. }
  501. }
  502. func TestIsElectionTimeout(t *testing.T) {
  503. tests := []struct {
  504. elapse int
  505. wprobability float64
  506. round bool
  507. }{
  508. {5, 0, false},
  509. {13, 0.3, true},
  510. {15, 0.5, true},
  511. {18, 0.8, true},
  512. {20, 1, false},
  513. }
  514. for i, tt := range tests {
  515. sm := newRaft(1, []uint64{1}, 10, 1)
  516. sm.elapsed = tt.elapse
  517. c := 0
  518. for j := 0; j < 10000; j++ {
  519. if sm.isElectionTimeout() {
  520. c++
  521. }
  522. }
  523. got := float64(c) / 10000.0
  524. if tt.round {
  525. got = math.Floor(got*10+0.5) / 10.0
  526. }
  527. if got != tt.wprobability {
  528. t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability)
  529. }
  530. }
  531. }
  532. // ensure that the Step function ignores the message from old term and does not pass it to the
  533. // acutal stepX function.
  534. func TestStepIgnoreOldTermMsg(t *testing.T) {
  535. called := false
  536. fakeStep := func(r *raft, m pb.Message) {
  537. called = true
  538. }
  539. sm := newRaft(1, []uint64{1}, 10, 1)
  540. sm.step = fakeStep
  541. sm.Term = 2
  542. sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
  543. if called == true {
  544. t.Errorf("stepFunc called = %v , want %v", called, false)
  545. }
  546. }
  547. // TestHandleMsgApp ensures:
  548. // 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
  549. // 2. If an existing entry conflicts with a new one (same index but different terms),
  550. // delete the existing entry and all that follow it; append any new entries not already in the log.
  551. // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
  552. func TestHandleMsgApp(t *testing.T) {
  553. tests := []struct {
  554. m pb.Message
  555. wIndex uint64
  556. wCommit uint64
  557. wReject bool
  558. }{
  559. // Ensure 1
  560. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch
  561. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist
  562. // Ensure 2
  563. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
  564. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false},
  565. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false},
  566. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false},
  567. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false},
  568. // Ensure 3
  569. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1
  570. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2
  571. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2
  572. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last()
  573. }
  574. for i, tt := range tests {
  575. sm := &raft{
  576. state: StateFollower,
  577. HardState: pb.HardState{Term: 2},
  578. raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
  579. }
  580. sm.handleAppendEntries(tt.m)
  581. if sm.raftLog.lastIndex() != tt.wIndex {
  582. t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
  583. }
  584. if sm.raftLog.committed != tt.wCommit {
  585. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  586. }
  587. m := sm.readMessages()
  588. if len(m) != 1 {
  589. t.Fatalf("#%d: msg = nil, want 1", i)
  590. }
  591. if m[0].Reject != tt.wReject {
  592. t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
  593. }
  594. }
  595. }
  596. // TestHandleHeartbeat ensures that the follower commits to the commit in the message.
  597. func TestHandleHeartbeat(t *testing.T) {
  598. commit := uint64(2)
  599. tests := []struct {
  600. m pb.Message
  601. wCommit uint64
  602. }{
  603. {pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
  604. {pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
  605. }
  606. for i, tt := range tests {
  607. sm := &raft{
  608. state: StateFollower,
  609. HardState: pb.HardState{Term: 2},
  610. raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}, {Term: 3}}},
  611. }
  612. sm.raftLog.commitTo(commit)
  613. sm.handleHeartbeat(tt.m)
  614. if sm.raftLog.committed != tt.wCommit {
  615. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  616. }
  617. m := sm.readMessages()
  618. if len(m) != 0 {
  619. t.Fatalf("#%d: msg = nil, want 0", i)
  620. }
  621. }
  622. }
  623. func TestRecvMsgVote(t *testing.T) {
  624. tests := []struct {
  625. state StateType
  626. i, term uint64
  627. voteFor uint64
  628. wreject bool
  629. }{
  630. {StateFollower, 0, 0, None, true},
  631. {StateFollower, 0, 1, None, true},
  632. {StateFollower, 0, 2, None, true},
  633. {StateFollower, 0, 3, None, false},
  634. {StateFollower, 1, 0, None, true},
  635. {StateFollower, 1, 1, None, true},
  636. {StateFollower, 1, 2, None, true},
  637. {StateFollower, 1, 3, None, false},
  638. {StateFollower, 2, 0, None, true},
  639. {StateFollower, 2, 1, None, true},
  640. {StateFollower, 2, 2, None, false},
  641. {StateFollower, 2, 3, None, false},
  642. {StateFollower, 3, 0, None, true},
  643. {StateFollower, 3, 1, None, true},
  644. {StateFollower, 3, 2, None, false},
  645. {StateFollower, 3, 3, None, false},
  646. {StateFollower, 3, 2, 2, false},
  647. {StateFollower, 3, 2, 1, true},
  648. {StateLeader, 3, 3, 1, true},
  649. {StateCandidate, 3, 3, 1, true},
  650. }
  651. for i, tt := range tests {
  652. sm := newRaft(1, []uint64{1}, 10, 1)
  653. sm.state = tt.state
  654. switch tt.state {
  655. case StateFollower:
  656. sm.step = stepFollower
  657. case StateCandidate:
  658. sm.step = stepCandidate
  659. case StateLeader:
  660. sm.step = stepLeader
  661. }
  662. sm.HardState = pb.HardState{Vote: tt.voteFor}
  663. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}
  664. sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
  665. msgs := sm.readMessages()
  666. if g := len(msgs); g != 1 {
  667. t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
  668. continue
  669. }
  670. if g := msgs[0].Reject; g != tt.wreject {
  671. t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
  672. }
  673. }
  674. }
  675. func TestStateTransition(t *testing.T) {
  676. tests := []struct {
  677. from StateType
  678. to StateType
  679. wallow bool
  680. wterm uint64
  681. wlead uint64
  682. }{
  683. {StateFollower, StateFollower, true, 1, None},
  684. {StateFollower, StateCandidate, true, 1, None},
  685. {StateFollower, StateLeader, false, 0, None},
  686. {StateCandidate, StateFollower, true, 0, None},
  687. {StateCandidate, StateCandidate, true, 1, None},
  688. {StateCandidate, StateLeader, true, 0, 1},
  689. {StateLeader, StateFollower, true, 1, None},
  690. {StateLeader, StateCandidate, false, 1, None},
  691. {StateLeader, StateLeader, true, 0, 1},
  692. }
  693. for i, tt := range tests {
  694. func() {
  695. defer func() {
  696. if r := recover(); r != nil {
  697. if tt.wallow == true {
  698. t.Errorf("%d: allow = %v, want %v", i, false, true)
  699. }
  700. }
  701. }()
  702. sm := newRaft(1, []uint64{1}, 10, 1)
  703. sm.state = tt.from
  704. switch tt.to {
  705. case StateFollower:
  706. sm.becomeFollower(tt.wterm, tt.wlead)
  707. case StateCandidate:
  708. sm.becomeCandidate()
  709. case StateLeader:
  710. sm.becomeLeader()
  711. }
  712. if sm.Term != tt.wterm {
  713. t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
  714. }
  715. if sm.lead != tt.wlead {
  716. t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  717. }
  718. }()
  719. }
  720. }
  721. func TestAllServerStepdown(t *testing.T) {
  722. tests := []struct {
  723. state StateType
  724. wstate StateType
  725. wterm uint64
  726. windex uint64
  727. }{
  728. {StateFollower, StateFollower, 3, 1},
  729. {StateCandidate, StateFollower, 3, 1},
  730. {StateLeader, StateFollower, 3, 2},
  731. }
  732. tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
  733. tterm := uint64(3)
  734. for i, tt := range tests {
  735. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  736. switch tt.state {
  737. case StateFollower:
  738. sm.becomeFollower(1, None)
  739. case StateCandidate:
  740. sm.becomeCandidate()
  741. case StateLeader:
  742. sm.becomeCandidate()
  743. sm.becomeLeader()
  744. }
  745. for j, msgType := range tmsgTypes {
  746. sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
  747. if sm.state != tt.wstate {
  748. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
  749. }
  750. if sm.Term != tt.wterm {
  751. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
  752. }
  753. if uint64(len(sm.raftLog.ents)) != tt.windex {
  754. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.ents), tt.windex)
  755. }
  756. wlead := uint64(2)
  757. if msgType == pb.MsgVote {
  758. wlead = None
  759. }
  760. if sm.lead != wlead {
  761. t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
  762. }
  763. }
  764. }
  765. }
  766. func TestLeaderAppResp(t *testing.T) {
  767. // initial progress: match = 0; netx = 3
  768. tests := []struct {
  769. index uint64
  770. reject bool
  771. // progress
  772. wmatch uint64
  773. wnext uint64
  774. // message
  775. wmsgNum int
  776. windex uint64
  777. wcommitted uint64
  778. }{
  779. {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies
  780. {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg
  781. {2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
  782. {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
  783. }
  784. for i, tt := range tests {
  785. // sm term is 1 after it becomes the leader.
  786. // thus the last log term must be 1 to be committed.
  787. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  788. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  789. sm.becomeCandidate()
  790. sm.becomeLeader()
  791. sm.readMessages()
  792. sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject})
  793. p := sm.prs[2]
  794. if p.match != tt.wmatch {
  795. t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch)
  796. }
  797. if p.next != tt.wnext {
  798. t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
  799. }
  800. msgs := sm.readMessages()
  801. if len(msgs) != tt.wmsgNum {
  802. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  803. }
  804. for j, msg := range msgs {
  805. if msg.Index != tt.windex {
  806. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  807. }
  808. if msg.Commit != tt.wcommitted {
  809. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  810. }
  811. }
  812. }
  813. }
  814. // When the leader receives a heartbeat tick, it should
  815. // send a MsgApp with m.Index = 0, m.LogTerm=0 and empty entries.
  816. func TestBcastBeat(t *testing.T) {
  817. offset := uint64(1000)
  818. // make a state machine with log.offset = 1000
  819. s := pb.Snapshot{
  820. Index: offset,
  821. Term: 1,
  822. Nodes: []uint64{1, 2, 3},
  823. }
  824. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  825. sm.Term = 1
  826. sm.restore(s)
  827. sm.becomeCandidate()
  828. sm.becomeLeader()
  829. for i := 0; i < 10; i++ {
  830. sm.appendEntry(pb.Entry{})
  831. }
  832. // slow follower
  833. sm.prs[2].match, sm.prs[2].next = 5, 6
  834. // normal follower
  835. sm.prs[3].match, sm.prs[3].next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
  836. sm.Step(pb.Message{Type: pb.MsgBeat})
  837. msgs := sm.readMessages()
  838. if len(msgs) != 2 {
  839. t.Fatalf("len(msgs) = %v, want 2", len(msgs))
  840. }
  841. wantCommitMap := map[uint64]uint64{
  842. 2: min(sm.raftLog.committed, sm.prs[2].match),
  843. 3: min(sm.raftLog.committed, sm.prs[3].match),
  844. }
  845. for i, m := range msgs {
  846. if m.Type != pb.MsgApp {
  847. t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
  848. }
  849. if m.Index != 0 {
  850. t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
  851. }
  852. if m.LogTerm != 0 {
  853. t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
  854. }
  855. if wantCommitMap[m.To] == 0 {
  856. t.Fatalf("#%d: unexpected to %d", i, m.To)
  857. } else {
  858. if m.Commit != wantCommitMap[m.To] {
  859. t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
  860. }
  861. delete(wantCommitMap, m.To)
  862. }
  863. if len(m.Entries) != 0 {
  864. t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
  865. }
  866. }
  867. }
  868. // tests the output of the statemachine when receiving MsgBeat
  869. func TestRecvMsgBeat(t *testing.T) {
  870. tests := []struct {
  871. state StateType
  872. wMsg int
  873. }{
  874. {StateLeader, 2},
  875. // candidate and follower should ignore MsgBeat
  876. {StateCandidate, 0},
  877. {StateFollower, 0},
  878. }
  879. for i, tt := range tests {
  880. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  881. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  882. sm.Term = 1
  883. sm.state = tt.state
  884. switch tt.state {
  885. case StateFollower:
  886. sm.step = stepFollower
  887. case StateCandidate:
  888. sm.step = stepCandidate
  889. case StateLeader:
  890. sm.step = stepLeader
  891. }
  892. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  893. msgs := sm.readMessages()
  894. if len(msgs) != tt.wMsg {
  895. t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  896. }
  897. for _, m := range msgs {
  898. if m.Type != pb.MsgApp {
  899. t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp)
  900. }
  901. }
  902. }
  903. }
  904. func TestRestore(t *testing.T) {
  905. s := pb.Snapshot{
  906. Index: 11, // magic number
  907. Term: 11, // magic number
  908. Nodes: []uint64{1, 2, 3},
  909. }
  910. sm := newRaft(1, []uint64{1, 2}, 10, 1)
  911. if ok := sm.restore(s); !ok {
  912. t.Fatal("restore fail, want succeed")
  913. }
  914. if sm.raftLog.lastIndex() != s.Index {
  915. t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Index)
  916. }
  917. if sm.raftLog.term(s.Index) != s.Term {
  918. t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
  919. }
  920. sg := sm.nodes()
  921. if !reflect.DeepEqual(sg, s.Nodes) {
  922. t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
  923. }
  924. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  925. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  926. }
  927. if ok := sm.restore(s); ok {
  928. t.Fatal("restore succeed, want fail")
  929. }
  930. }
  931. func TestProvideSnap(t *testing.T) {
  932. s := pb.Snapshot{
  933. Index: 11, // magic number
  934. Term: 11, // magic number
  935. Nodes: []uint64{1, 2},
  936. }
  937. sm := newRaft(1, []uint64{1}, 10, 1)
  938. // restore the statemachin from a snapshot
  939. // so it has a compacted log and a snapshot
  940. sm.restore(s)
  941. sm.becomeCandidate()
  942. sm.becomeLeader()
  943. // force set the next of node 1, so that
  944. // node 1 needs a snapshot
  945. sm.prs[2].next = sm.raftLog.offset
  946. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
  947. msgs := sm.readMessages()
  948. if len(msgs) != 1 {
  949. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  950. }
  951. m := msgs[0]
  952. if m.Type != pb.MsgSnap {
  953. t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
  954. }
  955. }
  956. func TestRestoreFromSnapMsg(t *testing.T) {
  957. s := pb.Snapshot{
  958. Index: 11, // magic number
  959. Term: 11, // magic number
  960. Nodes: []uint64{1, 2},
  961. }
  962. m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
  963. sm := newRaft(2, []uint64{1, 2}, 10, 1)
  964. sm.Step(m)
  965. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  966. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  967. }
  968. }
  969. func TestSlowNodeRestore(t *testing.T) {
  970. nt := newNetwork(nil, nil, nil)
  971. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  972. nt.isolate(3)
  973. for j := 0; j <= 100; j++ {
  974. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  975. }
  976. lead := nt.peers[1].(*raft)
  977. nextEnts(lead)
  978. lead.compact(lead.raftLog.applied, lead.nodes(), nil)
  979. nt.recover()
  980. // trigger a snapshot
  981. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  982. follower := nt.peers[3].(*raft)
  983. if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
  984. t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
  985. }
  986. // trigger a commit
  987. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  988. if follower.raftLog.committed != lead.raftLog.committed {
  989. t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
  990. }
  991. }
  992. // TestStepConfig tests that when raft step msgProp in EntryConfChange type,
  993. // it appends the entry to log and sets pendingConf to be true.
  994. func TestStepConfig(t *testing.T) {
  995. // a raft that cannot make progress
  996. r := newRaft(1, []uint64{1, 2}, 10, 1)
  997. r.becomeCandidate()
  998. r.becomeLeader()
  999. index := r.raftLog.lastIndex()
  1000. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1001. if g := r.raftLog.lastIndex(); g != index+1 {
  1002. t.Errorf("index = %d, want %d", g, index+1)
  1003. }
  1004. if r.pendingConf != true {
  1005. t.Errorf("pendingConf = %v, want true", r.pendingConf)
  1006. }
  1007. }
  1008. // TestStepIgnoreConfig tests that if raft step the second msgProp in
  1009. // EntryConfChange type when the first one is uncommitted, the node will deny
  1010. // the proposal and keep its original state.
  1011. func TestStepIgnoreConfig(t *testing.T) {
  1012. // a raft that cannot make progress
  1013. r := newRaft(1, []uint64{1, 2}, 10, 1)
  1014. r.becomeCandidate()
  1015. r.becomeLeader()
  1016. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1017. index := r.raftLog.lastIndex()
  1018. pendingConf := r.pendingConf
  1019. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1020. if g := r.raftLog.lastIndex(); g != index {
  1021. t.Errorf("index = %d, want %d", g, index)
  1022. }
  1023. if r.pendingConf != pendingConf {
  1024. t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
  1025. }
  1026. }
  1027. // TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
  1028. // based on uncommitted entries.
  1029. func TestRecoverPendingConfig(t *testing.T) {
  1030. tests := []struct {
  1031. entType pb.EntryType
  1032. wpending bool
  1033. }{
  1034. {pb.EntryNormal, false},
  1035. {pb.EntryConfChange, true},
  1036. }
  1037. for i, tt := range tests {
  1038. r := newRaft(1, []uint64{1, 2}, 10, 1)
  1039. r.appendEntry(pb.Entry{Type: tt.entType})
  1040. r.becomeCandidate()
  1041. r.becomeLeader()
  1042. if r.pendingConf != tt.wpending {
  1043. t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
  1044. }
  1045. }
  1046. }
  1047. // TestRecoverDoublePendingConfig tests that new leader will panic if
  1048. // there exist two uncommitted config entries.
  1049. func TestRecoverDoublePendingConfig(t *testing.T) {
  1050. func() {
  1051. defer func() {
  1052. if err := recover(); err == nil {
  1053. t.Errorf("expect panic, but nothing happens")
  1054. }
  1055. }()
  1056. r := newRaft(1, []uint64{1, 2}, 10, 1)
  1057. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1058. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1059. r.becomeCandidate()
  1060. r.becomeLeader()
  1061. }()
  1062. }
  1063. // TestAddNode tests that addNode could update pendingConf and nodes correctly.
  1064. func TestAddNode(t *testing.T) {
  1065. r := newRaft(1, []uint64{1}, 10, 1)
  1066. r.pendingConf = true
  1067. r.addNode(2)
  1068. if r.pendingConf != false {
  1069. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1070. }
  1071. nodes := r.nodes()
  1072. wnodes := []uint64{1, 2}
  1073. if !reflect.DeepEqual(nodes, wnodes) {
  1074. t.Errorf("nodes = %v, want %v", nodes, wnodes)
  1075. }
  1076. }
  1077. // TestRemoveNode tests that removeNode could update pendingConf, nodes and
  1078. // and removed list correctly.
  1079. func TestRemoveNode(t *testing.T) {
  1080. r := newRaft(1, []uint64{1, 2}, 10, 1)
  1081. r.pendingConf = true
  1082. r.removeNode(2)
  1083. if r.pendingConf != false {
  1084. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1085. }
  1086. w := []uint64{1}
  1087. if g := r.nodes(); !reflect.DeepEqual(g, w) {
  1088. t.Errorf("nodes = %v, want %v", g, w)
  1089. }
  1090. }
  1091. func TestPromotable(t *testing.T) {
  1092. id := uint64(1)
  1093. tests := []struct {
  1094. peers []uint64
  1095. wp bool
  1096. }{
  1097. {[]uint64{1}, true},
  1098. {[]uint64{1, 2, 3}, true},
  1099. {[]uint64{}, false},
  1100. {[]uint64{2, 3}, false},
  1101. }
  1102. for i, tt := range tests {
  1103. r := &raft{id: id, prs: make(map[uint64]*progress)}
  1104. for _, id := range tt.peers {
  1105. r.prs[id] = &progress{}
  1106. }
  1107. if g := r.promotable(); g != tt.wp {
  1108. t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
  1109. }
  1110. }
  1111. }
  1112. func TestRaftNodes(t *testing.T) {
  1113. tests := []struct {
  1114. ids []uint64
  1115. wids []uint64
  1116. }{
  1117. {
  1118. []uint64{1, 2, 3},
  1119. []uint64{1, 2, 3},
  1120. },
  1121. {
  1122. []uint64{3, 2, 1},
  1123. []uint64{1, 2, 3},
  1124. },
  1125. }
  1126. for i, tt := range tests {
  1127. r := newRaft(1, tt.ids, 10, 1)
  1128. if !reflect.DeepEqual(r.nodes(), tt.wids) {
  1129. t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
  1130. }
  1131. }
  1132. }
  1133. func ents(terms ...uint64) *raft {
  1134. ents := []pb.Entry{{}}
  1135. for _, term := range terms {
  1136. ents = append(ents, pb.Entry{Term: term})
  1137. }
  1138. sm := &raft{raftLog: &raftLog{ents: ents}}
  1139. sm.reset(0)
  1140. return sm
  1141. }
  1142. type network struct {
  1143. peers map[uint64]Interface
  1144. dropm map[connem]float64
  1145. ignorem map[pb.MessageType]bool
  1146. }
  1147. // newNetwork initializes a network from peers.
  1148. // A nil node will be replaced with a new *stateMachine.
  1149. // A *stateMachine will get its k, id.
  1150. // When using stateMachine, the address list is always [1, n].
  1151. func newNetwork(peers ...Interface) *network {
  1152. size := len(peers)
  1153. peerAddrs := idsBySize(size)
  1154. npeers := make(map[uint64]Interface, size)
  1155. for i, p := range peers {
  1156. id := peerAddrs[i]
  1157. switch v := p.(type) {
  1158. case nil:
  1159. sm := newRaft(id, peerAddrs, 10, 1)
  1160. npeers[id] = sm
  1161. case *raft:
  1162. v.id = id
  1163. v.prs = make(map[uint64]*progress)
  1164. for i := 0; i < size; i++ {
  1165. v.prs[peerAddrs[i]] = &progress{}
  1166. }
  1167. v.reset(0)
  1168. npeers[id] = v
  1169. case *blackHole:
  1170. npeers[id] = v
  1171. default:
  1172. panic(fmt.Sprintf("unexpected state machine type: %T", p))
  1173. }
  1174. }
  1175. return &network{
  1176. peers: npeers,
  1177. dropm: make(map[connem]float64),
  1178. ignorem: make(map[pb.MessageType]bool),
  1179. }
  1180. }
  1181. func (nw *network) send(msgs ...pb.Message) {
  1182. for len(msgs) > 0 {
  1183. m := msgs[0]
  1184. p := nw.peers[m.To]
  1185. p.Step(m)
  1186. msgs = append(msgs[1:], nw.filter(p.readMessages())...)
  1187. }
  1188. }
  1189. func (nw *network) drop(from, to uint64, perc float64) {
  1190. nw.dropm[connem{from, to}] = perc
  1191. }
  1192. func (nw *network) cut(one, other uint64) {
  1193. nw.drop(one, other, 1)
  1194. nw.drop(other, one, 1)
  1195. }
  1196. func (nw *network) isolate(id uint64) {
  1197. for i := 0; i < len(nw.peers); i++ {
  1198. nid := uint64(i) + 1
  1199. if nid != id {
  1200. nw.drop(id, nid, 1.0)
  1201. nw.drop(nid, id, 1.0)
  1202. }
  1203. }
  1204. }
  1205. func (nw *network) ignore(t pb.MessageType) {
  1206. nw.ignorem[t] = true
  1207. }
  1208. func (nw *network) recover() {
  1209. nw.dropm = make(map[connem]float64)
  1210. nw.ignorem = make(map[pb.MessageType]bool)
  1211. }
  1212. func (nw *network) filter(msgs []pb.Message) []pb.Message {
  1213. mm := []pb.Message{}
  1214. for _, m := range msgs {
  1215. if nw.ignorem[m.Type] {
  1216. continue
  1217. }
  1218. switch m.Type {
  1219. case pb.MsgHup:
  1220. // hups never go over the network, so don't drop them but panic
  1221. panic("unexpected msgHup")
  1222. default:
  1223. perc := nw.dropm[connem{m.From, m.To}]
  1224. if n := rand.Float64(); n < perc {
  1225. continue
  1226. }
  1227. }
  1228. mm = append(mm, m)
  1229. }
  1230. return mm
  1231. }
  1232. type connem struct {
  1233. from, to uint64
  1234. }
  1235. type blackHole struct{}
  1236. func (blackHole) Step(pb.Message) error { return nil }
  1237. func (blackHole) readMessages() []pb.Message { return nil }
  1238. var nopStepper = &blackHole{}
  1239. func idsBySize(size int) []uint64 {
  1240. ids := make([]uint64, size)
  1241. for i := 0; i < size; i++ {
  1242. ids[i] = 1 + uint64(i)
  1243. }
  1244. return ids
  1245. }