raft_test.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "bytes"
  16. "fmt"
  17. "math"
  18. "math/rand"
  19. "reflect"
  20. "sort"
  21. "testing"
  22. pb "github.com/coreos/etcd/raft/raftpb"
  23. )
  24. // nextEnts returns the appliable entries and updates the applied index
  25. func nextEnts(r *raft) (ents []pb.Entry) {
  26. ents = r.raftLog.nextEnts()
  27. r.raftLog.resetNextEnts()
  28. return ents
  29. }
  30. type Interface interface {
  31. Step(m pb.Message) error
  32. ReadMessages() []pb.Message
  33. }
  34. func TestLeaderElection(t *testing.T) {
  35. tests := []struct {
  36. *network
  37. state StateType
  38. }{
  39. {newNetwork(nil, nil, nil), StateLeader},
  40. {newNetwork(nil, nil, nopStepper), StateLeader},
  41. {newNetwork(nil, nopStepper, nopStepper), StateCandidate},
  42. {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate},
  43. {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader},
  44. // three logs further along than 0
  45. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower},
  46. // logs converge
  47. {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader},
  48. }
  49. for i, tt := range tests {
  50. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  51. sm := tt.network.peers[1].(*raft)
  52. if sm.state != tt.state {
  53. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  54. }
  55. if g := sm.Term; g != 1 {
  56. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  57. }
  58. }
  59. }
  60. func TestLogReplication(t *testing.T) {
  61. tests := []struct {
  62. *network
  63. msgs []pb.Message
  64. wcommitted uint64
  65. }{
  66. {
  67. newNetwork(nil, nil, nil),
  68. []pb.Message{
  69. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  70. },
  71. 2,
  72. },
  73. {
  74. newNetwork(nil, nil, nil),
  75. []pb.Message{
  76. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  77. {From: 1, To: 2, Type: pb.MsgHup},
  78. {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  79. },
  80. 4,
  81. },
  82. }
  83. for i, tt := range tests {
  84. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  85. for _, m := range tt.msgs {
  86. tt.send(m)
  87. }
  88. for j, x := range tt.network.peers {
  89. sm := x.(*raft)
  90. if sm.raftLog.committed != tt.wcommitted {
  91. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
  92. }
  93. ents := []pb.Entry{}
  94. for _, e := range nextEnts(sm) {
  95. if e.Data != nil {
  96. ents = append(ents, e)
  97. }
  98. }
  99. props := []pb.Message{}
  100. for _, m := range tt.msgs {
  101. if m.Type == pb.MsgProp {
  102. props = append(props, m)
  103. }
  104. }
  105. for k, m := range props {
  106. if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
  107. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
  108. }
  109. }
  110. }
  111. }
  112. }
  113. func TestSingleNodeCommit(t *testing.T) {
  114. tt := newNetwork(nil)
  115. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  116. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  117. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  118. sm := tt.peers[1].(*raft)
  119. if sm.raftLog.committed != 3 {
  120. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
  121. }
  122. }
  123. // TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed
  124. // when leader changes, no new proposal comes in and ChangeTerm proposal is
  125. // filtered.
  126. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  127. tt := newNetwork(nil, nil, nil, nil, nil)
  128. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  129. // 0 cannot reach 2,3,4
  130. tt.cut(1, 3)
  131. tt.cut(1, 4)
  132. tt.cut(1, 5)
  133. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  134. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  135. sm := tt.peers[1].(*raft)
  136. if sm.raftLog.committed != 1 {
  137. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  138. }
  139. // network recovery
  140. tt.recover()
  141. // avoid committing ChangeTerm proposal
  142. tt.ignore(pb.MsgApp)
  143. // elect 1 as the new leader with term 2
  144. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  145. // no log entries from previous term should be committed
  146. sm = tt.peers[2].(*raft)
  147. if sm.raftLog.committed != 1 {
  148. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  149. }
  150. tt.recover()
  151. // still be able to append a entry
  152. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  153. if sm.raftLog.committed != 5 {
  154. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
  155. }
  156. }
  157. // TestCommitWithoutNewTermEntry tests the entries could be committed
  158. // when leader changes, no new proposal comes in.
  159. func TestCommitWithoutNewTermEntry(t *testing.T) {
  160. tt := newNetwork(nil, nil, nil, nil, nil)
  161. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  162. // 0 cannot reach 2,3,4
  163. tt.cut(1, 3)
  164. tt.cut(1, 4)
  165. tt.cut(1, 5)
  166. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  167. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  168. sm := tt.peers[1].(*raft)
  169. if sm.raftLog.committed != 1 {
  170. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  171. }
  172. // network recovery
  173. tt.recover()
  174. // elect 1 as the new leader with term 2
  175. // after append a ChangeTerm entry from the current term, all entries
  176. // should be committed
  177. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  178. if sm.raftLog.committed != 4 {
  179. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
  180. }
  181. }
  182. func TestDuelingCandidates(t *testing.T) {
  183. a := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  184. b := newRaft(2, []uint64{1, 2, 3}, 10, 1)
  185. c := newRaft(3, []uint64{1, 2, 3}, 10, 1)
  186. nt := newNetwork(a, b, c)
  187. nt.cut(1, 3)
  188. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  189. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  190. nt.recover()
  191. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  192. wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1}
  193. tests := []struct {
  194. sm *raft
  195. state StateType
  196. term uint64
  197. raftLog *raftLog
  198. }{
  199. {a, StateFollower, 2, wlog},
  200. {b, StateFollower, 2, wlog},
  201. {c, StateFollower, 2, newLog()},
  202. }
  203. for i, tt := range tests {
  204. if g := tt.sm.state; g != tt.state {
  205. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  206. }
  207. if g := tt.sm.Term; g != tt.term {
  208. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  209. }
  210. base := ltoa(tt.raftLog)
  211. if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
  212. l := ltoa(sm.raftLog)
  213. if g := diffu(base, l); g != "" {
  214. t.Errorf("#%d: diff:\n%s", i, g)
  215. }
  216. } else {
  217. t.Logf("#%d: empty log", i)
  218. }
  219. }
  220. }
  221. func TestCandidateConcede(t *testing.T) {
  222. tt := newNetwork(nil, nil, nil)
  223. tt.isolate(1)
  224. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  225. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  226. // heal the partition
  227. tt.recover()
  228. data := []byte("force follower")
  229. // send a proposal to 2 to flush out a msgApp to 0
  230. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  231. a := tt.peers[1].(*raft)
  232. if g := a.state; g != StateFollower {
  233. t.Errorf("state = %s, want %s", g, StateFollower)
  234. }
  235. if g := a.Term; g != 1 {
  236. t.Errorf("term = %d, want %d", g, 1)
  237. }
  238. wantLog := ltoa(&raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
  239. for i, p := range tt.peers {
  240. if sm, ok := p.(*raft); ok {
  241. l := ltoa(sm.raftLog)
  242. if g := diffu(wantLog, l); g != "" {
  243. t.Errorf("#%d: diff:\n%s", i, g)
  244. }
  245. } else {
  246. t.Logf("#%d: empty log", i)
  247. }
  248. }
  249. }
  250. func TestSingleNodeCandidate(t *testing.T) {
  251. tt := newNetwork(nil)
  252. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  253. sm := tt.peers[1].(*raft)
  254. if sm.state != StateLeader {
  255. t.Errorf("state = %d, want %d", sm.state, StateLeader)
  256. }
  257. }
  258. func TestOldMessages(t *testing.T) {
  259. tt := newNetwork(nil, nil, nil)
  260. // make 0 leader @ term 3
  261. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  262. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  263. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  264. // pretend we're an old leader trying to make progress
  265. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
  266. l := &raftLog{
  267. ents: []pb.Entry{
  268. {}, {Data: nil, Term: 1, Index: 1},
  269. {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
  270. },
  271. committed: 3,
  272. }
  273. base := ltoa(l)
  274. for i, p := range tt.peers {
  275. if sm, ok := p.(*raft); ok {
  276. l := ltoa(sm.raftLog)
  277. if g := diffu(base, l); g != "" {
  278. t.Errorf("#%d: diff:\n%s", i, g)
  279. }
  280. } else {
  281. t.Logf("#%d: empty log", i)
  282. }
  283. }
  284. }
  285. // TestOldMessagesReply - optimization - reply with new term.
  286. func TestProposal(t *testing.T) {
  287. tests := []struct {
  288. *network
  289. success bool
  290. }{
  291. {newNetwork(nil, nil, nil), true},
  292. {newNetwork(nil, nil, nopStepper), true},
  293. {newNetwork(nil, nopStepper, nopStepper), false},
  294. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  295. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  296. }
  297. for i, tt := range tests {
  298. send := func(m pb.Message) {
  299. defer func() {
  300. // only recover is we expect it to panic so
  301. // panics we don't expect go up.
  302. if !tt.success {
  303. e := recover()
  304. if e != nil {
  305. t.Logf("#%d: err: %s", i, e)
  306. }
  307. }
  308. }()
  309. tt.send(m)
  310. }
  311. data := []byte("somedata")
  312. // promote 0 the leader
  313. send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  314. send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  315. wantLog := newLog()
  316. if tt.success {
  317. wantLog = &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}
  318. }
  319. base := ltoa(wantLog)
  320. for i, p := range tt.peers {
  321. if sm, ok := p.(*raft); ok {
  322. l := ltoa(sm.raftLog)
  323. if g := diffu(base, l); g != "" {
  324. t.Errorf("#%d: diff:\n%s", i, g)
  325. }
  326. } else {
  327. t.Logf("#%d: empty log", i)
  328. }
  329. }
  330. sm := tt.network.peers[1].(*raft)
  331. if g := sm.Term; g != 1 {
  332. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  333. }
  334. }
  335. }
  336. func TestProposalByProxy(t *testing.T) {
  337. data := []byte("somedata")
  338. tests := []*network{
  339. newNetwork(nil, nil, nil),
  340. newNetwork(nil, nil, nopStepper),
  341. }
  342. for i, tt := range tests {
  343. // promote 0 the leader
  344. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  345. // propose via follower
  346. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  347. wantLog := &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
  348. base := ltoa(wantLog)
  349. for i, p := range tt.peers {
  350. if sm, ok := p.(*raft); ok {
  351. l := ltoa(sm.raftLog)
  352. if g := diffu(base, l); g != "" {
  353. t.Errorf("#%d: diff:\n%s", i, g)
  354. }
  355. } else {
  356. t.Logf("#%d: empty log", i)
  357. }
  358. }
  359. sm := tt.peers[1].(*raft)
  360. if g := sm.Term; g != 1 {
  361. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  362. }
  363. }
  364. }
  365. func TestCompact(t *testing.T) {
  366. tests := []struct {
  367. compacti uint64
  368. nodes []uint64
  369. snapd []byte
  370. wpanic bool
  371. }{
  372. {1, []uint64{1, 2, 3}, []byte("some data"), false},
  373. {2, []uint64{1, 2, 3}, []byte("some data"), false},
  374. {4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range
  375. }
  376. for i, tt := range tests {
  377. func() {
  378. defer func() {
  379. if r := recover(); r != nil {
  380. if tt.wpanic != true {
  381. t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic)
  382. }
  383. }
  384. }()
  385. sm := &raft{
  386. state: StateLeader,
  387. raftLog: &raftLog{
  388. committed: 2,
  389. applied: 2,
  390. ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
  391. },
  392. }
  393. sm.compact(tt.compacti, tt.nodes, tt.snapd)
  394. sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
  395. if sm.raftLog.offset != tt.compacti {
  396. t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
  397. }
  398. if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
  399. t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
  400. }
  401. if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
  402. t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
  403. }
  404. }()
  405. }
  406. }
  407. func TestCommit(t *testing.T) {
  408. tests := []struct {
  409. matches []uint64
  410. logs []pb.Entry
  411. smTerm uint64
  412. w uint64
  413. }{
  414. // single
  415. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
  416. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
  417. {[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  418. {[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
  419. // odd
  420. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  421. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  422. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  423. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  424. // even
  425. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  426. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  427. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  428. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  429. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  430. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  431. }
  432. for i, tt := range tests {
  433. prs := make(map[uint64]*progress)
  434. for j := 0; j < len(tt.matches); j++ {
  435. prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
  436. }
  437. sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}}
  438. sm.maybeCommit()
  439. if g := sm.raftLog.committed; g != tt.w {
  440. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  441. }
  442. }
  443. }
  444. func TestIsElectionTimeout(t *testing.T) {
  445. tests := []struct {
  446. elapse int
  447. wprobability float64
  448. round bool
  449. }{
  450. {5, 0, false},
  451. {13, 0.3, true},
  452. {15, 0.5, true},
  453. {18, 0.8, true},
  454. {20, 1, false},
  455. }
  456. for i, tt := range tests {
  457. sm := newRaft(1, []uint64{1}, 10, 1)
  458. sm.elapsed = tt.elapse
  459. c := 0
  460. for j := 0; j < 10000; j++ {
  461. if sm.isElectionTimeout() {
  462. c++
  463. }
  464. }
  465. got := float64(c) / 10000.0
  466. if tt.round {
  467. got = math.Floor(got*10+0.5) / 10.0
  468. }
  469. if got != tt.wprobability {
  470. t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability)
  471. }
  472. }
  473. }
  474. // ensure that the Step function ignores the message from old term and does not pass it to the
  475. // acutal stepX function.
  476. func TestStepIgnoreOldTermMsg(t *testing.T) {
  477. called := false
  478. fakeStep := func(r *raft, m pb.Message) {
  479. called = true
  480. }
  481. sm := newRaft(1, []uint64{1}, 10, 1)
  482. sm.step = fakeStep
  483. sm.Term = 2
  484. sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
  485. if called == true {
  486. t.Errorf("stepFunc called = %v , want %v", called, false)
  487. }
  488. }
  489. // TestHandleMsgApp ensures:
  490. // 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
  491. // 2. If an existing entry conflicts with a new one (same index but different terms),
  492. // delete the existing entry and all that follow it; append any new entries not already in the log.
  493. // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
  494. func TestHandleMsgApp(t *testing.T) {
  495. tests := []struct {
  496. m pb.Message
  497. wIndex uint64
  498. wCommit uint64
  499. wReject bool
  500. }{
  501. // Ensure 1
  502. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch
  503. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist
  504. // Ensure 2
  505. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
  506. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false},
  507. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false},
  508. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false},
  509. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false},
  510. // Ensure 3
  511. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1
  512. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2
  513. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2
  514. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last()
  515. }
  516. for i, tt := range tests {
  517. sm := &raft{
  518. state: StateFollower,
  519. HardState: pb.HardState{Term: 2},
  520. raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
  521. }
  522. sm.handleAppendEntries(tt.m)
  523. if sm.raftLog.lastIndex() != tt.wIndex {
  524. t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
  525. }
  526. if sm.raftLog.committed != tt.wCommit {
  527. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  528. }
  529. m := sm.ReadMessages()
  530. if len(m) != 1 {
  531. t.Fatalf("#%d: msg = nil, want 1", i)
  532. }
  533. if m[0].Reject != tt.wReject {
  534. t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
  535. }
  536. }
  537. }
  538. func TestRecvMsgVote(t *testing.T) {
  539. tests := []struct {
  540. state StateType
  541. i, term uint64
  542. voteFor uint64
  543. wreject bool
  544. }{
  545. {StateFollower, 0, 0, None, true},
  546. {StateFollower, 0, 1, None, true},
  547. {StateFollower, 0, 2, None, true},
  548. {StateFollower, 0, 3, None, false},
  549. {StateFollower, 1, 0, None, true},
  550. {StateFollower, 1, 1, None, true},
  551. {StateFollower, 1, 2, None, true},
  552. {StateFollower, 1, 3, None, false},
  553. {StateFollower, 2, 0, None, true},
  554. {StateFollower, 2, 1, None, true},
  555. {StateFollower, 2, 2, None, false},
  556. {StateFollower, 2, 3, None, false},
  557. {StateFollower, 3, 0, None, true},
  558. {StateFollower, 3, 1, None, true},
  559. {StateFollower, 3, 2, None, false},
  560. {StateFollower, 3, 3, None, false},
  561. {StateFollower, 3, 2, 2, false},
  562. {StateFollower, 3, 2, 1, true},
  563. {StateLeader, 3, 3, 1, true},
  564. {StateCandidate, 3, 3, 1, true},
  565. }
  566. for i, tt := range tests {
  567. sm := newRaft(1, []uint64{1}, 10, 1)
  568. sm.state = tt.state
  569. switch tt.state {
  570. case StateFollower:
  571. sm.step = stepFollower
  572. case StateCandidate:
  573. sm.step = stepCandidate
  574. case StateLeader:
  575. sm.step = stepLeader
  576. }
  577. sm.HardState = pb.HardState{Vote: tt.voteFor}
  578. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}
  579. sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
  580. msgs := sm.ReadMessages()
  581. if g := len(msgs); g != 1 {
  582. t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
  583. continue
  584. }
  585. if g := msgs[0].Reject; g != tt.wreject {
  586. t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
  587. }
  588. }
  589. }
  590. func TestStateTransition(t *testing.T) {
  591. tests := []struct {
  592. from StateType
  593. to StateType
  594. wallow bool
  595. wterm uint64
  596. wlead uint64
  597. }{
  598. {StateFollower, StateFollower, true, 1, None},
  599. {StateFollower, StateCandidate, true, 1, None},
  600. {StateFollower, StateLeader, false, 0, None},
  601. {StateCandidate, StateFollower, true, 0, None},
  602. {StateCandidate, StateCandidate, true, 1, None},
  603. {StateCandidate, StateLeader, true, 0, 1},
  604. {StateLeader, StateFollower, true, 1, None},
  605. {StateLeader, StateCandidate, false, 1, None},
  606. {StateLeader, StateLeader, true, 0, 1},
  607. }
  608. for i, tt := range tests {
  609. func() {
  610. defer func() {
  611. if r := recover(); r != nil {
  612. if tt.wallow == true {
  613. t.Errorf("%d: allow = %v, want %v", i, false, true)
  614. }
  615. }
  616. }()
  617. sm := newRaft(1, []uint64{1}, 10, 1)
  618. sm.state = tt.from
  619. switch tt.to {
  620. case StateFollower:
  621. sm.becomeFollower(tt.wterm, tt.wlead)
  622. case StateCandidate:
  623. sm.becomeCandidate()
  624. case StateLeader:
  625. sm.becomeLeader()
  626. }
  627. if sm.Term != tt.wterm {
  628. t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
  629. }
  630. if sm.lead != tt.wlead {
  631. t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  632. }
  633. }()
  634. }
  635. }
  636. func TestAllServerStepdown(t *testing.T) {
  637. tests := []struct {
  638. state StateType
  639. wstate StateType
  640. wterm uint64
  641. windex uint64
  642. }{
  643. {StateFollower, StateFollower, 3, 1},
  644. {StateCandidate, StateFollower, 3, 1},
  645. {StateLeader, StateFollower, 3, 2},
  646. }
  647. tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
  648. tterm := uint64(3)
  649. for i, tt := range tests {
  650. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  651. switch tt.state {
  652. case StateFollower:
  653. sm.becomeFollower(1, None)
  654. case StateCandidate:
  655. sm.becomeCandidate()
  656. case StateLeader:
  657. sm.becomeCandidate()
  658. sm.becomeLeader()
  659. }
  660. for j, msgType := range tmsgTypes {
  661. sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
  662. if sm.state != tt.wstate {
  663. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
  664. }
  665. if sm.Term != tt.wterm {
  666. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
  667. }
  668. if uint64(len(sm.raftLog.ents)) != tt.windex {
  669. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.ents), tt.windex)
  670. }
  671. wlead := uint64(2)
  672. if msgType == pb.MsgVote {
  673. wlead = None
  674. }
  675. if sm.lead != wlead {
  676. t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
  677. }
  678. }
  679. }
  680. }
  681. func TestLeaderAppResp(t *testing.T) {
  682. tests := []struct {
  683. index uint64
  684. reject bool
  685. wmsgNum int
  686. windex uint64
  687. wcommitted uint64
  688. }{
  689. {3, true, 0, 0, 0}, // stale resp; no replies
  690. {2, true, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg
  691. {2, false, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
  692. }
  693. for i, tt := range tests {
  694. // sm term is 1 after it becomes the leader.
  695. // thus the last log term must be 1 to be committed.
  696. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  697. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  698. sm.becomeCandidate()
  699. sm.becomeLeader()
  700. sm.ReadMessages()
  701. sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject})
  702. msgs := sm.ReadMessages()
  703. if len(msgs) != tt.wmsgNum {
  704. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  705. }
  706. for j, msg := range msgs {
  707. if msg.Index != tt.windex {
  708. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  709. }
  710. if msg.Commit != tt.wcommitted {
  711. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  712. }
  713. }
  714. }
  715. }
  716. // When the leader receives a heartbeat tick, it should
  717. // send a msgApp with m.Index = 0, m.LogTerm=0 and empty entries.
  718. func TestBcastBeat(t *testing.T) {
  719. offset := uint64(1000)
  720. // make a state machine with log.offset = 1000
  721. s := pb.Snapshot{
  722. Index: offset,
  723. Term: 1,
  724. Nodes: []uint64{1, 2, 3},
  725. }
  726. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  727. sm.Term = 1
  728. sm.restore(s)
  729. sm.becomeCandidate()
  730. sm.becomeLeader()
  731. for i := 0; i < 10; i++ {
  732. sm.appendEntry(pb.Entry{})
  733. }
  734. sm.Step(pb.Message{Type: pb.MsgBeat})
  735. msgs := sm.ReadMessages()
  736. if len(msgs) != 2 {
  737. t.Fatalf("len(msgs) = %v, want 1", len(msgs))
  738. }
  739. tomap := map[uint64]bool{2: true, 3: true}
  740. for i, m := range msgs {
  741. if m.Type != pb.MsgApp {
  742. t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
  743. }
  744. if m.Index != 0 {
  745. t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
  746. }
  747. if m.LogTerm != 0 {
  748. t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
  749. }
  750. if !tomap[m.To] {
  751. t.Fatalf("#%d: unexpected to %d", i, m.To)
  752. } else {
  753. delete(tomap, m.To)
  754. }
  755. if len(m.Entries) != 0 {
  756. t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
  757. }
  758. }
  759. }
  760. // tests the output of the statemachine when receiving msgBeat
  761. func TestRecvMsgBeat(t *testing.T) {
  762. tests := []struct {
  763. state StateType
  764. wMsg int
  765. }{
  766. {StateLeader, 2},
  767. // candidate and follower should ignore msgBeat
  768. {StateCandidate, 0},
  769. {StateFollower, 0},
  770. }
  771. for i, tt := range tests {
  772. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  773. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  774. sm.Term = 1
  775. sm.state = tt.state
  776. switch tt.state {
  777. case StateFollower:
  778. sm.step = stepFollower
  779. case StateCandidate:
  780. sm.step = stepCandidate
  781. case StateLeader:
  782. sm.step = stepLeader
  783. }
  784. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  785. msgs := sm.ReadMessages()
  786. if len(msgs) != tt.wMsg {
  787. t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  788. }
  789. for _, m := range msgs {
  790. if m.Type != pb.MsgApp {
  791. t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp)
  792. }
  793. }
  794. }
  795. }
  796. func TestRestore(t *testing.T) {
  797. s := pb.Snapshot{
  798. Index: 11, // magic number
  799. Term: 11, // magic number
  800. Nodes: []uint64{1, 2, 3},
  801. }
  802. sm := newRaft(1, []uint64{1, 2}, 10, 1)
  803. if ok := sm.restore(s); !ok {
  804. t.Fatal("restore fail, want succeed")
  805. }
  806. if sm.raftLog.lastIndex() != s.Index {
  807. t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Index)
  808. }
  809. if sm.raftLog.term(s.Index) != s.Term {
  810. t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
  811. }
  812. sg := sm.nodes()
  813. sort.Sort(uint64Slice(sg))
  814. if !reflect.DeepEqual(sg, s.Nodes) {
  815. t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
  816. }
  817. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  818. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  819. }
  820. if ok := sm.restore(s); ok {
  821. t.Fatal("restore succeed, want fail")
  822. }
  823. }
  824. func TestProvideSnap(t *testing.T) {
  825. s := pb.Snapshot{
  826. Index: 11, // magic number
  827. Term: 11, // magic number
  828. Nodes: []uint64{1, 2},
  829. }
  830. sm := newRaft(1, []uint64{1}, 10, 1)
  831. // restore the statemachin from a snapshot
  832. // so it has a compacted log and a snapshot
  833. sm.restore(s)
  834. sm.becomeCandidate()
  835. sm.becomeLeader()
  836. // force set the next of node 1, so that
  837. // node 1 needs a snapshot
  838. sm.prs[2].next = sm.raftLog.offset
  839. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
  840. msgs := sm.ReadMessages()
  841. if len(msgs) != 1 {
  842. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  843. }
  844. m := msgs[0]
  845. if m.Type != pb.MsgSnap {
  846. t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
  847. }
  848. }
  849. func TestRestoreFromSnapMsg(t *testing.T) {
  850. s := pb.Snapshot{
  851. Index: 11, // magic number
  852. Term: 11, // magic number
  853. Nodes: []uint64{1, 2},
  854. }
  855. m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
  856. sm := newRaft(2, []uint64{1, 2}, 10, 1)
  857. sm.Step(m)
  858. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  859. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  860. }
  861. }
  862. func TestSlowNodeRestore(t *testing.T) {
  863. nt := newNetwork(nil, nil, nil)
  864. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  865. nt.isolate(3)
  866. for j := 0; j <= 100; j++ {
  867. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  868. }
  869. lead := nt.peers[1].(*raft)
  870. nextEnts(lead)
  871. lead.compact(lead.raftLog.applied, lead.nodes(), nil)
  872. nt.recover()
  873. // trigger a snapshot
  874. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  875. follower := nt.peers[3].(*raft)
  876. if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
  877. t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
  878. }
  879. // trigger a commit
  880. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  881. if follower.raftLog.committed != lead.raftLog.committed {
  882. t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
  883. }
  884. }
  885. // TestStepConfig tests that when raft step msgProp in EntryConfChange type,
  886. // it appends the entry to log and sets pendingConf to be true.
  887. func TestStepConfig(t *testing.T) {
  888. // a raft that cannot make progress
  889. r := newRaft(1, []uint64{1, 2}, 10, 1)
  890. r.becomeCandidate()
  891. r.becomeLeader()
  892. index := r.raftLog.lastIndex()
  893. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  894. if g := r.raftLog.lastIndex(); g != index+1 {
  895. t.Errorf("index = %d, want %d", g, index+1)
  896. }
  897. if r.pendingConf != true {
  898. t.Errorf("pendingConf = %v, want true", r.pendingConf)
  899. }
  900. }
  901. // TestStepIgnoreConfig tests that if raft step the second msgProp in
  902. // EntryConfChange type when the first one is uncommitted, the node will deny
  903. // the proposal and keep its original state.
  904. func TestStepIgnoreConfig(t *testing.T) {
  905. // a raft that cannot make progress
  906. r := newRaft(1, []uint64{1, 2}, 10, 1)
  907. r.becomeCandidate()
  908. r.becomeLeader()
  909. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  910. index := r.raftLog.lastIndex()
  911. pendingConf := r.pendingConf
  912. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  913. if g := r.raftLog.lastIndex(); g != index {
  914. t.Errorf("index = %d, want %d", g, index)
  915. }
  916. if r.pendingConf != pendingConf {
  917. t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
  918. }
  919. }
  920. // TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
  921. // based on uncommitted entries.
  922. func TestRecoverPendingConfig(t *testing.T) {
  923. tests := []struct {
  924. entType pb.EntryType
  925. wpending bool
  926. }{
  927. {pb.EntryNormal, false},
  928. {pb.EntryConfChange, true},
  929. }
  930. for i, tt := range tests {
  931. r := newRaft(1, []uint64{1, 2}, 10, 1)
  932. r.appendEntry(pb.Entry{Type: tt.entType})
  933. r.becomeCandidate()
  934. r.becomeLeader()
  935. if r.pendingConf != tt.wpending {
  936. t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
  937. }
  938. }
  939. }
  940. // TestRecoverDoublePendingConfig tests that new leader will panic if
  941. // there exist two uncommitted config entries.
  942. func TestRecoverDoublePendingConfig(t *testing.T) {
  943. func() {
  944. defer func() {
  945. if err := recover(); err == nil {
  946. t.Errorf("expect panic, but nothing happens")
  947. }
  948. }()
  949. r := newRaft(1, []uint64{1, 2}, 10, 1)
  950. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  951. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  952. r.becomeCandidate()
  953. r.becomeLeader()
  954. }()
  955. }
  956. // TestAddNode tests that addNode could update pendingConf and nodes correctly.
  957. func TestAddNode(t *testing.T) {
  958. r := newRaft(1, []uint64{1}, 10, 1)
  959. r.pendingConf = true
  960. r.addNode(2)
  961. if r.pendingConf != false {
  962. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  963. }
  964. nodes := r.nodes()
  965. sort.Sort(uint64Slice(nodes))
  966. wnodes := []uint64{1, 2}
  967. if !reflect.DeepEqual(nodes, wnodes) {
  968. t.Errorf("nodes = %v, want %v", nodes, wnodes)
  969. }
  970. }
  971. // TestRemoveNode tests that removeNode could update pendingConf, nodes and
  972. // and removed list correctly.
  973. func TestRemoveNode(t *testing.T) {
  974. r := newRaft(1, []uint64{1, 2}, 10, 1)
  975. r.pendingConf = true
  976. r.removeNode(2)
  977. if r.pendingConf != false {
  978. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  979. }
  980. w := []uint64{1}
  981. if g := r.nodes(); !reflect.DeepEqual(g, w) {
  982. t.Errorf("nodes = %v, want %v", g, w)
  983. }
  984. }
  985. func TestPromotable(t *testing.T) {
  986. id := uint64(1)
  987. tests := []struct {
  988. peers []uint64
  989. wp bool
  990. }{
  991. {[]uint64{1}, true},
  992. {[]uint64{1, 2, 3}, true},
  993. {[]uint64{}, false},
  994. {[]uint64{2, 3}, false},
  995. }
  996. for i, tt := range tests {
  997. r := &raft{id: id, prs: make(map[uint64]*progress)}
  998. for _, id := range tt.peers {
  999. r.prs[id] = &progress{}
  1000. }
  1001. if g := r.promotable(); g != tt.wp {
  1002. t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
  1003. }
  1004. }
  1005. }
  1006. func ents(terms ...uint64) *raft {
  1007. ents := []pb.Entry{{}}
  1008. for _, term := range terms {
  1009. ents = append(ents, pb.Entry{Term: term})
  1010. }
  1011. sm := &raft{raftLog: &raftLog{ents: ents}}
  1012. sm.reset(0)
  1013. return sm
  1014. }
  1015. type network struct {
  1016. peers map[uint64]Interface
  1017. dropm map[connem]float64
  1018. ignorem map[pb.MessageType]bool
  1019. }
  1020. // newNetwork initializes a network from peers.
  1021. // A nil node will be replaced with a new *stateMachine.
  1022. // A *stateMachine will get its k, id.
  1023. // When using stateMachine, the address list is always [0, n).
  1024. func newNetwork(peers ...Interface) *network {
  1025. size := len(peers)
  1026. peerAddrs := make([]uint64, size)
  1027. for i := 0; i < size; i++ {
  1028. peerAddrs[i] = 1 + uint64(i)
  1029. }
  1030. npeers := make(map[uint64]Interface, size)
  1031. for i, p := range peers {
  1032. id := peerAddrs[i]
  1033. switch v := p.(type) {
  1034. case nil:
  1035. sm := newRaft(id, peerAddrs, 10, 1)
  1036. npeers[id] = sm
  1037. case *raft:
  1038. v.id = id
  1039. v.prs = make(map[uint64]*progress)
  1040. for i := 0; i < size; i++ {
  1041. v.prs[peerAddrs[i]] = &progress{}
  1042. }
  1043. v.reset(0)
  1044. npeers[id] = v
  1045. case *blackHole:
  1046. npeers[id] = v
  1047. default:
  1048. panic(fmt.Sprintf("unexpected state machine type: %T", p))
  1049. }
  1050. }
  1051. return &network{
  1052. peers: npeers,
  1053. dropm: make(map[connem]float64),
  1054. ignorem: make(map[pb.MessageType]bool),
  1055. }
  1056. }
  1057. func (nw *network) send(msgs ...pb.Message) {
  1058. for len(msgs) > 0 {
  1059. m := msgs[0]
  1060. p := nw.peers[m.To]
  1061. p.Step(m)
  1062. msgs = append(msgs[1:], nw.filter(p.ReadMessages())...)
  1063. }
  1064. }
  1065. func (nw *network) drop(from, to uint64, perc float64) {
  1066. nw.dropm[connem{from, to}] = perc
  1067. }
  1068. func (nw *network) cut(one, other uint64) {
  1069. nw.drop(one, other, 1)
  1070. nw.drop(other, one, 1)
  1071. }
  1072. func (nw *network) isolate(id uint64) {
  1073. for i := 0; i < len(nw.peers); i++ {
  1074. nid := uint64(i) + 1
  1075. if nid != id {
  1076. nw.drop(id, nid, 1.0)
  1077. nw.drop(nid, id, 1.0)
  1078. }
  1079. }
  1080. }
  1081. func (nw *network) ignore(t pb.MessageType) {
  1082. nw.ignorem[t] = true
  1083. }
  1084. func (nw *network) recover() {
  1085. nw.dropm = make(map[connem]float64)
  1086. nw.ignorem = make(map[pb.MessageType]bool)
  1087. }
  1088. func (nw *network) filter(msgs []pb.Message) []pb.Message {
  1089. mm := []pb.Message{}
  1090. for _, m := range msgs {
  1091. if nw.ignorem[m.Type] {
  1092. continue
  1093. }
  1094. switch m.Type {
  1095. case pb.MsgHup:
  1096. // hups never go over the network, so don't drop them but panic
  1097. panic("unexpected msgHup")
  1098. default:
  1099. perc := nw.dropm[connem{m.From, m.To}]
  1100. if n := rand.Float64(); n < perc {
  1101. continue
  1102. }
  1103. }
  1104. mm = append(mm, m)
  1105. }
  1106. return mm
  1107. }
  // connem identifies a directed connection between two nodes; it is used as
  // the key of the network's drop-probability map.
  type connem struct {
  	from uint64 // sending node id
  	to   uint64 // receiving node id
  }
  1111. type blackHole struct{}
  1112. func (blackHole) Step(pb.Message) error { return nil }
  1113. func (blackHole) ReadMessages() []pb.Message { return nil }
  1114. var nopStepper = &blackHole{}