raft_test.go 40 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "bytes"
  16. "fmt"
  17. "math"
  18. "math/rand"
  19. "reflect"
  20. "testing"
  21. pb "github.com/coreos/etcd/raft/raftpb"
  22. )
  23. // nextEnts returns the appliable entries and updates the applied index
  24. func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
  25. // Transfer all unstable entries to "stable" storage.
  26. s.Append(r.raftLog.unstableEntries())
  27. r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
  28. ents = r.raftLog.nextEnts()
  29. r.raftLog.appliedTo(r.raftLog.committed)
  30. return ents
  31. }
  32. type Interface interface {
  33. Step(m pb.Message) error
  34. readMessages() []pb.Message
  35. }
  36. func (r *raft) readMessages() []pb.Message {
  37. msgs := r.msgs
  38. r.msgs = make([]pb.Message, 0)
  39. return msgs
  40. }
  41. func TestProgressUpdate(t *testing.T) {
  42. prevM, prevN := uint64(3), uint64(5)
  43. tests := []struct {
  44. update uint64
  45. wm uint64
  46. wn uint64
  47. }{
  48. {prevM - 1, prevM, prevN}, // do not decrease match, next
  49. {prevM, prevM, prevN}, // do not decrease next
  50. {prevM + 1, prevM + 1, prevN}, // increase match, do not decrease next
  51. {prevM + 2, prevM + 2, prevN + 1}, // increase match, next
  52. }
  53. for i, tt := range tests {
  54. p := &progress{
  55. match: prevM,
  56. next: prevN,
  57. }
  58. p.update(tt.update)
  59. if p.match != tt.wm {
  60. t.Errorf("#%d: match=%d, want %d", i, p.match, tt.wm)
  61. }
  62. if p.next != tt.wn {
  63. t.Errorf("#%d: next=%d, want %d", i, p.next, tt.wn)
  64. }
  65. }
  66. }
  67. func TestProgressMaybeDecr(t *testing.T) {
  68. tests := []struct {
  69. m uint64
  70. n uint64
  71. to uint64
  72. w bool
  73. wn uint64
  74. }{
  75. {
  76. // match != 0 is always false
  77. 1, 0, 0, false, 0,
  78. },
  79. {
  80. // match != 0 and to is greater than match
  81. // directly decrease to match+1
  82. 5, 10, 5, false, 10,
  83. },
  84. {
  85. // match != 0 and to is greater than match
  86. // directly decrease to match+1
  87. 5, 10, 4, false, 10,
  88. },
  89. {
  90. // match != 0 and to is not greater than match
  91. 5, 10, 9, true, 6,
  92. },
  93. {
  94. // next-1 != to is always false
  95. 0, 0, 0, false, 0,
  96. },
  97. {
  98. // next-1 != to is always false
  99. 0, 10, 5, false, 10,
  100. },
  101. {
  102. // next>1 = decremented by 1
  103. 0, 10, 9, true, 9,
  104. },
  105. {
  106. // next>1 = decremented by 1
  107. 0, 2, 1, true, 1,
  108. },
  109. {
  110. // next<=1 = reset to 1
  111. 0, 1, 0, true, 1,
  112. },
  113. }
  114. for i, tt := range tests {
  115. p := &progress{
  116. match: tt.m,
  117. next: tt.n,
  118. }
  119. if g := p.maybeDecrTo(tt.to); g != tt.w {
  120. t.Errorf("#%d: maybeDecrTo=%t, want %t", i, g, tt.w)
  121. }
  122. if gm := p.match; gm != tt.m {
  123. t.Errorf("#%d: match=%d, want %d", i, gm, tt.m)
  124. }
  125. if gn := p.next; gn != tt.wn {
  126. t.Errorf("#%d: next=%d, want %d", i, gn, tt.wn)
  127. }
  128. }
  129. }
  130. func TestLeaderElection(t *testing.T) {
  131. tests := []struct {
  132. *network
  133. state StateType
  134. }{
  135. {newNetwork(nil, nil, nil), StateLeader},
  136. {newNetwork(nil, nil, nopStepper), StateLeader},
  137. {newNetwork(nil, nopStepper, nopStepper), StateCandidate},
  138. {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate},
  139. {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader},
  140. // three logs further along than 0
  141. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower},
  142. // logs converge
  143. {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader},
  144. }
  145. for i, tt := range tests {
  146. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  147. sm := tt.network.peers[1].(*raft)
  148. if sm.state != tt.state {
  149. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  150. }
  151. if g := sm.Term; g != 1 {
  152. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  153. }
  154. }
  155. }
  156. func TestLogReplication(t *testing.T) {
  157. tests := []struct {
  158. *network
  159. msgs []pb.Message
  160. wcommitted uint64
  161. }{
  162. {
  163. newNetwork(nil, nil, nil),
  164. []pb.Message{
  165. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  166. },
  167. 2,
  168. },
  169. {
  170. newNetwork(nil, nil, nil),
  171. []pb.Message{
  172. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  173. {From: 1, To: 2, Type: pb.MsgHup},
  174. {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  175. },
  176. 4,
  177. },
  178. }
  179. for i, tt := range tests {
  180. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  181. for _, m := range tt.msgs {
  182. tt.send(m)
  183. }
  184. for j, x := range tt.network.peers {
  185. sm := x.(*raft)
  186. if sm.raftLog.committed != tt.wcommitted {
  187. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
  188. }
  189. ents := []pb.Entry{}
  190. for _, e := range nextEnts(sm, tt.network.storage[j]) {
  191. if e.Data != nil {
  192. ents = append(ents, e)
  193. }
  194. }
  195. props := []pb.Message{}
  196. for _, m := range tt.msgs {
  197. if m.Type == pb.MsgProp {
  198. props = append(props, m)
  199. }
  200. }
  201. for k, m := range props {
  202. if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
  203. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
  204. }
  205. }
  206. }
  207. }
  208. }
  209. func TestSingleNodeCommit(t *testing.T) {
  210. tt := newNetwork(nil)
  211. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  212. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  213. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  214. sm := tt.peers[1].(*raft)
  215. if sm.raftLog.committed != 3 {
  216. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
  217. }
  218. }
  219. // TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed
  220. // when leader changes, no new proposal comes in and ChangeTerm proposal is
  221. // filtered.
  222. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  223. tt := newNetwork(nil, nil, nil, nil, nil)
  224. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  225. // 0 cannot reach 2,3,4
  226. tt.cut(1, 3)
  227. tt.cut(1, 4)
  228. tt.cut(1, 5)
  229. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  230. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  231. sm := tt.peers[1].(*raft)
  232. if sm.raftLog.committed != 1 {
  233. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  234. }
  235. // network recovery
  236. tt.recover()
  237. // avoid committing ChangeTerm proposal
  238. tt.ignore(pb.MsgApp)
  239. // elect 1 as the new leader with term 2
  240. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  241. // no log entries from previous term should be committed
  242. sm = tt.peers[2].(*raft)
  243. if sm.raftLog.committed != 1 {
  244. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  245. }
  246. tt.recover()
  247. // still be able to append a entry
  248. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  249. if sm.raftLog.committed != 5 {
  250. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
  251. }
  252. }
  253. // TestCommitWithoutNewTermEntry tests the entries could be committed
  254. // when leader changes, no new proposal comes in.
  255. func TestCommitWithoutNewTermEntry(t *testing.T) {
  256. tt := newNetwork(nil, nil, nil, nil, nil)
  257. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  258. // 0 cannot reach 2,3,4
  259. tt.cut(1, 3)
  260. tt.cut(1, 4)
  261. tt.cut(1, 5)
  262. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  263. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  264. sm := tt.peers[1].(*raft)
  265. if sm.raftLog.committed != 1 {
  266. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  267. }
  268. // network recovery
  269. tt.recover()
  270. // elect 1 as the new leader with term 2
  271. // after append a ChangeTerm entry from the current term, all entries
  272. // should be committed
  273. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  274. if sm.raftLog.committed != 4 {
  275. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
  276. }
  277. }
  278. func TestDuelingCandidates(t *testing.T) {
  279. a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  280. b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  281. c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  282. nt := newNetwork(a, b, c)
  283. nt.cut(1, 3)
  284. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  285. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  286. nt.recover()
  287. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  288. wlog := &raftLog{
  289. storage: &MemoryStorage{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}},
  290. committed: 1,
  291. unstable: unstable{offset: 2},
  292. }
  293. tests := []struct {
  294. sm *raft
  295. state StateType
  296. term uint64
  297. raftLog *raftLog
  298. }{
  299. {a, StateFollower, 2, wlog},
  300. {b, StateFollower, 2, wlog},
  301. {c, StateFollower, 2, newLog(NewMemoryStorage())},
  302. }
  303. for i, tt := range tests {
  304. if g := tt.sm.state; g != tt.state {
  305. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  306. }
  307. if g := tt.sm.Term; g != tt.term {
  308. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  309. }
  310. base := ltoa(tt.raftLog)
  311. if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
  312. l := ltoa(sm.raftLog)
  313. if g := diffu(base, l); g != "" {
  314. t.Errorf("#%d: diff:\n%s", i, g)
  315. }
  316. } else {
  317. t.Logf("#%d: empty log", i)
  318. }
  319. }
  320. }
  321. func TestCandidateConcede(t *testing.T) {
  322. tt := newNetwork(nil, nil, nil)
  323. tt.isolate(1)
  324. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  325. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  326. // heal the partition
  327. tt.recover()
  328. data := []byte("force follower")
  329. // send a proposal to 2 to flush out a MsgApp to 0
  330. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  331. a := tt.peers[1].(*raft)
  332. if g := a.state; g != StateFollower {
  333. t.Errorf("state = %s, want %s", g, StateFollower)
  334. }
  335. if g := a.Term; g != 1 {
  336. t.Errorf("term = %d, want %d", g, 1)
  337. }
  338. wantLog := ltoa(&raftLog{
  339. storage: &MemoryStorage{
  340. ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
  341. },
  342. unstable: unstable{offset: 3},
  343. committed: 2,
  344. })
  345. for i, p := range tt.peers {
  346. if sm, ok := p.(*raft); ok {
  347. l := ltoa(sm.raftLog)
  348. if g := diffu(wantLog, l); g != "" {
  349. t.Errorf("#%d: diff:\n%s", i, g)
  350. }
  351. } else {
  352. t.Logf("#%d: empty log", i)
  353. }
  354. }
  355. }
  356. func TestSingleNodeCandidate(t *testing.T) {
  357. tt := newNetwork(nil)
  358. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  359. sm := tt.peers[1].(*raft)
  360. if sm.state != StateLeader {
  361. t.Errorf("state = %d, want %d", sm.state, StateLeader)
  362. }
  363. }
  364. func TestOldMessages(t *testing.T) {
  365. tt := newNetwork(nil, nil, nil)
  366. // make 0 leader @ term 3
  367. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  368. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  369. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  370. // pretend we're an old leader trying to make progress
  371. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
  372. l := &raftLog{
  373. storage: &MemoryStorage{
  374. ents: []pb.Entry{
  375. {}, {Data: nil, Term: 1, Index: 1},
  376. {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
  377. },
  378. },
  379. unstable: unstable{offset: 4},
  380. committed: 3,
  381. }
  382. base := ltoa(l)
  383. for i, p := range tt.peers {
  384. if sm, ok := p.(*raft); ok {
  385. l := ltoa(sm.raftLog)
  386. if g := diffu(base, l); g != "" {
  387. t.Errorf("#%d: diff:\n%s", i, g)
  388. }
  389. } else {
  390. t.Logf("#%d: empty log", i)
  391. }
  392. }
  393. }
  394. // TestOldMessagesReply - optimization - reply with new term.
  395. func TestProposal(t *testing.T) {
  396. tests := []struct {
  397. *network
  398. success bool
  399. }{
  400. {newNetwork(nil, nil, nil), true},
  401. {newNetwork(nil, nil, nopStepper), true},
  402. {newNetwork(nil, nopStepper, nopStepper), false},
  403. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  404. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  405. }
  406. for i, tt := range tests {
  407. send := func(m pb.Message) {
  408. defer func() {
  409. // only recover is we expect it to panic so
  410. // panics we don't expect go up.
  411. if !tt.success {
  412. e := recover()
  413. if e != nil {
  414. t.Logf("#%d: err: %s", i, e)
  415. }
  416. }
  417. }()
  418. tt.send(m)
  419. }
  420. data := []byte("somedata")
  421. // promote 0 the leader
  422. send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  423. send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  424. wantLog := newLog(NewMemoryStorage())
  425. if tt.success {
  426. wantLog = &raftLog{
  427. storage: &MemoryStorage{
  428. ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
  429. },
  430. unstable: unstable{offset: 3},
  431. committed: 2}
  432. }
  433. base := ltoa(wantLog)
  434. for i, p := range tt.peers {
  435. if sm, ok := p.(*raft); ok {
  436. l := ltoa(sm.raftLog)
  437. if g := diffu(base, l); g != "" {
  438. t.Errorf("#%d: diff:\n%s", i, g)
  439. }
  440. } else {
  441. t.Logf("#%d: empty log", i)
  442. }
  443. }
  444. sm := tt.network.peers[1].(*raft)
  445. if g := sm.Term; g != 1 {
  446. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  447. }
  448. }
  449. }
  450. func TestProposalByProxy(t *testing.T) {
  451. data := []byte("somedata")
  452. tests := []*network{
  453. newNetwork(nil, nil, nil),
  454. newNetwork(nil, nil, nopStepper),
  455. }
  456. for i, tt := range tests {
  457. // promote 0 the leader
  458. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  459. // propose via follower
  460. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  461. wantLog := &raftLog{
  462. storage: &MemoryStorage{
  463. ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
  464. },
  465. unstable: unstable{offset: 3},
  466. committed: 2}
  467. base := ltoa(wantLog)
  468. for i, p := range tt.peers {
  469. if sm, ok := p.(*raft); ok {
  470. l := ltoa(sm.raftLog)
  471. if g := diffu(base, l); g != "" {
  472. t.Errorf("#%d: diff:\n%s", i, g)
  473. }
  474. } else {
  475. t.Logf("#%d: empty log", i)
  476. }
  477. }
  478. sm := tt.peers[1].(*raft)
  479. if g := sm.Term; g != 1 {
  480. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  481. }
  482. }
  483. }
  484. func TestCommit(t *testing.T) {
  485. tests := []struct {
  486. matches []uint64
  487. logs []pb.Entry
  488. smTerm uint64
  489. w uint64
  490. }{
  491. // single
  492. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
  493. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
  494. {[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  495. {[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
  496. // odd
  497. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  498. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  499. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  500. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  501. // even
  502. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  503. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  504. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  505. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  506. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  507. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  508. }
  509. for i, tt := range tests {
  510. prs := make(map[uint64]*progress)
  511. for j := 0; j < len(tt.matches); j++ {
  512. prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
  513. }
  514. sm := &raft{
  515. raftLog: &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: unstable{offset: uint64(len(tt.logs))}},
  516. prs: prs,
  517. HardState: pb.HardState{Term: tt.smTerm},
  518. }
  519. sm.maybeCommit()
  520. if g := sm.raftLog.committed; g != tt.w {
  521. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  522. }
  523. }
  524. }
  525. func TestIsElectionTimeout(t *testing.T) {
  526. tests := []struct {
  527. elapse int
  528. wprobability float64
  529. round bool
  530. }{
  531. {5, 0, false},
  532. {13, 0.3, true},
  533. {15, 0.5, true},
  534. {18, 0.8, true},
  535. {20, 1, false},
  536. }
  537. for i, tt := range tests {
  538. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  539. sm.elapsed = tt.elapse
  540. c := 0
  541. for j := 0; j < 10000; j++ {
  542. if sm.isElectionTimeout() {
  543. c++
  544. }
  545. }
  546. got := float64(c) / 10000.0
  547. if tt.round {
  548. got = math.Floor(got*10+0.5) / 10.0
  549. }
  550. if got != tt.wprobability {
  551. t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability)
  552. }
  553. }
  554. }
  555. // ensure that the Step function ignores the message from old term and does not pass it to the
  556. // acutal stepX function.
  557. func TestStepIgnoreOldTermMsg(t *testing.T) {
  558. called := false
  559. fakeStep := func(r *raft, m pb.Message) {
  560. called = true
  561. }
  562. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  563. sm.step = fakeStep
  564. sm.Term = 2
  565. sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
  566. if called == true {
  567. t.Errorf("stepFunc called = %v , want %v", called, false)
  568. }
  569. }
  570. // TestHandleMsgApp ensures:
  571. // 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
  572. // 2. If an existing entry conflicts with a new one (same index but different terms),
  573. // delete the existing entry and all that follow it; append any new entries not already in the log.
  574. // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
  575. func TestHandleMsgApp(t *testing.T) {
  576. tests := []struct {
  577. m pb.Message
  578. wIndex uint64
  579. wCommit uint64
  580. wReject bool
  581. }{
  582. // Ensure 1
  583. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch
  584. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist
  585. // Ensure 2
  586. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
  587. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false},
  588. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false},
  589. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false},
  590. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false},
  591. // Ensure 3
  592. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1
  593. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2
  594. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2
  595. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last()
  596. }
  597. for i, tt := range tests {
  598. sm := &raft{
  599. state: StateFollower,
  600. HardState: pb.HardState{Term: 2},
  601. raftLog: &raftLog{
  602. committed: 0,
  603. storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
  604. unstable: unstable{offset: 3},
  605. },
  606. }
  607. sm.handleAppendEntries(tt.m)
  608. if sm.raftLog.lastIndex() != tt.wIndex {
  609. t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
  610. }
  611. if sm.raftLog.committed != tt.wCommit {
  612. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  613. }
  614. m := sm.readMessages()
  615. if len(m) != 1 {
  616. t.Fatalf("#%d: msg = nil, want 1", i)
  617. }
  618. if m[0].Reject != tt.wReject {
  619. t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
  620. }
  621. }
  622. }
  623. // TestHandleHeartbeat ensures that the follower commits to the commit in the message.
  624. func TestHandleHeartbeat(t *testing.T) {
  625. commit := uint64(2)
  626. tests := []struct {
  627. m pb.Message
  628. wCommit uint64
  629. }{
  630. {pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
  631. {pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
  632. }
  633. for i, tt := range tests {
  634. storage := NewMemoryStorage()
  635. storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
  636. sm := &raft{
  637. state: StateFollower,
  638. HardState: pb.HardState{Term: 2},
  639. raftLog: newLog(storage),
  640. }
  641. sm.raftLog.commitTo(commit)
  642. sm.handleHeartbeat(tt.m)
  643. if sm.raftLog.committed != tt.wCommit {
  644. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  645. }
  646. m := sm.readMessages()
  647. if len(m) != 0 {
  648. t.Fatalf("#%d: msg = nil, want 0", i)
  649. }
  650. }
  651. }
  652. func TestRecvMsgVote(t *testing.T) {
  653. tests := []struct {
  654. state StateType
  655. i, term uint64
  656. voteFor uint64
  657. wreject bool
  658. }{
  659. {StateFollower, 0, 0, None, true},
  660. {StateFollower, 0, 1, None, true},
  661. {StateFollower, 0, 2, None, true},
  662. {StateFollower, 0, 3, None, false},
  663. {StateFollower, 1, 0, None, true},
  664. {StateFollower, 1, 1, None, true},
  665. {StateFollower, 1, 2, None, true},
  666. {StateFollower, 1, 3, None, false},
  667. {StateFollower, 2, 0, None, true},
  668. {StateFollower, 2, 1, None, true},
  669. {StateFollower, 2, 2, None, false},
  670. {StateFollower, 2, 3, None, false},
  671. {StateFollower, 3, 0, None, true},
  672. {StateFollower, 3, 1, None, true},
  673. {StateFollower, 3, 2, None, false},
  674. {StateFollower, 3, 3, None, false},
  675. {StateFollower, 3, 2, 2, false},
  676. {StateFollower, 3, 2, 1, true},
  677. {StateLeader, 3, 3, 1, true},
  678. {StateCandidate, 3, 3, 1, true},
  679. }
  680. for i, tt := range tests {
  681. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  682. sm.state = tt.state
  683. switch tt.state {
  684. case StateFollower:
  685. sm.step = stepFollower
  686. case StateCandidate:
  687. sm.step = stepCandidate
  688. case StateLeader:
  689. sm.step = stepLeader
  690. }
  691. sm.HardState = pb.HardState{Vote: tt.voteFor}
  692. sm.raftLog = &raftLog{
  693. storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}},
  694. unstable: unstable{offset: 3},
  695. }
  696. sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
  697. msgs := sm.readMessages()
  698. if g := len(msgs); g != 1 {
  699. t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
  700. continue
  701. }
  702. if g := msgs[0].Reject; g != tt.wreject {
  703. t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
  704. }
  705. }
  706. }
  707. func TestStateTransition(t *testing.T) {
  708. tests := []struct {
  709. from StateType
  710. to StateType
  711. wallow bool
  712. wterm uint64
  713. wlead uint64
  714. }{
  715. {StateFollower, StateFollower, true, 1, None},
  716. {StateFollower, StateCandidate, true, 1, None},
  717. {StateFollower, StateLeader, false, 0, None},
  718. {StateCandidate, StateFollower, true, 0, None},
  719. {StateCandidate, StateCandidate, true, 1, None},
  720. {StateCandidate, StateLeader, true, 0, 1},
  721. {StateLeader, StateFollower, true, 1, None},
  722. {StateLeader, StateCandidate, false, 1, None},
  723. {StateLeader, StateLeader, true, 0, 1},
  724. }
  725. for i, tt := range tests {
  726. func() {
  727. defer func() {
  728. if r := recover(); r != nil {
  729. if tt.wallow == true {
  730. t.Errorf("%d: allow = %v, want %v", i, false, true)
  731. }
  732. }
  733. }()
  734. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  735. sm.state = tt.from
  736. switch tt.to {
  737. case StateFollower:
  738. sm.becomeFollower(tt.wterm, tt.wlead)
  739. case StateCandidate:
  740. sm.becomeCandidate()
  741. case StateLeader:
  742. sm.becomeLeader()
  743. }
  744. if sm.Term != tt.wterm {
  745. t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
  746. }
  747. if sm.lead != tt.wlead {
  748. t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  749. }
  750. }()
  751. }
  752. }
  753. func TestAllServerStepdown(t *testing.T) {
  754. tests := []struct {
  755. state StateType
  756. wstate StateType
  757. wterm uint64
  758. windex uint64
  759. }{
  760. {StateFollower, StateFollower, 3, 0},
  761. {StateCandidate, StateFollower, 3, 0},
  762. {StateLeader, StateFollower, 3, 1},
  763. }
  764. tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
  765. tterm := uint64(3)
  766. for i, tt := range tests {
  767. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  768. switch tt.state {
  769. case StateFollower:
  770. sm.becomeFollower(1, None)
  771. case StateCandidate:
  772. sm.becomeCandidate()
  773. case StateLeader:
  774. sm.becomeCandidate()
  775. sm.becomeLeader()
  776. }
  777. for j, msgType := range tmsgTypes {
  778. sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
  779. if sm.state != tt.wstate {
  780. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
  781. }
  782. if sm.Term != tt.wterm {
  783. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
  784. }
  785. if uint64(sm.raftLog.lastIndex()) != tt.windex {
  786. t.Errorf("#%d.%d index = %v , want %v", i, j, sm.raftLog.lastIndex(), tt.windex)
  787. }
  788. if uint64(len(sm.raftLog.allEntries())) != tt.windex {
  789. t.Errorf("#%d.%d len(ents) = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex)
  790. }
  791. wlead := uint64(2)
  792. if msgType == pb.MsgVote {
  793. wlead = None
  794. }
  795. if sm.lead != wlead {
  796. t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
  797. }
  798. }
  799. }
  800. }
  801. func TestLeaderAppResp(t *testing.T) {
  802. // initial progress: match = 0; next = 3
  803. tests := []struct {
  804. index uint64
  805. reject bool
  806. // progress
  807. wmatch uint64
  808. wnext uint64
  809. // message
  810. wmsgNum int
  811. windex uint64
  812. wcommitted uint64
  813. }{
  814. {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies
  815. {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg
  816. {2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
  817. {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
  818. }
  819. for i, tt := range tests {
  820. // sm term is 1 after it becomes the leader.
  821. // thus the last log term must be 1 to be committed.
  822. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  823. sm.raftLog = &raftLog{
  824. storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
  825. unstable: unstable{offset: 3},
  826. }
  827. sm.becomeCandidate()
  828. sm.becomeLeader()
  829. sm.readMessages()
  830. sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject})
  831. p := sm.prs[2]
  832. if p.match != tt.wmatch {
  833. t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch)
  834. }
  835. if p.next != tt.wnext {
  836. t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
  837. }
  838. msgs := sm.readMessages()
  839. if len(msgs) != tt.wmsgNum {
  840. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  841. }
  842. for j, msg := range msgs {
  843. if msg.Index != tt.windex {
  844. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  845. }
  846. if msg.Commit != tt.wcommitted {
  847. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  848. }
  849. }
  850. }
  851. }
  852. // When the leader receives a heartbeat tick, it should
  853. // send a MsgApp with m.Index = 0, m.LogTerm=0 and empty entries.
  854. func TestBcastBeat(t *testing.T) {
  855. offset := uint64(1000)
  856. // make a state machine with log.offset = 1000
  857. s := pb.Snapshot{
  858. Metadata: pb.SnapshotMetadata{
  859. Index: offset,
  860. Term: 1,
  861. ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
  862. },
  863. }
  864. storage := NewMemoryStorage()
  865. storage.ApplySnapshot(s)
  866. sm := newRaft(1, nil, 10, 1, storage)
  867. sm.Term = 1
  868. sm.becomeCandidate()
  869. sm.becomeLeader()
  870. for i := 0; i < 10; i++ {
  871. sm.appendEntry(pb.Entry{Index: uint64(i) + 1})
  872. }
  873. // slow follower
  874. sm.prs[2].match, sm.prs[2].next = 5, 6
  875. // normal follower
  876. sm.prs[3].match, sm.prs[3].next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
  877. sm.Step(pb.Message{Type: pb.MsgBeat})
  878. msgs := sm.readMessages()
  879. if len(msgs) != 2 {
  880. t.Fatalf("len(msgs) = %v, want 2", len(msgs))
  881. }
  882. wantCommitMap := map[uint64]uint64{
  883. 2: min(sm.raftLog.committed, sm.prs[2].match),
  884. 3: min(sm.raftLog.committed, sm.prs[3].match),
  885. }
  886. for i, m := range msgs {
  887. if m.Type != pb.MsgApp {
  888. t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
  889. }
  890. if m.Index != 0 {
  891. t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
  892. }
  893. if m.LogTerm != 0 {
  894. t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
  895. }
  896. if wantCommitMap[m.To] == 0 {
  897. t.Fatalf("#%d: unexpected to %d", i, m.To)
  898. } else {
  899. if m.Commit != wantCommitMap[m.To] {
  900. t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
  901. }
  902. delete(wantCommitMap, m.To)
  903. }
  904. if len(m.Entries) != 0 {
  905. t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
  906. }
  907. }
  908. }
  909. // tests the output of the statemachine when receiving MsgBeat
  910. func TestRecvMsgBeat(t *testing.T) {
  911. tests := []struct {
  912. state StateType
  913. wMsg int
  914. }{
  915. {StateLeader, 2},
  916. // candidate and follower should ignore MsgBeat
  917. {StateCandidate, 0},
  918. {StateFollower, 0},
  919. }
  920. for i, tt := range tests {
  921. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  922. sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
  923. sm.Term = 1
  924. sm.state = tt.state
  925. switch tt.state {
  926. case StateFollower:
  927. sm.step = stepFollower
  928. case StateCandidate:
  929. sm.step = stepCandidate
  930. case StateLeader:
  931. sm.step = stepLeader
  932. }
  933. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  934. msgs := sm.readMessages()
  935. if len(msgs) != tt.wMsg {
  936. t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  937. }
  938. for _, m := range msgs {
  939. if m.Type != pb.MsgApp {
  940. t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp)
  941. }
  942. }
  943. }
  944. }
  945. func TestLeaderIncreaseNext(t *testing.T) {
  946. previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
  947. tests := []struct {
  948. // progress
  949. match uint64
  950. next uint64
  951. wnext uint64
  952. }{
  953. // match is not zero, optimistically increase next
  954. // previous entries + noop entry + propose + 1
  955. {1, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
  956. // match is zero, not optimistically increase next
  957. {0, 2, 2},
  958. }
  959. for i, tt := range tests {
  960. sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  961. sm.raftLog.append(0, previousEnts...)
  962. sm.becomeCandidate()
  963. sm.becomeLeader()
  964. sm.prs[2].match, sm.prs[2].next = tt.match, tt.next
  965. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  966. p := sm.prs[2]
  967. if p.next != tt.wnext {
  968. t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
  969. }
  970. }
  971. }
  972. func TestRestore(t *testing.T) {
  973. s := pb.Snapshot{
  974. Metadata: pb.SnapshotMetadata{
  975. Index: 11, // magic number
  976. Term: 11, // magic number
  977. ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
  978. },
  979. }
  980. storage := NewMemoryStorage()
  981. sm := newRaft(1, []uint64{1, 2}, 10, 1, storage)
  982. if ok := sm.restore(s); !ok {
  983. t.Fatal("restore fail, want succeed")
  984. }
  985. if sm.raftLog.lastIndex() != s.Metadata.Index {
  986. t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
  987. }
  988. if sm.raftLog.term(s.Metadata.Index) != s.Metadata.Term {
  989. t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Metadata.Index), s.Metadata.Term)
  990. }
  991. sg := sm.nodes()
  992. if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
  993. t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes)
  994. }
  995. if ok := sm.restore(s); ok {
  996. t.Fatal("restore succeed, want fail")
  997. }
  998. }
  999. func TestProvideSnap(t *testing.T) {
  1000. // restore the statemachin from a snapshot
  1001. // so it has a compacted log and a snapshot
  1002. s := pb.Snapshot{
  1003. Metadata: pb.SnapshotMetadata{
  1004. Index: 11, // magic number
  1005. Term: 11, // magic number
  1006. ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
  1007. },
  1008. }
  1009. storage := NewMemoryStorage()
  1010. sm := newRaft(1, []uint64{1}, 10, 1, storage)
  1011. sm.restore(s)
  1012. sm.becomeCandidate()
  1013. sm.becomeLeader()
  1014. // force set the next of node 1, so that
  1015. // node 1 needs a snapshot
  1016. sm.prs[2].next = sm.raftLog.firstIndex()
  1017. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
  1018. msgs := sm.readMessages()
  1019. if len(msgs) != 1 {
  1020. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  1021. }
  1022. m := msgs[0]
  1023. if m.Type != pb.MsgSnap {
  1024. t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
  1025. }
  1026. }
  1027. func TestRestoreFromSnapMsg(t *testing.T) {
  1028. s := pb.Snapshot{
  1029. Metadata: pb.SnapshotMetadata{
  1030. Index: 11, // magic number
  1031. Term: 11, // magic number
  1032. ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
  1033. },
  1034. }
  1035. m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
  1036. sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1037. sm.Step(m)
  1038. // TODO(bdarnell): what should this test?
  1039. }
  1040. func TestSlowNodeRestore(t *testing.T) {
  1041. nt := newNetwork(nil, nil, nil)
  1042. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1043. nt.isolate(3)
  1044. for j := 0; j <= 100; j++ {
  1045. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  1046. }
  1047. lead := nt.peers[1].(*raft)
  1048. nextEnts(lead, nt.storage[1])
  1049. nt.storage[1].Compact(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil)
  1050. nt.recover()
  1051. // trigger a snapshot
  1052. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  1053. follower := nt.peers[3].(*raft)
  1054. // trigger a commit
  1055. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  1056. if follower.raftLog.committed != lead.raftLog.committed {
  1057. t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
  1058. }
  1059. }
  1060. // TestStepConfig tests that when raft step msgProp in EntryConfChange type,
  1061. // it appends the entry to log and sets pendingConf to be true.
  1062. func TestStepConfig(t *testing.T) {
  1063. // a raft that cannot make progress
  1064. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1065. r.becomeCandidate()
  1066. r.becomeLeader()
  1067. index := r.raftLog.lastIndex()
  1068. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1069. if g := r.raftLog.lastIndex(); g != index+1 {
  1070. t.Errorf("index = %d, want %d", g, index+1)
  1071. }
  1072. if r.pendingConf != true {
  1073. t.Errorf("pendingConf = %v, want true", r.pendingConf)
  1074. }
  1075. }
  1076. // TestStepIgnoreConfig tests that if raft step the second msgProp in
  1077. // EntryConfChange type when the first one is uncommitted, the node will deny
  1078. // the proposal and keep its original state.
  1079. func TestStepIgnoreConfig(t *testing.T) {
  1080. // a raft that cannot make progress
  1081. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1082. r.becomeCandidate()
  1083. r.becomeLeader()
  1084. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1085. index := r.raftLog.lastIndex()
  1086. pendingConf := r.pendingConf
  1087. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1088. if g := r.raftLog.lastIndex(); g != index {
  1089. t.Errorf("index = %d, want %d", g, index)
  1090. }
  1091. if r.pendingConf != pendingConf {
  1092. t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
  1093. }
  1094. }
  1095. // TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
  1096. // based on uncommitted entries.
  1097. func TestRecoverPendingConfig(t *testing.T) {
  1098. tests := []struct {
  1099. entType pb.EntryType
  1100. wpending bool
  1101. }{
  1102. {pb.EntryNormal, false},
  1103. {pb.EntryConfChange, true},
  1104. }
  1105. for i, tt := range tests {
  1106. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1107. r.appendEntry(pb.Entry{Type: tt.entType})
  1108. r.becomeCandidate()
  1109. r.becomeLeader()
  1110. if r.pendingConf != tt.wpending {
  1111. t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
  1112. }
  1113. }
  1114. }
  1115. // TestRecoverDoublePendingConfig tests that new leader will panic if
  1116. // there exist two uncommitted config entries.
  1117. func TestRecoverDoublePendingConfig(t *testing.T) {
  1118. func() {
  1119. defer func() {
  1120. if err := recover(); err == nil {
  1121. t.Errorf("expect panic, but nothing happens")
  1122. }
  1123. }()
  1124. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1125. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1126. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1127. r.becomeCandidate()
  1128. r.becomeLeader()
  1129. }()
  1130. }
  1131. // TestAddNode tests that addNode could update pendingConf and nodes correctly.
  1132. func TestAddNode(t *testing.T) {
  1133. r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  1134. r.pendingConf = true
  1135. r.addNode(2)
  1136. if r.pendingConf != false {
  1137. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1138. }
  1139. nodes := r.nodes()
  1140. wnodes := []uint64{1, 2}
  1141. if !reflect.DeepEqual(nodes, wnodes) {
  1142. t.Errorf("nodes = %v, want %v", nodes, wnodes)
  1143. }
  1144. }
  1145. // TestRemoveNode tests that removeNode could update pendingConf, nodes and
  1146. // and removed list correctly.
  1147. func TestRemoveNode(t *testing.T) {
  1148. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1149. r.pendingConf = true
  1150. r.removeNode(2)
  1151. if r.pendingConf != false {
  1152. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1153. }
  1154. w := []uint64{1}
  1155. if g := r.nodes(); !reflect.DeepEqual(g, w) {
  1156. t.Errorf("nodes = %v, want %v", g, w)
  1157. }
  1158. }
  1159. func TestPromotable(t *testing.T) {
  1160. id := uint64(1)
  1161. tests := []struct {
  1162. peers []uint64
  1163. wp bool
  1164. }{
  1165. {[]uint64{1}, true},
  1166. {[]uint64{1, 2, 3}, true},
  1167. {[]uint64{}, false},
  1168. {[]uint64{2, 3}, false},
  1169. }
  1170. for i, tt := range tests {
  1171. r := &raft{id: id, prs: make(map[uint64]*progress)}
  1172. for _, id := range tt.peers {
  1173. r.prs[id] = &progress{}
  1174. }
  1175. if g := r.promotable(); g != tt.wp {
  1176. t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
  1177. }
  1178. }
  1179. }
  1180. func TestRaftNodes(t *testing.T) {
  1181. tests := []struct {
  1182. ids []uint64
  1183. wids []uint64
  1184. }{
  1185. {
  1186. []uint64{1, 2, 3},
  1187. []uint64{1, 2, 3},
  1188. },
  1189. {
  1190. []uint64{3, 2, 1},
  1191. []uint64{1, 2, 3},
  1192. },
  1193. }
  1194. for i, tt := range tests {
  1195. r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage())
  1196. if !reflect.DeepEqual(r.nodes(), tt.wids) {
  1197. t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
  1198. }
  1199. }
  1200. }
  1201. func ents(terms ...uint64) *raft {
  1202. ents := []pb.Entry{{}}
  1203. for i, term := range terms {
  1204. ents = append(ents, pb.Entry{Index: uint64(i), Term: term})
  1205. }
  1206. sm := &raft{
  1207. raftLog: &raftLog{
  1208. storage: &MemoryStorage{ents: ents},
  1209. unstable: unstable{offset: uint64(len(ents))},
  1210. },
  1211. }
  1212. sm.reset(0)
  1213. return sm
  1214. }
  1215. type network struct {
  1216. peers map[uint64]Interface
  1217. storage map[uint64]*MemoryStorage
  1218. dropm map[connem]float64
  1219. ignorem map[pb.MessageType]bool
  1220. }
  1221. // newNetwork initializes a network from peers.
  1222. // A nil node will be replaced with a new *stateMachine.
  1223. // A *stateMachine will get its k, id.
  1224. // When using stateMachine, the address list is always [1, n].
  1225. func newNetwork(peers ...Interface) *network {
  1226. size := len(peers)
  1227. peerAddrs := idsBySize(size)
  1228. npeers := make(map[uint64]Interface, size)
  1229. nstorage := make(map[uint64]*MemoryStorage, size)
  1230. for i, p := range peers {
  1231. id := peerAddrs[i]
  1232. switch v := p.(type) {
  1233. case nil:
  1234. nstorage[id] = NewMemoryStorage()
  1235. sm := newRaft(id, peerAddrs, 10, 1, nstorage[id])
  1236. npeers[id] = sm
  1237. case *raft:
  1238. v.id = id
  1239. v.prs = make(map[uint64]*progress)
  1240. for i := 0; i < size; i++ {
  1241. v.prs[peerAddrs[i]] = &progress{}
  1242. }
  1243. v.reset(0)
  1244. npeers[id] = v
  1245. case *blackHole:
  1246. npeers[id] = v
  1247. default:
  1248. panic(fmt.Sprintf("unexpected state machine type: %T", p))
  1249. }
  1250. }
  1251. return &network{
  1252. peers: npeers,
  1253. storage: nstorage,
  1254. dropm: make(map[connem]float64),
  1255. ignorem: make(map[pb.MessageType]bool),
  1256. }
  1257. }
  1258. func (nw *network) send(msgs ...pb.Message) {
  1259. for len(msgs) > 0 {
  1260. m := msgs[0]
  1261. p := nw.peers[m.To]
  1262. p.Step(m)
  1263. msgs = append(msgs[1:], nw.filter(p.readMessages())...)
  1264. }
  1265. }
  1266. func (nw *network) drop(from, to uint64, perc float64) {
  1267. nw.dropm[connem{from, to}] = perc
  1268. }
  1269. func (nw *network) cut(one, other uint64) {
  1270. nw.drop(one, other, 1)
  1271. nw.drop(other, one, 1)
  1272. }
  1273. func (nw *network) isolate(id uint64) {
  1274. for i := 0; i < len(nw.peers); i++ {
  1275. nid := uint64(i) + 1
  1276. if nid != id {
  1277. nw.drop(id, nid, 1.0)
  1278. nw.drop(nid, id, 1.0)
  1279. }
  1280. }
  1281. }
  1282. func (nw *network) ignore(t pb.MessageType) {
  1283. nw.ignorem[t] = true
  1284. }
  1285. func (nw *network) recover() {
  1286. nw.dropm = make(map[connem]float64)
  1287. nw.ignorem = make(map[pb.MessageType]bool)
  1288. }
  1289. func (nw *network) filter(msgs []pb.Message) []pb.Message {
  1290. mm := []pb.Message{}
  1291. for _, m := range msgs {
  1292. if nw.ignorem[m.Type] {
  1293. continue
  1294. }
  1295. switch m.Type {
  1296. case pb.MsgHup:
  1297. // hups never go over the network, so don't drop them but panic
  1298. panic("unexpected msgHup")
  1299. default:
  1300. perc := nw.dropm[connem{m.From, m.To}]
  1301. if n := rand.Float64(); n < perc {
  1302. continue
  1303. }
  1304. }
  1305. mm = append(mm, m)
  1306. }
  1307. return mm
  1308. }
  1309. type connem struct {
  1310. from, to uint64
  1311. }
  1312. type blackHole struct{}
  1313. func (blackHole) Step(pb.Message) error { return nil }
  1314. func (blackHole) readMessages() []pb.Message { return nil }
  1315. var nopStepper = &blackHole{}
  1316. func idsBySize(size int) []uint64 {
  1317. ids := make([]uint64, size)
  1318. for i := 0; i < size; i++ {
  1319. ids[i] = 1 + uint64(i)
  1320. }
  1321. return ids
  1322. }