raft_test.go 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "bytes"
  16. "fmt"
  17. "math"
  18. "math/rand"
  19. "reflect"
  20. "sort"
  21. "testing"
  22. pb "github.com/coreos/etcd/raft/raftpb"
  23. )
  24. // nextEnts returns the appliable entries and updates the applied index
  25. func nextEnts(r *raft) (ents []pb.Entry) {
  26. ents = r.raftLog.nextEnts()
  27. r.raftLog.resetNextEnts()
  28. return ents
  29. }
  30. type Interface interface {
  31. Step(m pb.Message) error
  32. readMessages() []pb.Message
  33. }
  34. func (r *raft) readMessages() []pb.Message {
  35. msgs := r.msgs
  36. r.msgs = make([]pb.Message, 0)
  37. return msgs
  38. }
  39. func TestProgressMaybeDecr(t *testing.T) {
  40. tests := []struct {
  41. m uint64
  42. n uint64
  43. to uint64
  44. w bool
  45. wn uint64
  46. }{
  47. {
  48. // match != 0 is always false
  49. 1, 0, 0, false, 0,
  50. },
  51. {
  52. // match != 0 is always false
  53. 5, 10, 9, false, 10,
  54. },
  55. {
  56. // next-1 != to is always false
  57. 0, 0, 0, false, 0,
  58. },
  59. {
  60. // next-1 != to is always false
  61. 0, 10, 5, false, 10,
  62. },
  63. {
  64. // next>1 = decremented by 1
  65. 0, 10, 9, true, 9,
  66. },
  67. {
  68. // next>1 = decremented by 1
  69. 0, 2, 1, true, 1,
  70. },
  71. {
  72. // next<=1 = reset to 1
  73. 0, 1, 0, true, 1,
  74. },
  75. }
  76. for i, tt := range tests {
  77. p := &progress{
  78. match: tt.m,
  79. next: tt.n,
  80. }
  81. if g := p.maybeDecrTo(tt.to); g != tt.w {
  82. t.Errorf("#%d: maybeDecrTo=%t, want %t", i, g, tt.w)
  83. }
  84. if gm := p.match; gm != tt.m {
  85. t.Errorf("#%d: match=%d, want %d", i, gm, tt.m)
  86. }
  87. if gn := p.next; gn != tt.wn {
  88. t.Errorf("#%d: next=%d, want %d", i, gn, tt.wn)
  89. }
  90. }
  91. }
  92. func TestLeaderElection(t *testing.T) {
  93. tests := []struct {
  94. *network
  95. state StateType
  96. }{
  97. {newNetwork(nil, nil, nil), StateLeader},
  98. {newNetwork(nil, nil, nopStepper), StateLeader},
  99. {newNetwork(nil, nopStepper, nopStepper), StateCandidate},
  100. {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate},
  101. {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader},
  102. // three logs further along than 0
  103. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower},
  104. // logs converge
  105. {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader},
  106. }
  107. for i, tt := range tests {
  108. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  109. sm := tt.network.peers[1].(*raft)
  110. if sm.state != tt.state {
  111. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  112. }
  113. if g := sm.Term; g != 1 {
  114. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  115. }
  116. }
  117. }
  118. func TestLogReplication(t *testing.T) {
  119. tests := []struct {
  120. *network
  121. msgs []pb.Message
  122. wcommitted uint64
  123. }{
  124. {
  125. newNetwork(nil, nil, nil),
  126. []pb.Message{
  127. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  128. },
  129. 2,
  130. },
  131. {
  132. newNetwork(nil, nil, nil),
  133. []pb.Message{
  134. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  135. {From: 1, To: 2, Type: pb.MsgHup},
  136. {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  137. },
  138. 4,
  139. },
  140. }
  141. for i, tt := range tests {
  142. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  143. for _, m := range tt.msgs {
  144. tt.send(m)
  145. }
  146. for j, x := range tt.network.peers {
  147. sm := x.(*raft)
  148. if sm.raftLog.committed != tt.wcommitted {
  149. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
  150. }
  151. ents := []pb.Entry{}
  152. for _, e := range nextEnts(sm) {
  153. if e.Data != nil {
  154. ents = append(ents, e)
  155. }
  156. }
  157. props := []pb.Message{}
  158. for _, m := range tt.msgs {
  159. if m.Type == pb.MsgProp {
  160. props = append(props, m)
  161. }
  162. }
  163. for k, m := range props {
  164. if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
  165. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
  166. }
  167. }
  168. }
  169. }
  170. }
  171. func TestSingleNodeCommit(t *testing.T) {
  172. tt := newNetwork(nil)
  173. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  174. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  175. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  176. sm := tt.peers[1].(*raft)
  177. if sm.raftLog.committed != 3 {
  178. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
  179. }
  180. }
  181. // TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed
  182. // when leader changes, no new proposal comes in and ChangeTerm proposal is
  183. // filtered.
  184. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  185. tt := newNetwork(nil, nil, nil, nil, nil)
  186. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  187. // 0 cannot reach 2,3,4
  188. tt.cut(1, 3)
  189. tt.cut(1, 4)
  190. tt.cut(1, 5)
  191. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  192. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  193. sm := tt.peers[1].(*raft)
  194. if sm.raftLog.committed != 1 {
  195. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  196. }
  197. // network recovery
  198. tt.recover()
  199. // avoid committing ChangeTerm proposal
  200. tt.ignore(pb.MsgApp)
  201. // elect 1 as the new leader with term 2
  202. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  203. // no log entries from previous term should be committed
  204. sm = tt.peers[2].(*raft)
  205. if sm.raftLog.committed != 1 {
  206. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  207. }
  208. tt.recover()
  209. // still be able to append a entry
  210. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  211. if sm.raftLog.committed != 5 {
  212. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
  213. }
  214. }
  215. // TestCommitWithoutNewTermEntry tests the entries could be committed
  216. // when leader changes, no new proposal comes in.
  217. func TestCommitWithoutNewTermEntry(t *testing.T) {
  218. tt := newNetwork(nil, nil, nil, nil, nil)
  219. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  220. // 0 cannot reach 2,3,4
  221. tt.cut(1, 3)
  222. tt.cut(1, 4)
  223. tt.cut(1, 5)
  224. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  225. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  226. sm := tt.peers[1].(*raft)
  227. if sm.raftLog.committed != 1 {
  228. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  229. }
  230. // network recovery
  231. tt.recover()
  232. // elect 1 as the new leader with term 2
  233. // after append a ChangeTerm entry from the current term, all entries
  234. // should be committed
  235. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  236. if sm.raftLog.committed != 4 {
  237. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
  238. }
  239. }
  240. func TestDuelingCandidates(t *testing.T) {
  241. a := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  242. b := newRaft(2, []uint64{1, 2, 3}, 10, 1)
  243. c := newRaft(3, []uint64{1, 2, 3}, 10, 1)
  244. nt := newNetwork(a, b, c)
  245. nt.cut(1, 3)
  246. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  247. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  248. nt.recover()
  249. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  250. wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1}
  251. tests := []struct {
  252. sm *raft
  253. state StateType
  254. term uint64
  255. raftLog *raftLog
  256. }{
  257. {a, StateFollower, 2, wlog},
  258. {b, StateFollower, 2, wlog},
  259. {c, StateFollower, 2, newLog()},
  260. }
  261. for i, tt := range tests {
  262. if g := tt.sm.state; g != tt.state {
  263. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  264. }
  265. if g := tt.sm.Term; g != tt.term {
  266. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  267. }
  268. base := ltoa(tt.raftLog)
  269. if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
  270. l := ltoa(sm.raftLog)
  271. if g := diffu(base, l); g != "" {
  272. t.Errorf("#%d: diff:\n%s", i, g)
  273. }
  274. } else {
  275. t.Logf("#%d: empty log", i)
  276. }
  277. }
  278. }
  279. func TestCandidateConcede(t *testing.T) {
  280. tt := newNetwork(nil, nil, nil)
  281. tt.isolate(1)
  282. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  283. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  284. // heal the partition
  285. tt.recover()
  286. data := []byte("force follower")
  287. // send a proposal to 2 to flush out a msgApp to 0
  288. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  289. a := tt.peers[1].(*raft)
  290. if g := a.state; g != StateFollower {
  291. t.Errorf("state = %s, want %s", g, StateFollower)
  292. }
  293. if g := a.Term; g != 1 {
  294. t.Errorf("term = %d, want %d", g, 1)
  295. }
  296. wantLog := ltoa(&raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
  297. for i, p := range tt.peers {
  298. if sm, ok := p.(*raft); ok {
  299. l := ltoa(sm.raftLog)
  300. if g := diffu(wantLog, l); g != "" {
  301. t.Errorf("#%d: diff:\n%s", i, g)
  302. }
  303. } else {
  304. t.Logf("#%d: empty log", i)
  305. }
  306. }
  307. }
  308. func TestSingleNodeCandidate(t *testing.T) {
  309. tt := newNetwork(nil)
  310. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  311. sm := tt.peers[1].(*raft)
  312. if sm.state != StateLeader {
  313. t.Errorf("state = %d, want %d", sm.state, StateLeader)
  314. }
  315. }
  316. func TestOldMessages(t *testing.T) {
  317. tt := newNetwork(nil, nil, nil)
  318. // make 0 leader @ term 3
  319. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  320. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  321. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  322. // pretend we're an old leader trying to make progress
  323. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
  324. l := &raftLog{
  325. ents: []pb.Entry{
  326. {}, {Data: nil, Term: 1, Index: 1},
  327. {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
  328. },
  329. committed: 3,
  330. }
  331. base := ltoa(l)
  332. for i, p := range tt.peers {
  333. if sm, ok := p.(*raft); ok {
  334. l := ltoa(sm.raftLog)
  335. if g := diffu(base, l); g != "" {
  336. t.Errorf("#%d: diff:\n%s", i, g)
  337. }
  338. } else {
  339. t.Logf("#%d: empty log", i)
  340. }
  341. }
  342. }
  343. // TestOldMessagesReply - optimization - reply with new term.
  344. func TestProposal(t *testing.T) {
  345. tests := []struct {
  346. *network
  347. success bool
  348. }{
  349. {newNetwork(nil, nil, nil), true},
  350. {newNetwork(nil, nil, nopStepper), true},
  351. {newNetwork(nil, nopStepper, nopStepper), false},
  352. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  353. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  354. }
  355. for i, tt := range tests {
  356. send := func(m pb.Message) {
  357. defer func() {
  358. // only recover is we expect it to panic so
  359. // panics we don't expect go up.
  360. if !tt.success {
  361. e := recover()
  362. if e != nil {
  363. t.Logf("#%d: err: %s", i, e)
  364. }
  365. }
  366. }()
  367. tt.send(m)
  368. }
  369. data := []byte("somedata")
  370. // promote 0 the leader
  371. send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  372. send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  373. wantLog := newLog()
  374. if tt.success {
  375. wantLog = &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}
  376. }
  377. base := ltoa(wantLog)
  378. for i, p := range tt.peers {
  379. if sm, ok := p.(*raft); ok {
  380. l := ltoa(sm.raftLog)
  381. if g := diffu(base, l); g != "" {
  382. t.Errorf("#%d: diff:\n%s", i, g)
  383. }
  384. } else {
  385. t.Logf("#%d: empty log", i)
  386. }
  387. }
  388. sm := tt.network.peers[1].(*raft)
  389. if g := sm.Term; g != 1 {
  390. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  391. }
  392. }
  393. }
  394. func TestProposalByProxy(t *testing.T) {
  395. data := []byte("somedata")
  396. tests := []*network{
  397. newNetwork(nil, nil, nil),
  398. newNetwork(nil, nil, nopStepper),
  399. }
  400. for i, tt := range tests {
  401. // promote 0 the leader
  402. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  403. // propose via follower
  404. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  405. wantLog := &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
  406. base := ltoa(wantLog)
  407. for i, p := range tt.peers {
  408. if sm, ok := p.(*raft); ok {
  409. l := ltoa(sm.raftLog)
  410. if g := diffu(base, l); g != "" {
  411. t.Errorf("#%d: diff:\n%s", i, g)
  412. }
  413. } else {
  414. t.Logf("#%d: empty log", i)
  415. }
  416. }
  417. sm := tt.peers[1].(*raft)
  418. if g := sm.Term; g != 1 {
  419. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  420. }
  421. }
  422. }
  423. func TestCompact(t *testing.T) {
  424. tests := []struct {
  425. compacti uint64
  426. nodes []uint64
  427. snapd []byte
  428. wpanic bool
  429. }{
  430. {1, []uint64{1, 2, 3}, []byte("some data"), false},
  431. {2, []uint64{1, 2, 3}, []byte("some data"), false},
  432. {4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range
  433. }
  434. for i, tt := range tests {
  435. func() {
  436. defer func() {
  437. if r := recover(); r != nil {
  438. if tt.wpanic != true {
  439. t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic)
  440. }
  441. }
  442. }()
  443. sm := &raft{
  444. state: StateLeader,
  445. raftLog: &raftLog{
  446. committed: 2,
  447. applied: 2,
  448. ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
  449. },
  450. }
  451. sm.compact(tt.compacti, tt.nodes, tt.snapd)
  452. sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
  453. if sm.raftLog.offset != tt.compacti {
  454. t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
  455. }
  456. if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
  457. t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
  458. }
  459. if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
  460. t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
  461. }
  462. }()
  463. }
  464. }
  465. func TestCommit(t *testing.T) {
  466. tests := []struct {
  467. matches []uint64
  468. logs []pb.Entry
  469. smTerm uint64
  470. w uint64
  471. }{
  472. // single
  473. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
  474. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
  475. {[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  476. {[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
  477. // odd
  478. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  479. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  480. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  481. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  482. // even
  483. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  484. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  485. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  486. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  487. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  488. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  489. }
  490. for i, tt := range tests {
  491. prs := make(map[uint64]*progress)
  492. for j := 0; j < len(tt.matches); j++ {
  493. prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
  494. }
  495. sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}}
  496. sm.maybeCommit()
  497. if g := sm.raftLog.committed; g != tt.w {
  498. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  499. }
  500. }
  501. }
  502. func TestIsElectionTimeout(t *testing.T) {
  503. tests := []struct {
  504. elapse int
  505. wprobability float64
  506. round bool
  507. }{
  508. {5, 0, false},
  509. {13, 0.3, true},
  510. {15, 0.5, true},
  511. {18, 0.8, true},
  512. {20, 1, false},
  513. }
  514. for i, tt := range tests {
  515. sm := newRaft(1, []uint64{1}, 10, 1)
  516. sm.elapsed = tt.elapse
  517. c := 0
  518. for j := 0; j < 10000; j++ {
  519. if sm.isElectionTimeout() {
  520. c++
  521. }
  522. }
  523. got := float64(c) / 10000.0
  524. if tt.round {
  525. got = math.Floor(got*10+0.5) / 10.0
  526. }
  527. if got != tt.wprobability {
  528. t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability)
  529. }
  530. }
  531. }
  532. // ensure that the Step function ignores the message from old term and does not pass it to the
  533. // acutal stepX function.
  534. func TestStepIgnoreOldTermMsg(t *testing.T) {
  535. called := false
  536. fakeStep := func(r *raft, m pb.Message) {
  537. called = true
  538. }
  539. sm := newRaft(1, []uint64{1}, 10, 1)
  540. sm.step = fakeStep
  541. sm.Term = 2
  542. sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
  543. if called == true {
  544. t.Errorf("stepFunc called = %v , want %v", called, false)
  545. }
  546. }
  547. // TestHandleMsgApp ensures:
  548. // 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
  549. // 2. If an existing entry conflicts with a new one (same index but different terms),
  550. // delete the existing entry and all that follow it; append any new entries not already in the log.
  551. // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
  552. func TestHandleMsgApp(t *testing.T) {
  553. tests := []struct {
  554. m pb.Message
  555. wIndex uint64
  556. wCommit uint64
  557. wReject bool
  558. }{
  559. // Ensure 1
  560. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch
  561. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist
  562. // Ensure 2
  563. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
  564. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false},
  565. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false},
  566. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false},
  567. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false},
  568. // Ensure 3
  569. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1
  570. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2
  571. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2
  572. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last()
  573. }
  574. for i, tt := range tests {
  575. sm := &raft{
  576. state: StateFollower,
  577. HardState: pb.HardState{Term: 2},
  578. raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
  579. }
  580. sm.handleAppendEntries(tt.m)
  581. if sm.raftLog.lastIndex() != tt.wIndex {
  582. t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
  583. }
  584. if sm.raftLog.committed != tt.wCommit {
  585. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  586. }
  587. m := sm.readMessages()
  588. if len(m) != 1 {
  589. t.Fatalf("#%d: msg = nil, want 1", i)
  590. }
  591. if m[0].Reject != tt.wReject {
  592. t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
  593. }
  594. }
  595. }
  596. func TestRecvMsgVote(t *testing.T) {
  597. tests := []struct {
  598. state StateType
  599. i, term uint64
  600. voteFor uint64
  601. wreject bool
  602. }{
  603. {StateFollower, 0, 0, None, true},
  604. {StateFollower, 0, 1, None, true},
  605. {StateFollower, 0, 2, None, true},
  606. {StateFollower, 0, 3, None, false},
  607. {StateFollower, 1, 0, None, true},
  608. {StateFollower, 1, 1, None, true},
  609. {StateFollower, 1, 2, None, true},
  610. {StateFollower, 1, 3, None, false},
  611. {StateFollower, 2, 0, None, true},
  612. {StateFollower, 2, 1, None, true},
  613. {StateFollower, 2, 2, None, false},
  614. {StateFollower, 2, 3, None, false},
  615. {StateFollower, 3, 0, None, true},
  616. {StateFollower, 3, 1, None, true},
  617. {StateFollower, 3, 2, None, false},
  618. {StateFollower, 3, 3, None, false},
  619. {StateFollower, 3, 2, 2, false},
  620. {StateFollower, 3, 2, 1, true},
  621. {StateLeader, 3, 3, 1, true},
  622. {StateCandidate, 3, 3, 1, true},
  623. }
  624. for i, tt := range tests {
  625. sm := newRaft(1, []uint64{1}, 10, 1)
  626. sm.state = tt.state
  627. switch tt.state {
  628. case StateFollower:
  629. sm.step = stepFollower
  630. case StateCandidate:
  631. sm.step = stepCandidate
  632. case StateLeader:
  633. sm.step = stepLeader
  634. }
  635. sm.HardState = pb.HardState{Vote: tt.voteFor}
  636. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}
  637. sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
  638. msgs := sm.readMessages()
  639. if g := len(msgs); g != 1 {
  640. t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
  641. continue
  642. }
  643. if g := msgs[0].Reject; g != tt.wreject {
  644. t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
  645. }
  646. }
  647. }
  648. func TestStateTransition(t *testing.T) {
  649. tests := []struct {
  650. from StateType
  651. to StateType
  652. wallow bool
  653. wterm uint64
  654. wlead uint64
  655. }{
  656. {StateFollower, StateFollower, true, 1, None},
  657. {StateFollower, StateCandidate, true, 1, None},
  658. {StateFollower, StateLeader, false, 0, None},
  659. {StateCandidate, StateFollower, true, 0, None},
  660. {StateCandidate, StateCandidate, true, 1, None},
  661. {StateCandidate, StateLeader, true, 0, 1},
  662. {StateLeader, StateFollower, true, 1, None},
  663. {StateLeader, StateCandidate, false, 1, None},
  664. {StateLeader, StateLeader, true, 0, 1},
  665. }
  666. for i, tt := range tests {
  667. func() {
  668. defer func() {
  669. if r := recover(); r != nil {
  670. if tt.wallow == true {
  671. t.Errorf("%d: allow = %v, want %v", i, false, true)
  672. }
  673. }
  674. }()
  675. sm := newRaft(1, []uint64{1}, 10, 1)
  676. sm.state = tt.from
  677. switch tt.to {
  678. case StateFollower:
  679. sm.becomeFollower(tt.wterm, tt.wlead)
  680. case StateCandidate:
  681. sm.becomeCandidate()
  682. case StateLeader:
  683. sm.becomeLeader()
  684. }
  685. if sm.Term != tt.wterm {
  686. t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
  687. }
  688. if sm.lead != tt.wlead {
  689. t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  690. }
  691. }()
  692. }
  693. }
  694. func TestAllServerStepdown(t *testing.T) {
  695. tests := []struct {
  696. state StateType
  697. wstate StateType
  698. wterm uint64
  699. windex uint64
  700. }{
  701. {StateFollower, StateFollower, 3, 1},
  702. {StateCandidate, StateFollower, 3, 1},
  703. {StateLeader, StateFollower, 3, 2},
  704. }
  705. tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
  706. tterm := uint64(3)
  707. for i, tt := range tests {
  708. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  709. switch tt.state {
  710. case StateFollower:
  711. sm.becomeFollower(1, None)
  712. case StateCandidate:
  713. sm.becomeCandidate()
  714. case StateLeader:
  715. sm.becomeCandidate()
  716. sm.becomeLeader()
  717. }
  718. for j, msgType := range tmsgTypes {
  719. sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
  720. if sm.state != tt.wstate {
  721. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
  722. }
  723. if sm.Term != tt.wterm {
  724. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
  725. }
  726. if uint64(len(sm.raftLog.ents)) != tt.windex {
  727. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.ents), tt.windex)
  728. }
  729. wlead := uint64(2)
  730. if msgType == pb.MsgVote {
  731. wlead = None
  732. }
  733. if sm.lead != wlead {
  734. t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
  735. }
  736. }
  737. }
  738. }
  739. func TestLeaderAppResp(t *testing.T) {
  740. // initial progress: match = 0; netx = 3
  741. tests := []struct {
  742. index uint64
  743. reject bool
  744. // progress
  745. wmatch uint64
  746. wnext uint64
  747. // message
  748. wmsgNum int
  749. windex uint64
  750. wcommitted uint64
  751. }{
  752. {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies
  753. {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg
  754. {2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
  755. {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
  756. }
  757. for i, tt := range tests {
  758. // sm term is 1 after it becomes the leader.
  759. // thus the last log term must be 1 to be committed.
  760. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  761. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  762. sm.becomeCandidate()
  763. sm.becomeLeader()
  764. sm.readMessages()
  765. sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject})
  766. p := sm.prs[2]
  767. if p.match != tt.wmatch {
  768. t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch)
  769. }
  770. if p.next != tt.wnext {
  771. t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
  772. }
  773. msgs := sm.readMessages()
  774. if len(msgs) != tt.wmsgNum {
  775. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  776. }
  777. for j, msg := range msgs {
  778. if msg.Index != tt.windex {
  779. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  780. }
  781. if msg.Commit != tt.wcommitted {
  782. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  783. }
  784. }
  785. }
  786. }
  787. // When the leader receives a heartbeat tick, it should
  788. // send a msgApp with m.Index = 0, m.LogTerm=0 and empty entries.
  789. func TestBcastBeat(t *testing.T) {
  790. offset := uint64(1000)
  791. // make a state machine with log.offset = 1000
  792. s := pb.Snapshot{
  793. Index: offset,
  794. Term: 1,
  795. Nodes: []uint64{1, 2, 3},
  796. }
  797. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  798. sm.Term = 1
  799. sm.restore(s)
  800. sm.becomeCandidate()
  801. sm.becomeLeader()
  802. for i := 0; i < 10; i++ {
  803. sm.appendEntry(pb.Entry{})
  804. }
  805. sm.Step(pb.Message{Type: pb.MsgBeat})
  806. msgs := sm.readMessages()
  807. if len(msgs) != 2 {
  808. t.Fatalf("len(msgs) = %v, want 1", len(msgs))
  809. }
  810. tomap := map[uint64]bool{2: true, 3: true}
  811. for i, m := range msgs {
  812. if m.Type != pb.MsgApp {
  813. t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
  814. }
  815. if m.Index != 0 {
  816. t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
  817. }
  818. if m.LogTerm != 0 {
  819. t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
  820. }
  821. if !tomap[m.To] {
  822. t.Fatalf("#%d: unexpected to %d", i, m.To)
  823. } else {
  824. delete(tomap, m.To)
  825. }
  826. if len(m.Entries) != 0 {
  827. t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
  828. }
  829. }
  830. }
  831. // tests the output of the statemachine when receiving msgBeat
  832. func TestRecvMsgBeat(t *testing.T) {
  833. tests := []struct {
  834. state StateType
  835. wMsg int
  836. }{
  837. {StateLeader, 2},
  838. // candidate and follower should ignore msgBeat
  839. {StateCandidate, 0},
  840. {StateFollower, 0},
  841. }
  842. for i, tt := range tests {
  843. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  844. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  845. sm.Term = 1
  846. sm.state = tt.state
  847. switch tt.state {
  848. case StateFollower:
  849. sm.step = stepFollower
  850. case StateCandidate:
  851. sm.step = stepCandidate
  852. case StateLeader:
  853. sm.step = stepLeader
  854. }
  855. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  856. msgs := sm.readMessages()
  857. if len(msgs) != tt.wMsg {
  858. t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  859. }
  860. for _, m := range msgs {
  861. if m.Type != pb.MsgApp {
  862. t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp)
  863. }
  864. }
  865. }
  866. }
  867. func TestRestore(t *testing.T) {
  868. s := pb.Snapshot{
  869. Index: 11, // magic number
  870. Term: 11, // magic number
  871. Nodes: []uint64{1, 2, 3},
  872. }
  873. sm := newRaft(1, []uint64{1, 2}, 10, 1)
  874. if ok := sm.restore(s); !ok {
  875. t.Fatal("restore fail, want succeed")
  876. }
  877. if sm.raftLog.lastIndex() != s.Index {
  878. t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Index)
  879. }
  880. if sm.raftLog.term(s.Index) != s.Term {
  881. t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
  882. }
  883. sg := sm.nodes()
  884. sort.Sort(uint64Slice(sg))
  885. if !reflect.DeepEqual(sg, s.Nodes) {
  886. t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
  887. }
  888. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  889. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  890. }
  891. if ok := sm.restore(s); ok {
  892. t.Fatal("restore succeed, want fail")
  893. }
  894. }
  895. func TestProvideSnap(t *testing.T) {
  896. s := pb.Snapshot{
  897. Index: 11, // magic number
  898. Term: 11, // magic number
  899. Nodes: []uint64{1, 2},
  900. }
  901. sm := newRaft(1, []uint64{1}, 10, 1)
  902. // restore the statemachin from a snapshot
  903. // so it has a compacted log and a snapshot
  904. sm.restore(s)
  905. sm.becomeCandidate()
  906. sm.becomeLeader()
  907. // force set the next of node 1, so that
  908. // node 1 needs a snapshot
  909. sm.prs[2].next = sm.raftLog.offset
  910. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
  911. msgs := sm.readMessages()
  912. if len(msgs) != 1 {
  913. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  914. }
  915. m := msgs[0]
  916. if m.Type != pb.MsgSnap {
  917. t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
  918. }
  919. }
  920. func TestRestoreFromSnapMsg(t *testing.T) {
  921. s := pb.Snapshot{
  922. Index: 11, // magic number
  923. Term: 11, // magic number
  924. Nodes: []uint64{1, 2},
  925. }
  926. m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
  927. sm := newRaft(2, []uint64{1, 2}, 10, 1)
  928. sm.Step(m)
  929. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  930. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  931. }
  932. }
  933. func TestSlowNodeRestore(t *testing.T) {
  934. nt := newNetwork(nil, nil, nil)
  935. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  936. nt.isolate(3)
  937. for j := 0; j <= 100; j++ {
  938. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  939. }
  940. lead := nt.peers[1].(*raft)
  941. nextEnts(lead)
  942. lead.compact(lead.raftLog.applied, lead.nodes(), nil)
  943. nt.recover()
  944. // trigger a snapshot
  945. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  946. follower := nt.peers[3].(*raft)
  947. if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
  948. t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
  949. }
  950. // trigger a commit
  951. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  952. if follower.raftLog.committed != lead.raftLog.committed {
  953. t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
  954. }
  955. }
  956. // TestStepConfig tests that when raft step msgProp in EntryConfChange type,
  957. // it appends the entry to log and sets pendingConf to be true.
  958. func TestStepConfig(t *testing.T) {
  959. // a raft that cannot make progress
  960. r := newRaft(1, []uint64{1, 2}, 10, 1)
  961. r.becomeCandidate()
  962. r.becomeLeader()
  963. index := r.raftLog.lastIndex()
  964. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  965. if g := r.raftLog.lastIndex(); g != index+1 {
  966. t.Errorf("index = %d, want %d", g, index+1)
  967. }
  968. if r.pendingConf != true {
  969. t.Errorf("pendingConf = %v, want true", r.pendingConf)
  970. }
  971. }
  972. // TestStepIgnoreConfig tests that if raft step the second msgProp in
  973. // EntryConfChange type when the first one is uncommitted, the node will deny
  974. // the proposal and keep its original state.
  975. func TestStepIgnoreConfig(t *testing.T) {
  976. // a raft that cannot make progress
  977. r := newRaft(1, []uint64{1, 2}, 10, 1)
  978. r.becomeCandidate()
  979. r.becomeLeader()
  980. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  981. index := r.raftLog.lastIndex()
  982. pendingConf := r.pendingConf
  983. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  984. if g := r.raftLog.lastIndex(); g != index {
  985. t.Errorf("index = %d, want %d", g, index)
  986. }
  987. if r.pendingConf != pendingConf {
  988. t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
  989. }
  990. }
  991. // TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
  992. // based on uncommitted entries.
  993. func TestRecoverPendingConfig(t *testing.T) {
  994. tests := []struct {
  995. entType pb.EntryType
  996. wpending bool
  997. }{
  998. {pb.EntryNormal, false},
  999. {pb.EntryConfChange, true},
  1000. }
  1001. for i, tt := range tests {
  1002. r := newRaft(1, []uint64{1, 2}, 10, 1)
  1003. r.appendEntry(pb.Entry{Type: tt.entType})
  1004. r.becomeCandidate()
  1005. r.becomeLeader()
  1006. if r.pendingConf != tt.wpending {
  1007. t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
  1008. }
  1009. }
  1010. }
  1011. // TestRecoverDoublePendingConfig tests that new leader will panic if
  1012. // there exist two uncommitted config entries.
  1013. func TestRecoverDoublePendingConfig(t *testing.T) {
  1014. func() {
  1015. defer func() {
  1016. if err := recover(); err == nil {
  1017. t.Errorf("expect panic, but nothing happens")
  1018. }
  1019. }()
  1020. r := newRaft(1, []uint64{1, 2}, 10, 1)
  1021. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1022. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1023. r.becomeCandidate()
  1024. r.becomeLeader()
  1025. }()
  1026. }
  1027. // TestAddNode tests that addNode could update pendingConf and nodes correctly.
  1028. func TestAddNode(t *testing.T) {
  1029. r := newRaft(1, []uint64{1}, 10, 1)
  1030. r.pendingConf = true
  1031. r.addNode(2)
  1032. if r.pendingConf != false {
  1033. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1034. }
  1035. nodes := r.nodes()
  1036. sort.Sort(uint64Slice(nodes))
  1037. wnodes := []uint64{1, 2}
  1038. if !reflect.DeepEqual(nodes, wnodes) {
  1039. t.Errorf("nodes = %v, want %v", nodes, wnodes)
  1040. }
  1041. }
  1042. // TestRemoveNode tests that removeNode could update pendingConf, nodes and
  1043. // and removed list correctly.
  1044. func TestRemoveNode(t *testing.T) {
  1045. r := newRaft(1, []uint64{1, 2}, 10, 1)
  1046. r.pendingConf = true
  1047. r.removeNode(2)
  1048. if r.pendingConf != false {
  1049. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1050. }
  1051. w := []uint64{1}
  1052. if g := r.nodes(); !reflect.DeepEqual(g, w) {
  1053. t.Errorf("nodes = %v, want %v", g, w)
  1054. }
  1055. }
  1056. func TestPromotable(t *testing.T) {
  1057. id := uint64(1)
  1058. tests := []struct {
  1059. peers []uint64
  1060. wp bool
  1061. }{
  1062. {[]uint64{1}, true},
  1063. {[]uint64{1, 2, 3}, true},
  1064. {[]uint64{}, false},
  1065. {[]uint64{2, 3}, false},
  1066. }
  1067. for i, tt := range tests {
  1068. r := &raft{id: id, prs: make(map[uint64]*progress)}
  1069. for _, id := range tt.peers {
  1070. r.prs[id] = &progress{}
  1071. }
  1072. if g := r.promotable(); g != tt.wp {
  1073. t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
  1074. }
  1075. }
  1076. }
  1077. func ents(terms ...uint64) *raft {
  1078. ents := []pb.Entry{{}}
  1079. for _, term := range terms {
  1080. ents = append(ents, pb.Entry{Term: term})
  1081. }
  1082. sm := &raft{raftLog: &raftLog{ents: ents}}
  1083. sm.reset(0)
  1084. return sm
  1085. }
  1086. type network struct {
  1087. peers map[uint64]Interface
  1088. dropm map[connem]float64
  1089. ignorem map[pb.MessageType]bool
  1090. }
  1091. // newNetwork initializes a network from peers.
  1092. // A nil node will be replaced with a new *stateMachine.
  1093. // A *stateMachine will get its k, id.
  1094. // When using stateMachine, the address list is always [1, n].
  1095. func newNetwork(peers ...Interface) *network {
  1096. size := len(peers)
  1097. peerAddrs := idsBySize(size)
  1098. npeers := make(map[uint64]Interface, size)
  1099. for i, p := range peers {
  1100. id := peerAddrs[i]
  1101. switch v := p.(type) {
  1102. case nil:
  1103. sm := newRaft(id, peerAddrs, 10, 1)
  1104. npeers[id] = sm
  1105. case *raft:
  1106. v.id = id
  1107. v.prs = make(map[uint64]*progress)
  1108. for i := 0; i < size; i++ {
  1109. v.prs[peerAddrs[i]] = &progress{}
  1110. }
  1111. v.reset(0)
  1112. npeers[id] = v
  1113. case *blackHole:
  1114. npeers[id] = v
  1115. default:
  1116. panic(fmt.Sprintf("unexpected state machine type: %T", p))
  1117. }
  1118. }
  1119. return &network{
  1120. peers: npeers,
  1121. dropm: make(map[connem]float64),
  1122. ignorem: make(map[pb.MessageType]bool),
  1123. }
  1124. }
  1125. func (nw *network) send(msgs ...pb.Message) {
  1126. for len(msgs) > 0 {
  1127. m := msgs[0]
  1128. p := nw.peers[m.To]
  1129. p.Step(m)
  1130. msgs = append(msgs[1:], nw.filter(p.readMessages())...)
  1131. }
  1132. }
  1133. func (nw *network) drop(from, to uint64, perc float64) {
  1134. nw.dropm[connem{from, to}] = perc
  1135. }
  1136. func (nw *network) cut(one, other uint64) {
  1137. nw.drop(one, other, 1)
  1138. nw.drop(other, one, 1)
  1139. }
  1140. func (nw *network) isolate(id uint64) {
  1141. for i := 0; i < len(nw.peers); i++ {
  1142. nid := uint64(i) + 1
  1143. if nid != id {
  1144. nw.drop(id, nid, 1.0)
  1145. nw.drop(nid, id, 1.0)
  1146. }
  1147. }
  1148. }
  1149. func (nw *network) ignore(t pb.MessageType) {
  1150. nw.ignorem[t] = true
  1151. }
  1152. func (nw *network) recover() {
  1153. nw.dropm = make(map[connem]float64)
  1154. nw.ignorem = make(map[pb.MessageType]bool)
  1155. }
  1156. func (nw *network) filter(msgs []pb.Message) []pb.Message {
  1157. mm := []pb.Message{}
  1158. for _, m := range msgs {
  1159. if nw.ignorem[m.Type] {
  1160. continue
  1161. }
  1162. switch m.Type {
  1163. case pb.MsgHup:
  1164. // hups never go over the network, so don't drop them but panic
  1165. panic("unexpected msgHup")
  1166. default:
  1167. perc := nw.dropm[connem{m.From, m.To}]
  1168. if n := rand.Float64(); n < perc {
  1169. continue
  1170. }
  1171. }
  1172. mm = append(mm, m)
  1173. }
  1174. return mm
  1175. }
  1176. type connem struct {
  1177. from, to uint64
  1178. }
  1179. type blackHole struct{}
  1180. func (blackHole) Step(pb.Message) error { return nil }
  1181. func (blackHole) readMessages() []pb.Message { return nil }
  1182. var nopStepper = &blackHole{}
  1183. func idsBySize(size int) []uint64 {
  1184. ids := make([]uint64, size)
  1185. for i := 0; i < size; i++ {
  1186. ids[i] = 1 + uint64(i)
  1187. }
  1188. return ids
  1189. }