raft_test.go 45 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "bytes"
  16. "fmt"
  17. "math"
  18. "math/rand"
  19. "reflect"
  20. "testing"
  21. pb "github.com/coreos/etcd/raft/raftpb"
  22. )
  23. // nextEnts returns the appliable entries and updates the applied index
  24. func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
  25. // Transfer all unstable entries to "stable" storage.
  26. s.Append(r.raftLog.unstableEntries())
  27. r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
  28. ents = r.raftLog.nextEnts()
  29. r.raftLog.appliedTo(r.raftLog.committed)
  30. return ents
  31. }
  32. type Interface interface {
  33. Step(m pb.Message) error
  34. readMessages() []pb.Message
  35. }
  36. func (r *raft) readMessages() []pb.Message {
  37. msgs := r.msgs
  38. r.msgs = make([]pb.Message, 0)
  39. return msgs
  40. }
  41. func TestProgressUpdate(t *testing.T) {
  42. prevM, prevN := uint64(3), uint64(5)
  43. tests := []struct {
  44. update uint64
  45. wm uint64
  46. wn uint64
  47. }{
  48. {prevM - 1, prevM, prevN}, // do not decrease match, next
  49. {prevM, prevM, prevN}, // do not decrease next
  50. {prevM + 1, prevM + 1, prevN}, // increase match, do not decrease next
  51. {prevM + 2, prevM + 2, prevN + 1}, // increase match, next
  52. }
  53. for i, tt := range tests {
  54. p := &Progress{
  55. Match: prevM,
  56. Next: prevN,
  57. }
  58. p.update(tt.update)
  59. if p.Match != tt.wm {
  60. t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm)
  61. }
  62. if p.Next != tt.wn {
  63. t.Errorf("#%d: next= %d, want %d", i, p.Next, tt.wn)
  64. }
  65. }
  66. }
  67. func TestProgressMaybeDecr(t *testing.T) {
  68. tests := []struct {
  69. m uint64
  70. n uint64
  71. rejected uint64
  72. last uint64
  73. w bool
  74. wn uint64
  75. }{
  76. {
  77. // match != 0 is always false
  78. 1, 0, 0, 0, false, 0,
  79. },
  80. {
  81. // match != 0 and to is greater than match
  82. // directly decrease to match+1
  83. 5, 10, 5, 5, false, 10,
  84. },
  85. {
  86. // match != 0 and to is greater than match
  87. // directly decrease to match+1
  88. 5, 10, 4, 4, false, 10,
  89. },
  90. {
  91. // match != 0 and to is not greater than match
  92. 5, 10, 9, 9, true, 6,
  93. },
  94. {
  95. // next-1 != rejected is always false
  96. 0, 0, 0, 0, false, 0,
  97. },
  98. {
  99. // next-1 != rejected is always false
  100. 0, 10, 5, 5, false, 10,
  101. },
  102. {
  103. // next>1 = decremented by 1
  104. 0, 10, 9, 9, true, 9,
  105. },
  106. {
  107. // next>1 = decremented by 1
  108. 0, 2, 1, 1, true, 1,
  109. },
  110. {
  111. // next<=1 = reset to 1
  112. 0, 1, 0, 0, true, 1,
  113. },
  114. {
  115. // decrease to min(rejected, last+1)
  116. 0, 10, 9, 2, true, 3,
  117. },
  118. {
  119. // rejected < 1, reset to 1
  120. 0, 10, 9, 0, true, 1,
  121. },
  122. }
  123. for i, tt := range tests {
  124. p := &Progress{
  125. Match: tt.m,
  126. Next: tt.n,
  127. }
  128. if g := p.maybeDecrTo(tt.rejected, tt.last); g != tt.w {
  129. t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w)
  130. }
  131. if gm := p.Match; gm != tt.m {
  132. t.Errorf("#%d: match= %d, want %d", i, gm, tt.m)
  133. }
  134. if gn := p.Next; gn != tt.wn {
  135. t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn)
  136. }
  137. }
  138. }
  139. func TestProgressShouldWait(t *testing.T) {
  140. tests := []struct {
  141. m uint64
  142. wait int
  143. w bool
  144. }{
  145. // match != 0 is always not wait
  146. {1, 0, false},
  147. {1, 1, false},
  148. {0, 1, true},
  149. {0, 0, false},
  150. }
  151. for i, tt := range tests {
  152. p := &Progress{
  153. Match: tt.m,
  154. Wait: tt.wait,
  155. }
  156. if g := p.shouldWait(); g != tt.w {
  157. t.Errorf("#%d: shouldwait = %t, want %t", i, g, tt.w)
  158. }
  159. }
  160. }
  161. // TestProgressWaitReset ensures that progress.Update and progress.DercTo
  162. // will reset progress.wait.
  163. func TestProgressWaitReset(t *testing.T) {
  164. p := &Progress{
  165. Wait: 1,
  166. }
  167. p.maybeDecrTo(1, 1)
  168. if p.Wait != 0 {
  169. t.Errorf("wait= %d, want 0", p.Wait)
  170. }
  171. p.Wait = 1
  172. p.update(2)
  173. if p.Wait != 0 {
  174. t.Errorf("wait= %d, want 0", p.Wait)
  175. }
  176. }
  177. // TestProgressDecr ensures raft.heartbeat decreases progress.wait by heartbeat.
  178. func TestProgressDecr(t *testing.T) {
  179. r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
  180. r.becomeCandidate()
  181. r.becomeLeader()
  182. r.prs[2].Wait = r.heartbeatTimeout * 2
  183. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  184. if r.prs[2].Wait != r.heartbeatTimeout*(2-1) {
  185. t.Errorf("wait = %d, want %d", r.prs[2].Wait, r.heartbeatTimeout*(2-1))
  186. }
  187. }
  188. func TestProgressWait(t *testing.T) {
  189. r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
  190. r.becomeCandidate()
  191. r.becomeLeader()
  192. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  193. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  194. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  195. ms := r.readMessages()
  196. if len(ms) != 1 {
  197. t.Errorf("len(ms) = %d, want 1", len(ms))
  198. }
  199. }
  200. func TestLeaderElection(t *testing.T) {
  201. tests := []struct {
  202. *network
  203. state StateType
  204. }{
  205. {newNetwork(nil, nil, nil), StateLeader},
  206. {newNetwork(nil, nil, nopStepper), StateLeader},
  207. {newNetwork(nil, nopStepper, nopStepper), StateCandidate},
  208. {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate},
  209. {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader},
  210. // three logs further along than 0
  211. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower},
  212. // logs converge
  213. {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader},
  214. }
  215. for i, tt := range tests {
  216. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  217. sm := tt.network.peers[1].(*raft)
  218. if sm.state != tt.state {
  219. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  220. }
  221. if g := sm.Term; g != 1 {
  222. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  223. }
  224. }
  225. }
  226. func TestLogReplication(t *testing.T) {
  227. tests := []struct {
  228. *network
  229. msgs []pb.Message
  230. wcommitted uint64
  231. }{
  232. {
  233. newNetwork(nil, nil, nil),
  234. []pb.Message{
  235. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  236. },
  237. 2,
  238. },
  239. {
  240. newNetwork(nil, nil, nil),
  241. []pb.Message{
  242. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  243. {From: 1, To: 2, Type: pb.MsgHup},
  244. {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  245. },
  246. 4,
  247. },
  248. }
  249. for i, tt := range tests {
  250. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  251. for _, m := range tt.msgs {
  252. tt.send(m)
  253. }
  254. for j, x := range tt.network.peers {
  255. sm := x.(*raft)
  256. if sm.raftLog.committed != tt.wcommitted {
  257. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
  258. }
  259. ents := []pb.Entry{}
  260. for _, e := range nextEnts(sm, tt.network.storage[j]) {
  261. if e.Data != nil {
  262. ents = append(ents, e)
  263. }
  264. }
  265. props := []pb.Message{}
  266. for _, m := range tt.msgs {
  267. if m.Type == pb.MsgProp {
  268. props = append(props, m)
  269. }
  270. }
  271. for k, m := range props {
  272. if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
  273. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
  274. }
  275. }
  276. }
  277. }
  278. }
  279. func TestSingleNodeCommit(t *testing.T) {
  280. tt := newNetwork(nil)
  281. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  282. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  283. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  284. sm := tt.peers[1].(*raft)
  285. if sm.raftLog.committed != 3 {
  286. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
  287. }
  288. }
  289. // TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed
  290. // when leader changes, no new proposal comes in and ChangeTerm proposal is
  291. // filtered.
  292. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  293. tt := newNetwork(nil, nil, nil, nil, nil)
  294. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  295. // 0 cannot reach 2,3,4
  296. tt.cut(1, 3)
  297. tt.cut(1, 4)
  298. tt.cut(1, 5)
  299. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  300. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  301. sm := tt.peers[1].(*raft)
  302. if sm.raftLog.committed != 1 {
  303. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  304. }
  305. // network recovery
  306. tt.recover()
  307. // avoid committing ChangeTerm proposal
  308. tt.ignore(pb.MsgApp)
  309. // elect 2 as the new leader with term 2
  310. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  311. // no log entries from previous term should be committed
  312. sm = tt.peers[2].(*raft)
  313. if sm.raftLog.committed != 1 {
  314. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  315. }
  316. tt.recover()
  317. // send heartbeat; reset wait
  318. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
  319. // append an entry at current term
  320. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  321. // expect the committed to be advanced
  322. if sm.raftLog.committed != 5 {
  323. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
  324. }
  325. }
  326. // TestCommitWithoutNewTermEntry tests the entries could be committed
  327. // when leader changes, no new proposal comes in.
  328. func TestCommitWithoutNewTermEntry(t *testing.T) {
  329. tt := newNetwork(nil, nil, nil, nil, nil)
  330. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  331. // 0 cannot reach 2,3,4
  332. tt.cut(1, 3)
  333. tt.cut(1, 4)
  334. tt.cut(1, 5)
  335. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  336. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  337. sm := tt.peers[1].(*raft)
  338. if sm.raftLog.committed != 1 {
  339. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  340. }
  341. // network recovery
  342. tt.recover()
  343. // elect 1 as the new leader with term 2
  344. // after append a ChangeTerm entry from the current term, all entries
  345. // should be committed
  346. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  347. if sm.raftLog.committed != 4 {
  348. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
  349. }
  350. }
  351. func TestDuelingCandidates(t *testing.T) {
  352. a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  353. b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  354. c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  355. nt := newNetwork(a, b, c)
  356. nt.cut(1, 3)
  357. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  358. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  359. nt.recover()
  360. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  361. wlog := &raftLog{
  362. storage: &MemoryStorage{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}},
  363. committed: 1,
  364. unstable: unstable{offset: 2},
  365. }
  366. tests := []struct {
  367. sm *raft
  368. state StateType
  369. term uint64
  370. raftLog *raftLog
  371. }{
  372. {a, StateFollower, 2, wlog},
  373. {b, StateFollower, 2, wlog},
  374. {c, StateFollower, 2, newLog(NewMemoryStorage())},
  375. }
  376. for i, tt := range tests {
  377. if g := tt.sm.state; g != tt.state {
  378. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  379. }
  380. if g := tt.sm.Term; g != tt.term {
  381. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  382. }
  383. base := ltoa(tt.raftLog)
  384. if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
  385. l := ltoa(sm.raftLog)
  386. if g := diffu(base, l); g != "" {
  387. t.Errorf("#%d: diff:\n%s", i, g)
  388. }
  389. } else {
  390. t.Logf("#%d: empty log", i)
  391. }
  392. }
  393. }
  394. func TestCandidateConcede(t *testing.T) {
  395. tt := newNetwork(nil, nil, nil)
  396. tt.isolate(1)
  397. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  398. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  399. // heal the partition
  400. tt.recover()
  401. // send heartbeat; reset wait
  402. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
  403. data := []byte("force follower")
  404. // send a proposal to 3 to flush out a MsgApp to 1
  405. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  406. // send heartbeat; flush out commit
  407. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
  408. a := tt.peers[1].(*raft)
  409. if g := a.state; g != StateFollower {
  410. t.Errorf("state = %s, want %s", g, StateFollower)
  411. }
  412. if g := a.Term; g != 1 {
  413. t.Errorf("term = %d, want %d", g, 1)
  414. }
  415. wantLog := ltoa(&raftLog{
  416. storage: &MemoryStorage{
  417. ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
  418. },
  419. unstable: unstable{offset: 3},
  420. committed: 2,
  421. })
  422. for i, p := range tt.peers {
  423. if sm, ok := p.(*raft); ok {
  424. l := ltoa(sm.raftLog)
  425. if g := diffu(wantLog, l); g != "" {
  426. t.Errorf("#%d: diff:\n%s", i, g)
  427. }
  428. } else {
  429. t.Logf("#%d: empty log", i)
  430. }
  431. }
  432. }
  433. func TestSingleNodeCandidate(t *testing.T) {
  434. tt := newNetwork(nil)
  435. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  436. sm := tt.peers[1].(*raft)
  437. if sm.state != StateLeader {
  438. t.Errorf("state = %d, want %d", sm.state, StateLeader)
  439. }
  440. }
  441. func TestOldMessages(t *testing.T) {
  442. tt := newNetwork(nil, nil, nil)
  443. // make 0 leader @ term 3
  444. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  445. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  446. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  447. // pretend we're an old leader trying to make progress; this entry is expected to be ignored.
  448. tt.send(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Entries: []pb.Entry{{Index: 3, Term: 2}}})
  449. // commit a new entry
  450. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  451. l := &raftLog{
  452. storage: &MemoryStorage{
  453. ents: []pb.Entry{
  454. {}, {Data: nil, Term: 1, Index: 1},
  455. {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
  456. {Data: []byte("somedata"), Term: 3, Index: 4},
  457. },
  458. },
  459. unstable: unstable{offset: 5},
  460. committed: 4,
  461. }
  462. base := ltoa(l)
  463. for i, p := range tt.peers {
  464. if sm, ok := p.(*raft); ok {
  465. l := ltoa(sm.raftLog)
  466. if g := diffu(base, l); g != "" {
  467. t.Errorf("#%d: diff:\n%s", i, g)
  468. }
  469. } else {
  470. t.Logf("#%d: empty log", i)
  471. }
  472. }
  473. }
  474. // TestOldMessagesReply - optimization - reply with new term.
  475. func TestProposal(t *testing.T) {
  476. tests := []struct {
  477. *network
  478. success bool
  479. }{
  480. {newNetwork(nil, nil, nil), true},
  481. {newNetwork(nil, nil, nopStepper), true},
  482. {newNetwork(nil, nopStepper, nopStepper), false},
  483. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  484. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  485. }
  486. for i, tt := range tests {
  487. send := func(m pb.Message) {
  488. defer func() {
  489. // only recover is we expect it to panic so
  490. // panics we don't expect go up.
  491. if !tt.success {
  492. e := recover()
  493. if e != nil {
  494. t.Logf("#%d: err: %s", i, e)
  495. }
  496. }
  497. }()
  498. tt.send(m)
  499. }
  500. data := []byte("somedata")
  501. // promote 0 the leader
  502. send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  503. send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  504. wantLog := newLog(NewMemoryStorage())
  505. if tt.success {
  506. wantLog = &raftLog{
  507. storage: &MemoryStorage{
  508. ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
  509. },
  510. unstable: unstable{offset: 3},
  511. committed: 2}
  512. }
  513. base := ltoa(wantLog)
  514. for i, p := range tt.peers {
  515. if sm, ok := p.(*raft); ok {
  516. l := ltoa(sm.raftLog)
  517. if g := diffu(base, l); g != "" {
  518. t.Errorf("#%d: diff:\n%s", i, g)
  519. }
  520. } else {
  521. t.Logf("#%d: empty log", i)
  522. }
  523. }
  524. sm := tt.network.peers[1].(*raft)
  525. if g := sm.Term; g != 1 {
  526. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  527. }
  528. }
  529. }
  530. func TestProposalByProxy(t *testing.T) {
  531. data := []byte("somedata")
  532. tests := []*network{
  533. newNetwork(nil, nil, nil),
  534. newNetwork(nil, nil, nopStepper),
  535. }
  536. for i, tt := range tests {
  537. // promote 0 the leader
  538. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  539. // propose via follower
  540. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  541. wantLog := &raftLog{
  542. storage: &MemoryStorage{
  543. ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
  544. },
  545. unstable: unstable{offset: 3},
  546. committed: 2}
  547. base := ltoa(wantLog)
  548. for i, p := range tt.peers {
  549. if sm, ok := p.(*raft); ok {
  550. l := ltoa(sm.raftLog)
  551. if g := diffu(base, l); g != "" {
  552. t.Errorf("#%d: diff:\n%s", i, g)
  553. }
  554. } else {
  555. t.Logf("#%d: empty log", i)
  556. }
  557. }
  558. sm := tt.peers[1].(*raft)
  559. if g := sm.Term; g != 1 {
  560. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  561. }
  562. }
  563. }
  564. func TestCommit(t *testing.T) {
  565. tests := []struct {
  566. matches []uint64
  567. logs []pb.Entry
  568. smTerm uint64
  569. w uint64
  570. }{
  571. // single
  572. {[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 1, 1},
  573. {[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 2, 0},
  574. {[]uint64{2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
  575. {[]uint64{1}, []pb.Entry{{Index: 1, Term: 2}}, 2, 1},
  576. // odd
  577. {[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
  578. {[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  579. {[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
  580. {[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  581. // even
  582. {[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
  583. {[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  584. {[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
  585. {[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  586. {[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
  587. {[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  588. }
  589. for i, tt := range tests {
  590. storage := NewMemoryStorage()
  591. storage.Append(tt.logs)
  592. storage.hardState = pb.HardState{Term: tt.smTerm}
  593. sm := newRaft(1, []uint64{1}, 5, 1, storage)
  594. for j := 0; j < len(tt.matches); j++ {
  595. sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1)
  596. }
  597. sm.maybeCommit()
  598. if g := sm.raftLog.committed; g != tt.w {
  599. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  600. }
  601. }
  602. }
  603. func TestIsElectionTimeout(t *testing.T) {
  604. tests := []struct {
  605. elapse int
  606. wprobability float64
  607. round bool
  608. }{
  609. {5, 0, false},
  610. {13, 0.3, true},
  611. {15, 0.5, true},
  612. {18, 0.8, true},
  613. {20, 1, false},
  614. }
  615. for i, tt := range tests {
  616. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  617. sm.elapsed = tt.elapse
  618. c := 0
  619. for j := 0; j < 10000; j++ {
  620. if sm.isElectionTimeout() {
  621. c++
  622. }
  623. }
  624. got := float64(c) / 10000.0
  625. if tt.round {
  626. got = math.Floor(got*10+0.5) / 10.0
  627. }
  628. if got != tt.wprobability {
  629. t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability)
  630. }
  631. }
  632. }
  633. // ensure that the Step function ignores the message from old term and does not pass it to the
  634. // acutal stepX function.
  635. func TestStepIgnoreOldTermMsg(t *testing.T) {
  636. called := false
  637. fakeStep := func(r *raft, m pb.Message) {
  638. called = true
  639. }
  640. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  641. sm.step = fakeStep
  642. sm.Term = 2
  643. sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
  644. if called == true {
  645. t.Errorf("stepFunc called = %v , want %v", called, false)
  646. }
  647. }
  648. // TestHandleMsgApp ensures:
  649. // 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
  650. // 2. If an existing entry conflicts with a new one (same index but different terms),
  651. // delete the existing entry and all that follow it; append any new entries not already in the log.
  652. // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
  653. func TestHandleMsgApp(t *testing.T) {
  654. tests := []struct {
  655. m pb.Message
  656. wIndex uint64
  657. wCommit uint64
  658. wReject bool
  659. }{
  660. // Ensure 1
  661. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch
  662. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist
  663. // Ensure 2
  664. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
  665. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Index: 1, Term: 2}}}, 1, 1, false},
  666. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Index: 3, Term: 2}, {Index: 4, Term: 2}}}, 4, 3, false},
  667. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Index: 3, Term: 2}}}, 3, 3, false},
  668. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false},
  669. // Ensure 3
  670. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit up to last new entry 1
  671. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit up to last new entry 2
  672. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit up to last new entry 2
  673. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit up to log.last()
  674. }
  675. for i, tt := range tests {
  676. storage := NewMemoryStorage()
  677. storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}})
  678. sm := newRaft(1, []uint64{1}, 10, 1, storage)
  679. sm.becomeFollower(2, None)
  680. sm.handleAppendEntries(tt.m)
  681. if sm.raftLog.lastIndex() != tt.wIndex {
  682. t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
  683. }
  684. if sm.raftLog.committed != tt.wCommit {
  685. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  686. }
  687. m := sm.readMessages()
  688. if len(m) != 1 {
  689. t.Fatalf("#%d: msg = nil, want 1", i)
  690. }
  691. if m[0].Reject != tt.wReject {
  692. t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
  693. }
  694. }
  695. }
  696. // TestHandleHeartbeat ensures that the follower commits to the commit in the message.
  697. func TestHandleHeartbeat(t *testing.T) {
  698. commit := uint64(2)
  699. tests := []struct {
  700. m pb.Message
  701. wCommit uint64
  702. }{
  703. {pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
  704. {pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
  705. }
  706. for i, tt := range tests {
  707. storage := NewMemoryStorage()
  708. storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
  709. sm := newRaft(1, []uint64{1, 2}, 5, 1, storage)
  710. sm.becomeFollower(2, 2)
  711. sm.raftLog.commitTo(commit)
  712. sm.handleHeartbeat(tt.m)
  713. if sm.raftLog.committed != tt.wCommit {
  714. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  715. }
  716. m := sm.readMessages()
  717. if len(m) != 1 {
  718. t.Fatalf("#%d: msg = nil, want 1", i)
  719. }
  720. if m[0].Type != pb.MsgHeartbeatResp {
  721. t.Errorf("#%d: type = %v, want MsgHeartbeatResp", i, m[0].Type)
  722. }
  723. }
  724. }
  725. // TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response.
  726. func TestHandleHeartbeatResp(t *testing.T) {
  727. storage := NewMemoryStorage()
  728. storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
  729. sm := newRaft(1, []uint64{1, 2}, 5, 1, storage)
  730. sm.becomeCandidate()
  731. sm.becomeLeader()
  732. sm.raftLog.commitTo(sm.raftLog.lastIndex())
  733. // A heartbeat response from a node that is behind; re-send MsgApp
  734. sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
  735. msgs := sm.readMessages()
  736. if len(msgs) != 1 {
  737. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  738. }
  739. if msgs[0].Type != pb.MsgApp {
  740. t.Errorf("type = %v, want MsgApp", msgs[0].Type)
  741. }
  742. // A second heartbeat response with no AppResp does not re-send because we are in the wait state.
  743. sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
  744. msgs = sm.readMessages()
  745. if len(msgs) != 0 {
  746. t.Fatalf("len(msgs) = %d, want 0", len(msgs))
  747. }
  748. // Send a heartbeat to reset the wait state; next heartbeat will re-send MsgApp.
  749. sm.bcastHeartbeat()
  750. sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
  751. msgs = sm.readMessages()
  752. if len(msgs) != 2 {
  753. t.Fatalf("len(msgs) = %d, want 2", len(msgs))
  754. }
  755. if msgs[0].Type != pb.MsgHeartbeat {
  756. t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type)
  757. }
  758. if msgs[1].Type != pb.MsgApp {
  759. t.Errorf("type = %v, want MsgApp", msgs[1].Type)
  760. }
  761. // Once we have an MsgAppResp, heartbeats no longer send MsgApp.
  762. sm.Step(pb.Message{
  763. From: 2,
  764. Type: pb.MsgAppResp,
  765. Index: msgs[1].Index + uint64(len(msgs[1].Entries)),
  766. })
  767. sm.bcastHeartbeat() // reset wait state
  768. sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
  769. msgs = sm.readMessages()
  770. if len(msgs) != 1 {
  771. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  772. }
  773. if msgs[0].Type != pb.MsgHeartbeat {
  774. t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type)
  775. }
  776. }
  777. func TestRecvMsgVote(t *testing.T) {
  778. tests := []struct {
  779. state StateType
  780. i, term uint64
  781. voteFor uint64
  782. wreject bool
  783. }{
  784. {StateFollower, 0, 0, None, true},
  785. {StateFollower, 0, 1, None, true},
  786. {StateFollower, 0, 2, None, true},
  787. {StateFollower, 0, 3, None, false},
  788. {StateFollower, 1, 0, None, true},
  789. {StateFollower, 1, 1, None, true},
  790. {StateFollower, 1, 2, None, true},
  791. {StateFollower, 1, 3, None, false},
  792. {StateFollower, 2, 0, None, true},
  793. {StateFollower, 2, 1, None, true},
  794. {StateFollower, 2, 2, None, false},
  795. {StateFollower, 2, 3, None, false},
  796. {StateFollower, 3, 0, None, true},
  797. {StateFollower, 3, 1, None, true},
  798. {StateFollower, 3, 2, None, false},
  799. {StateFollower, 3, 3, None, false},
  800. {StateFollower, 3, 2, 2, false},
  801. {StateFollower, 3, 2, 1, true},
  802. {StateLeader, 3, 3, 1, true},
  803. {StateCandidate, 3, 3, 1, true},
  804. }
  805. for i, tt := range tests {
  806. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  807. sm.state = tt.state
  808. switch tt.state {
  809. case StateFollower:
  810. sm.step = stepFollower
  811. case StateCandidate:
  812. sm.step = stepCandidate
  813. case StateLeader:
  814. sm.step = stepLeader
  815. }
  816. sm.HardState = pb.HardState{Vote: tt.voteFor}
  817. sm.raftLog = &raftLog{
  818. storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}},
  819. unstable: unstable{offset: 3},
  820. }
  821. sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
  822. msgs := sm.readMessages()
  823. if g := len(msgs); g != 1 {
  824. t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
  825. continue
  826. }
  827. if g := msgs[0].Reject; g != tt.wreject {
  828. t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
  829. }
  830. }
  831. }
  832. func TestStateTransition(t *testing.T) {
  833. tests := []struct {
  834. from StateType
  835. to StateType
  836. wallow bool
  837. wterm uint64
  838. wlead uint64
  839. }{
  840. {StateFollower, StateFollower, true, 1, None},
  841. {StateFollower, StateCandidate, true, 1, None},
  842. {StateFollower, StateLeader, false, 0, None},
  843. {StateCandidate, StateFollower, true, 0, None},
  844. {StateCandidate, StateCandidate, true, 1, None},
  845. {StateCandidate, StateLeader, true, 0, 1},
  846. {StateLeader, StateFollower, true, 1, None},
  847. {StateLeader, StateCandidate, false, 1, None},
  848. {StateLeader, StateLeader, true, 0, 1},
  849. }
  850. for i, tt := range tests {
  851. func() {
  852. defer func() {
  853. if r := recover(); r != nil {
  854. if tt.wallow == true {
  855. t.Errorf("%d: allow = %v, want %v", i, false, true)
  856. }
  857. }
  858. }()
  859. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  860. sm.state = tt.from
  861. switch tt.to {
  862. case StateFollower:
  863. sm.becomeFollower(tt.wterm, tt.wlead)
  864. case StateCandidate:
  865. sm.becomeCandidate()
  866. case StateLeader:
  867. sm.becomeLeader()
  868. }
  869. if sm.Term != tt.wterm {
  870. t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
  871. }
  872. if sm.lead != tt.wlead {
  873. t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  874. }
  875. }()
  876. }
  877. }
  878. func TestAllServerStepdown(t *testing.T) {
  879. tests := []struct {
  880. state StateType
  881. wstate StateType
  882. wterm uint64
  883. windex uint64
  884. }{
  885. {StateFollower, StateFollower, 3, 0},
  886. {StateCandidate, StateFollower, 3, 0},
  887. {StateLeader, StateFollower, 3, 1},
  888. }
  889. tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
  890. tterm := uint64(3)
  891. for i, tt := range tests {
  892. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  893. switch tt.state {
  894. case StateFollower:
  895. sm.becomeFollower(1, None)
  896. case StateCandidate:
  897. sm.becomeCandidate()
  898. case StateLeader:
  899. sm.becomeCandidate()
  900. sm.becomeLeader()
  901. }
  902. for j, msgType := range tmsgTypes {
  903. sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
  904. if sm.state != tt.wstate {
  905. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
  906. }
  907. if sm.Term != tt.wterm {
  908. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
  909. }
  910. if uint64(sm.raftLog.lastIndex()) != tt.windex {
  911. t.Errorf("#%d.%d index = %v , want %v", i, j, sm.raftLog.lastIndex(), tt.windex)
  912. }
  913. if uint64(len(sm.raftLog.allEntries())) != tt.windex {
  914. t.Errorf("#%d.%d len(ents) = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex)
  915. }
  916. wlead := uint64(2)
  917. if msgType == pb.MsgVote {
  918. wlead = None
  919. }
  920. if sm.lead != wlead {
  921. t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
  922. }
  923. }
  924. }
  925. }
  926. func TestLeaderAppResp(t *testing.T) {
  927. // initial progress: match = 0; next = 3
  928. tests := []struct {
  929. index uint64
  930. reject bool
  931. // progress
  932. wmatch uint64
  933. wnext uint64
  934. // message
  935. wmsgNum int
  936. windex uint64
  937. wcommitted uint64
  938. }{
  939. {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies
  940. {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrease next and send probing msg
  941. {2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
  942. {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
  943. }
  944. for i, tt := range tests {
  945. // sm term is 1 after it becomes the leader.
  946. // thus the last log term must be 1 to be committed.
  947. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  948. sm.raftLog = &raftLog{
  949. storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
  950. unstable: unstable{offset: 3},
  951. }
  952. sm.becomeCandidate()
  953. sm.becomeLeader()
  954. sm.readMessages()
  955. sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
  956. p := sm.prs[2]
  957. if p.Match != tt.wmatch {
  958. t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
  959. }
  960. if p.Next != tt.wnext {
  961. t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
  962. }
  963. msgs := sm.readMessages()
  964. if len(msgs) != tt.wmsgNum {
  965. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  966. }
  967. for j, msg := range msgs {
  968. if msg.Index != tt.windex {
  969. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  970. }
  971. if msg.Commit != tt.wcommitted {
  972. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  973. }
  974. }
  975. }
  976. }
  977. // When the leader receives a heartbeat tick, it should
  978. // send a MsgApp with m.Index = 0, m.LogTerm=0 and empty entries.
  979. func TestBcastBeat(t *testing.T) {
  980. offset := uint64(1000)
  981. // make a state machine with log.offset = 1000
  982. s := pb.Snapshot{
  983. Metadata: pb.SnapshotMetadata{
  984. Index: offset,
  985. Term: 1,
  986. ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
  987. },
  988. }
  989. storage := NewMemoryStorage()
  990. storage.ApplySnapshot(s)
  991. sm := newRaft(1, nil, 10, 1, storage)
  992. sm.Term = 1
  993. sm.becomeCandidate()
  994. sm.becomeLeader()
  995. for i := 0; i < 10; i++ {
  996. sm.appendEntry(pb.Entry{Index: uint64(i) + 1})
  997. }
  998. // slow follower
  999. sm.prs[2].Match, sm.prs[2].Next = 5, 6
  1000. // normal follower
  1001. sm.prs[3].Match, sm.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
  1002. sm.Step(pb.Message{Type: pb.MsgBeat})
  1003. msgs := sm.readMessages()
  1004. if len(msgs) != 2 {
  1005. t.Fatalf("len(msgs) = %v, want 2", len(msgs))
  1006. }
  1007. wantCommitMap := map[uint64]uint64{
  1008. 2: min(sm.raftLog.committed, sm.prs[2].Match),
  1009. 3: min(sm.raftLog.committed, sm.prs[3].Match),
  1010. }
  1011. for i, m := range msgs {
  1012. if m.Type != pb.MsgHeartbeat {
  1013. t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat)
  1014. }
  1015. if m.Index != 0 {
  1016. t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
  1017. }
  1018. if m.LogTerm != 0 {
  1019. t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
  1020. }
  1021. if wantCommitMap[m.To] == 0 {
  1022. t.Fatalf("#%d: unexpected to %d", i, m.To)
  1023. } else {
  1024. if m.Commit != wantCommitMap[m.To] {
  1025. t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
  1026. }
  1027. delete(wantCommitMap, m.To)
  1028. }
  1029. if len(m.Entries) != 0 {
  1030. t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
  1031. }
  1032. }
  1033. }
  1034. // tests the output of the statemachine when receiving MsgBeat
  1035. func TestRecvMsgBeat(t *testing.T) {
  1036. tests := []struct {
  1037. state StateType
  1038. wMsg int
  1039. }{
  1040. {StateLeader, 2},
  1041. // candidate and follower should ignore MsgBeat
  1042. {StateCandidate, 0},
  1043. {StateFollower, 0},
  1044. }
  1045. for i, tt := range tests {
  1046. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  1047. sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
  1048. sm.Term = 1
  1049. sm.state = tt.state
  1050. switch tt.state {
  1051. case StateFollower:
  1052. sm.step = stepFollower
  1053. case StateCandidate:
  1054. sm.step = stepCandidate
  1055. case StateLeader:
  1056. sm.step = stepLeader
  1057. }
  1058. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  1059. msgs := sm.readMessages()
  1060. if len(msgs) != tt.wMsg {
  1061. t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  1062. }
  1063. for _, m := range msgs {
  1064. if m.Type != pb.MsgHeartbeat {
  1065. t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgHeartbeat)
  1066. }
  1067. }
  1068. }
  1069. }
  1070. func TestLeaderIncreaseNext(t *testing.T) {
  1071. previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
  1072. tests := []struct {
  1073. // progress
  1074. match uint64
  1075. next uint64
  1076. wnext uint64
  1077. }{
  1078. // match is not zero, optimistically increase next
  1079. // previous entries + noop entry + propose + 1
  1080. {1, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
  1081. // match is zero, not optimistically increase next
  1082. {0, 2, 2},
  1083. }
  1084. for i, tt := range tests {
  1085. sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1086. sm.raftLog.append(previousEnts...)
  1087. sm.becomeCandidate()
  1088. sm.becomeLeader()
  1089. sm.prs[2].Match, sm.prs[2].Next = tt.match, tt.next
  1090. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  1091. p := sm.prs[2]
  1092. if p.Next != tt.wnext {
  1093. t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
  1094. }
  1095. }
  1096. }
  1097. func TestRestore(t *testing.T) {
  1098. s := pb.Snapshot{
  1099. Metadata: pb.SnapshotMetadata{
  1100. Index: 11, // magic number
  1101. Term: 11, // magic number
  1102. ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
  1103. },
  1104. }
  1105. storage := NewMemoryStorage()
  1106. sm := newRaft(1, []uint64{1, 2}, 10, 1, storage)
  1107. if ok := sm.restore(s); !ok {
  1108. t.Fatal("restore fail, want succeed")
  1109. }
  1110. if sm.raftLog.lastIndex() != s.Metadata.Index {
  1111. t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
  1112. }
  1113. if sm.raftLog.term(s.Metadata.Index) != s.Metadata.Term {
  1114. t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Metadata.Index), s.Metadata.Term)
  1115. }
  1116. sg := sm.nodes()
  1117. if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
  1118. t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes)
  1119. }
  1120. if ok := sm.restore(s); ok {
  1121. t.Fatal("restore succeed, want fail")
  1122. }
  1123. }
  1124. func TestRestoreIgnoreSnapshot(t *testing.T) {
  1125. previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
  1126. commit := uint64(1)
  1127. storage := NewMemoryStorage()
  1128. sm := newRaft(1, []uint64{1, 2}, 10, 1, storage)
  1129. sm.raftLog.append(previousEnts...)
  1130. sm.raftLog.commitTo(commit)
  1131. s := pb.Snapshot{
  1132. Metadata: pb.SnapshotMetadata{
  1133. Index: commit,
  1134. Term: 1,
  1135. ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
  1136. },
  1137. }
  1138. // ignore snapshot
  1139. if ok := sm.restore(s); ok {
  1140. t.Errorf("restore = %t, want %t", ok, false)
  1141. }
  1142. if sm.raftLog.committed != commit {
  1143. t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit)
  1144. }
  1145. // ignore snapshot and fast forward commit
  1146. s.Metadata.Index = commit + 1
  1147. if ok := sm.restore(s); ok {
  1148. t.Errorf("restore = %t, want %t", ok, false)
  1149. }
  1150. if sm.raftLog.committed != commit+1 {
  1151. t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit+1)
  1152. }
  1153. }
  1154. func TestProvideSnap(t *testing.T) {
  1155. // restore the statemachin from a snapshot
  1156. // so it has a compacted log and a snapshot
  1157. s := pb.Snapshot{
  1158. Metadata: pb.SnapshotMetadata{
  1159. Index: 11, // magic number
  1160. Term: 11, // magic number
  1161. ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
  1162. },
  1163. }
  1164. storage := NewMemoryStorage()
  1165. sm := newRaft(1, []uint64{1}, 10, 1, storage)
  1166. sm.restore(s)
  1167. sm.becomeCandidate()
  1168. sm.becomeLeader()
  1169. // force set the next of node 1, so that
  1170. // node 1 needs a snapshot
  1171. sm.prs[2].Next = sm.raftLog.firstIndex()
  1172. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true})
  1173. msgs := sm.readMessages()
  1174. if len(msgs) != 1 {
  1175. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  1176. }
  1177. m := msgs[0]
  1178. if m.Type != pb.MsgSnap {
  1179. t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
  1180. }
  1181. }
  1182. func TestRestoreFromSnapMsg(t *testing.T) {
  1183. s := pb.Snapshot{
  1184. Metadata: pb.SnapshotMetadata{
  1185. Index: 11, // magic number
  1186. Term: 11, // magic number
  1187. ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
  1188. },
  1189. }
  1190. m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
  1191. sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1192. sm.Step(m)
  1193. // TODO(bdarnell): what should this test?
  1194. }
  1195. func TestSlowNodeRestore(t *testing.T) {
  1196. nt := newNetwork(nil, nil, nil)
  1197. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1198. nt.isolate(3)
  1199. for j := 0; j <= 100; j++ {
  1200. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  1201. }
  1202. lead := nt.peers[1].(*raft)
  1203. nextEnts(lead, nt.storage[1])
  1204. nt.storage[1].Compact(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil)
  1205. nt.recover()
  1206. // trigger a snapshot
  1207. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  1208. follower := nt.peers[3].(*raft)
  1209. // trigger a commit
  1210. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  1211. if follower.raftLog.committed != lead.raftLog.committed {
  1212. t.Errorf("follower.committed = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
  1213. }
  1214. }
  1215. // TestStepConfig tests that when raft step msgProp in EntryConfChange type,
  1216. // it appends the entry to log and sets pendingConf to be true.
  1217. func TestStepConfig(t *testing.T) {
  1218. // a raft that cannot make progress
  1219. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1220. r.becomeCandidate()
  1221. r.becomeLeader()
  1222. index := r.raftLog.lastIndex()
  1223. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1224. if g := r.raftLog.lastIndex(); g != index+1 {
  1225. t.Errorf("index = %d, want %d", g, index+1)
  1226. }
  1227. if r.pendingConf != true {
  1228. t.Errorf("pendingConf = %v, want true", r.pendingConf)
  1229. }
  1230. }
  1231. // TestStepIgnoreConfig tests that if raft step the second msgProp in
  1232. // EntryConfChange type when the first one is uncommitted, the node will set
  1233. // the proposal to noop and keep its original state.
  1234. func TestStepIgnoreConfig(t *testing.T) {
  1235. // a raft that cannot make progress
  1236. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1237. r.becomeCandidate()
  1238. r.becomeLeader()
  1239. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1240. index := r.raftLog.lastIndex()
  1241. pendingConf := r.pendingConf
  1242. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1243. wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
  1244. if ents := r.raftLog.entries(index + 1); !reflect.DeepEqual(ents, wents) {
  1245. t.Errorf("ents = %+v, want %+v", ents, wents)
  1246. }
  1247. if r.pendingConf != pendingConf {
  1248. t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
  1249. }
  1250. }
  1251. // TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
  1252. // based on uncommitted entries.
  1253. func TestRecoverPendingConfig(t *testing.T) {
  1254. tests := []struct {
  1255. entType pb.EntryType
  1256. wpending bool
  1257. }{
  1258. {pb.EntryNormal, false},
  1259. {pb.EntryConfChange, true},
  1260. }
  1261. for i, tt := range tests {
  1262. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1263. r.appendEntry(pb.Entry{Type: tt.entType})
  1264. r.becomeCandidate()
  1265. r.becomeLeader()
  1266. if r.pendingConf != tt.wpending {
  1267. t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
  1268. }
  1269. }
  1270. }
  1271. // TestRecoverDoublePendingConfig tests that new leader will panic if
  1272. // there exist two uncommitted config entries.
  1273. func TestRecoverDoublePendingConfig(t *testing.T) {
  1274. func() {
  1275. defer func() {
  1276. if err := recover(); err == nil {
  1277. t.Errorf("expect panic, but nothing happens")
  1278. }
  1279. }()
  1280. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1281. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1282. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1283. r.becomeCandidate()
  1284. r.becomeLeader()
  1285. }()
  1286. }
  1287. // TestAddNode tests that addNode could update pendingConf and nodes correctly.
  1288. func TestAddNode(t *testing.T) {
  1289. r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  1290. r.pendingConf = true
  1291. r.addNode(2)
  1292. if r.pendingConf != false {
  1293. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1294. }
  1295. nodes := r.nodes()
  1296. wnodes := []uint64{1, 2}
  1297. if !reflect.DeepEqual(nodes, wnodes) {
  1298. t.Errorf("nodes = %v, want %v", nodes, wnodes)
  1299. }
  1300. }
  1301. // TestRemoveNode tests that removeNode could update pendingConf, nodes and
  1302. // and removed list correctly.
  1303. func TestRemoveNode(t *testing.T) {
  1304. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1305. r.pendingConf = true
  1306. r.removeNode(2)
  1307. if r.pendingConf != false {
  1308. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1309. }
  1310. w := []uint64{1}
  1311. if g := r.nodes(); !reflect.DeepEqual(g, w) {
  1312. t.Errorf("nodes = %v, want %v", g, w)
  1313. }
  1314. }
  1315. func TestPromotable(t *testing.T) {
  1316. id := uint64(1)
  1317. tests := []struct {
  1318. peers []uint64
  1319. wp bool
  1320. }{
  1321. {[]uint64{1}, true},
  1322. {[]uint64{1, 2, 3}, true},
  1323. {[]uint64{}, false},
  1324. {[]uint64{2, 3}, false},
  1325. }
  1326. for i, tt := range tests {
  1327. r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage())
  1328. if g := r.promotable(); g != tt.wp {
  1329. t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
  1330. }
  1331. }
  1332. }
  1333. func TestRaftNodes(t *testing.T) {
  1334. tests := []struct {
  1335. ids []uint64
  1336. wids []uint64
  1337. }{
  1338. {
  1339. []uint64{1, 2, 3},
  1340. []uint64{1, 2, 3},
  1341. },
  1342. {
  1343. []uint64{3, 2, 1},
  1344. []uint64{1, 2, 3},
  1345. },
  1346. }
  1347. for i, tt := range tests {
  1348. r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage())
  1349. if !reflect.DeepEqual(r.nodes(), tt.wids) {
  1350. t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
  1351. }
  1352. }
  1353. }
  1354. func ents(terms ...uint64) *raft {
  1355. storage := NewMemoryStorage()
  1356. for i, term := range terms {
  1357. storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}})
  1358. }
  1359. sm := newRaft(1, []uint64{}, 5, 1, storage)
  1360. sm.reset(0)
  1361. return sm
  1362. }
  1363. type network struct {
  1364. peers map[uint64]Interface
  1365. storage map[uint64]*MemoryStorage
  1366. dropm map[connem]float64
  1367. ignorem map[pb.MessageType]bool
  1368. }
  1369. // newNetwork initializes a network from peers.
  1370. // A nil node will be replaced with a new *stateMachine.
  1371. // A *stateMachine will get its k, id.
  1372. // When using stateMachine, the address list is always [1, n].
  1373. func newNetwork(peers ...Interface) *network {
  1374. size := len(peers)
  1375. peerAddrs := idsBySize(size)
  1376. npeers := make(map[uint64]Interface, size)
  1377. nstorage := make(map[uint64]*MemoryStorage, size)
  1378. for i, p := range peers {
  1379. id := peerAddrs[i]
  1380. switch v := p.(type) {
  1381. case nil:
  1382. nstorage[id] = NewMemoryStorage()
  1383. sm := newRaft(id, peerAddrs, 10, 1, nstorage[id])
  1384. npeers[id] = sm
  1385. case *raft:
  1386. v.id = id
  1387. v.prs = make(map[uint64]*Progress)
  1388. for i := 0; i < size; i++ {
  1389. v.prs[peerAddrs[i]] = &Progress{}
  1390. }
  1391. v.reset(0)
  1392. npeers[id] = v
  1393. case *blackHole:
  1394. npeers[id] = v
  1395. default:
  1396. panic(fmt.Sprintf("unexpected state machine type: %T", p))
  1397. }
  1398. }
  1399. return &network{
  1400. peers: npeers,
  1401. storage: nstorage,
  1402. dropm: make(map[connem]float64),
  1403. ignorem: make(map[pb.MessageType]bool),
  1404. }
  1405. }
  1406. func (nw *network) send(msgs ...pb.Message) {
  1407. for len(msgs) > 0 {
  1408. m := msgs[0]
  1409. p := nw.peers[m.To]
  1410. p.Step(m)
  1411. msgs = append(msgs[1:], nw.filter(p.readMessages())...)
  1412. }
  1413. }
  1414. func (nw *network) drop(from, to uint64, perc float64) {
  1415. nw.dropm[connem{from, to}] = perc
  1416. }
  1417. func (nw *network) cut(one, other uint64) {
  1418. nw.drop(one, other, 1)
  1419. nw.drop(other, one, 1)
  1420. }
  1421. func (nw *network) isolate(id uint64) {
  1422. for i := 0; i < len(nw.peers); i++ {
  1423. nid := uint64(i) + 1
  1424. if nid != id {
  1425. nw.drop(id, nid, 1.0)
  1426. nw.drop(nid, id, 1.0)
  1427. }
  1428. }
  1429. }
  1430. func (nw *network) ignore(t pb.MessageType) {
  1431. nw.ignorem[t] = true
  1432. }
  1433. func (nw *network) recover() {
  1434. nw.dropm = make(map[connem]float64)
  1435. nw.ignorem = make(map[pb.MessageType]bool)
  1436. }
  1437. func (nw *network) filter(msgs []pb.Message) []pb.Message {
  1438. mm := []pb.Message{}
  1439. for _, m := range msgs {
  1440. if nw.ignorem[m.Type] {
  1441. continue
  1442. }
  1443. switch m.Type {
  1444. case pb.MsgHup:
  1445. // hups never go over the network, so don't drop them but panic
  1446. panic("unexpected msgHup")
  1447. default:
  1448. perc := nw.dropm[connem{m.From, m.To}]
  1449. if n := rand.Float64(); n < perc {
  1450. continue
  1451. }
  1452. }
  1453. mm = append(mm, m)
  1454. }
  1455. return mm
  1456. }
  1457. type connem struct {
  1458. from, to uint64
  1459. }
  1460. type blackHole struct{}
  1461. func (blackHole) Step(pb.Message) error { return nil }
  1462. func (blackHole) readMessages() []pb.Message { return nil }
  1463. var nopStepper = &blackHole{}
  1464. func idsBySize(size int) []uint64 {
  1465. ids := make([]uint64, size)
  1466. for i := 0; i < size; i++ {
  1467. ids[i] = 1 + uint64(i)
  1468. }
  1469. return ids
  1470. }