raft_test.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "bytes"
  16. "fmt"
  17. "math"
  18. "math/rand"
  19. "reflect"
  20. "sort"
  21. "testing"
  22. pb "github.com/coreos/etcd/raft/raftpb"
  23. )
  24. // nextEnts returns the appliable entries and updates the applied index
  25. func nextEnts(r *raft) (ents []pb.Entry) {
  26. ents = r.raftLog.nextEnts()
  27. r.raftLog.resetNextEnts()
  28. return ents
  29. }
  30. type Interface interface {
  31. Step(m pb.Message) error
  32. readMessages() []pb.Message
  33. }
  34. func (r *raft) readMessages() []pb.Message {
  35. msgs := r.msgs
  36. r.msgs = make([]pb.Message, 0)
  37. return msgs
  38. }
  39. func TestLeaderElection(t *testing.T) {
  40. tests := []struct {
  41. *network
  42. state StateType
  43. }{
  44. {newNetwork(nil, nil, nil), StateLeader},
  45. {newNetwork(nil, nil, nopStepper), StateLeader},
  46. {newNetwork(nil, nopStepper, nopStepper), StateCandidate},
  47. {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate},
  48. {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader},
  49. // three logs further along than 0
  50. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower},
  51. // logs converge
  52. {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader},
  53. }
  54. for i, tt := range tests {
  55. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  56. sm := tt.network.peers[1].(*raft)
  57. if sm.state != tt.state {
  58. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  59. }
  60. if g := sm.Term; g != 1 {
  61. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  62. }
  63. }
  64. }
  65. func TestLogReplication(t *testing.T) {
  66. tests := []struct {
  67. *network
  68. msgs []pb.Message
  69. wcommitted uint64
  70. }{
  71. {
  72. newNetwork(nil, nil, nil),
  73. []pb.Message{
  74. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  75. },
  76. 2,
  77. },
  78. {
  79. newNetwork(nil, nil, nil),
  80. []pb.Message{
  81. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  82. {From: 1, To: 2, Type: pb.MsgHup},
  83. {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  84. },
  85. 4,
  86. },
  87. }
  88. for i, tt := range tests {
  89. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  90. for _, m := range tt.msgs {
  91. tt.send(m)
  92. }
  93. for j, x := range tt.network.peers {
  94. sm := x.(*raft)
  95. if sm.raftLog.committed != tt.wcommitted {
  96. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
  97. }
  98. ents := []pb.Entry{}
  99. for _, e := range nextEnts(sm) {
  100. if e.Data != nil {
  101. ents = append(ents, e)
  102. }
  103. }
  104. props := []pb.Message{}
  105. for _, m := range tt.msgs {
  106. if m.Type == pb.MsgProp {
  107. props = append(props, m)
  108. }
  109. }
  110. for k, m := range props {
  111. if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
  112. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
  113. }
  114. }
  115. }
  116. }
  117. }
  118. func TestSingleNodeCommit(t *testing.T) {
  119. tt := newNetwork(nil)
  120. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  121. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  122. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  123. sm := tt.peers[1].(*raft)
  124. if sm.raftLog.committed != 3 {
  125. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
  126. }
  127. }
  128. // TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed
  129. // when leader changes, no new proposal comes in and ChangeTerm proposal is
  130. // filtered.
  131. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  132. tt := newNetwork(nil, nil, nil, nil, nil)
  133. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  134. // 0 cannot reach 2,3,4
  135. tt.cut(1, 3)
  136. tt.cut(1, 4)
  137. tt.cut(1, 5)
  138. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  139. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  140. sm := tt.peers[1].(*raft)
  141. if sm.raftLog.committed != 1 {
  142. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  143. }
  144. // network recovery
  145. tt.recover()
  146. // avoid committing ChangeTerm proposal
  147. tt.ignore(pb.MsgApp)
  148. // elect 1 as the new leader with term 2
  149. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  150. // no log entries from previous term should be committed
  151. sm = tt.peers[2].(*raft)
  152. if sm.raftLog.committed != 1 {
  153. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  154. }
  155. tt.recover()
  156. // still be able to append a entry
  157. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  158. if sm.raftLog.committed != 5 {
  159. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
  160. }
  161. }
  162. // TestCommitWithoutNewTermEntry tests the entries could be committed
  163. // when leader changes, no new proposal comes in.
  164. func TestCommitWithoutNewTermEntry(t *testing.T) {
  165. tt := newNetwork(nil, nil, nil, nil, nil)
  166. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  167. // 0 cannot reach 2,3,4
  168. tt.cut(1, 3)
  169. tt.cut(1, 4)
  170. tt.cut(1, 5)
  171. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  172. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  173. sm := tt.peers[1].(*raft)
  174. if sm.raftLog.committed != 1 {
  175. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  176. }
  177. // network recovery
  178. tt.recover()
  179. // elect 1 as the new leader with term 2
  180. // after append a ChangeTerm entry from the current term, all entries
  181. // should be committed
  182. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  183. if sm.raftLog.committed != 4 {
  184. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
  185. }
  186. }
  187. func TestDuelingCandidates(t *testing.T) {
  188. a := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  189. b := newRaft(2, []uint64{1, 2, 3}, 10, 1)
  190. c := newRaft(3, []uint64{1, 2, 3}, 10, 1)
  191. nt := newNetwork(a, b, c)
  192. nt.cut(1, 3)
  193. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  194. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  195. nt.recover()
  196. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  197. wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1}
  198. tests := []struct {
  199. sm *raft
  200. state StateType
  201. term uint64
  202. raftLog *raftLog
  203. }{
  204. {a, StateFollower, 2, wlog},
  205. {b, StateFollower, 2, wlog},
  206. {c, StateFollower, 2, newLog()},
  207. }
  208. for i, tt := range tests {
  209. if g := tt.sm.state; g != tt.state {
  210. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  211. }
  212. if g := tt.sm.Term; g != tt.term {
  213. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  214. }
  215. base := ltoa(tt.raftLog)
  216. if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
  217. l := ltoa(sm.raftLog)
  218. if g := diffu(base, l); g != "" {
  219. t.Errorf("#%d: diff:\n%s", i, g)
  220. }
  221. } else {
  222. t.Logf("#%d: empty log", i)
  223. }
  224. }
  225. }
  226. func TestCandidateConcede(t *testing.T) {
  227. tt := newNetwork(nil, nil, nil)
  228. tt.isolate(1)
  229. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  230. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  231. // heal the partition
  232. tt.recover()
  233. data := []byte("force follower")
  234. // send a proposal to 2 to flush out a msgApp to 0
  235. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  236. a := tt.peers[1].(*raft)
  237. if g := a.state; g != StateFollower {
  238. t.Errorf("state = %s, want %s", g, StateFollower)
  239. }
  240. if g := a.Term; g != 1 {
  241. t.Errorf("term = %d, want %d", g, 1)
  242. }
  243. wantLog := ltoa(&raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
  244. for i, p := range tt.peers {
  245. if sm, ok := p.(*raft); ok {
  246. l := ltoa(sm.raftLog)
  247. if g := diffu(wantLog, l); g != "" {
  248. t.Errorf("#%d: diff:\n%s", i, g)
  249. }
  250. } else {
  251. t.Logf("#%d: empty log", i)
  252. }
  253. }
  254. }
  255. func TestSingleNodeCandidate(t *testing.T) {
  256. tt := newNetwork(nil)
  257. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  258. sm := tt.peers[1].(*raft)
  259. if sm.state != StateLeader {
  260. t.Errorf("state = %d, want %d", sm.state, StateLeader)
  261. }
  262. }
  263. func TestOldMessages(t *testing.T) {
  264. tt := newNetwork(nil, nil, nil)
  265. // make 0 leader @ term 3
  266. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  267. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  268. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  269. // pretend we're an old leader trying to make progress
  270. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
  271. l := &raftLog{
  272. ents: []pb.Entry{
  273. {}, {Data: nil, Term: 1, Index: 1},
  274. {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
  275. },
  276. committed: 3,
  277. }
  278. base := ltoa(l)
  279. for i, p := range tt.peers {
  280. if sm, ok := p.(*raft); ok {
  281. l := ltoa(sm.raftLog)
  282. if g := diffu(base, l); g != "" {
  283. t.Errorf("#%d: diff:\n%s", i, g)
  284. }
  285. } else {
  286. t.Logf("#%d: empty log", i)
  287. }
  288. }
  289. }
  290. // TestOldMessagesReply - optimization - reply with new term.
  291. func TestProposal(t *testing.T) {
  292. tests := []struct {
  293. *network
  294. success bool
  295. }{
  296. {newNetwork(nil, nil, nil), true},
  297. {newNetwork(nil, nil, nopStepper), true},
  298. {newNetwork(nil, nopStepper, nopStepper), false},
  299. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  300. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  301. }
  302. for i, tt := range tests {
  303. send := func(m pb.Message) {
  304. defer func() {
  305. // only recover is we expect it to panic so
  306. // panics we don't expect go up.
  307. if !tt.success {
  308. e := recover()
  309. if e != nil {
  310. t.Logf("#%d: err: %s", i, e)
  311. }
  312. }
  313. }()
  314. tt.send(m)
  315. }
  316. data := []byte("somedata")
  317. // promote 0 the leader
  318. send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  319. send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  320. wantLog := newLog()
  321. if tt.success {
  322. wantLog = &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}
  323. }
  324. base := ltoa(wantLog)
  325. for i, p := range tt.peers {
  326. if sm, ok := p.(*raft); ok {
  327. l := ltoa(sm.raftLog)
  328. if g := diffu(base, l); g != "" {
  329. t.Errorf("#%d: diff:\n%s", i, g)
  330. }
  331. } else {
  332. t.Logf("#%d: empty log", i)
  333. }
  334. }
  335. sm := tt.network.peers[1].(*raft)
  336. if g := sm.Term; g != 1 {
  337. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  338. }
  339. }
  340. }
  341. func TestProposalByProxy(t *testing.T) {
  342. data := []byte("somedata")
  343. tests := []*network{
  344. newNetwork(nil, nil, nil),
  345. newNetwork(nil, nil, nopStepper),
  346. }
  347. for i, tt := range tests {
  348. // promote 0 the leader
  349. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  350. // propose via follower
  351. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  352. wantLog := &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
  353. base := ltoa(wantLog)
  354. for i, p := range tt.peers {
  355. if sm, ok := p.(*raft); ok {
  356. l := ltoa(sm.raftLog)
  357. if g := diffu(base, l); g != "" {
  358. t.Errorf("#%d: diff:\n%s", i, g)
  359. }
  360. } else {
  361. t.Logf("#%d: empty log", i)
  362. }
  363. }
  364. sm := tt.peers[1].(*raft)
  365. if g := sm.Term; g != 1 {
  366. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  367. }
  368. }
  369. }
  370. func TestCompact(t *testing.T) {
  371. tests := []struct {
  372. compacti uint64
  373. nodes []uint64
  374. snapd []byte
  375. wpanic bool
  376. }{
  377. {1, []uint64{1, 2, 3}, []byte("some data"), false},
  378. {2, []uint64{1, 2, 3}, []byte("some data"), false},
  379. {4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range
  380. }
  381. for i, tt := range tests {
  382. func() {
  383. defer func() {
  384. if r := recover(); r != nil {
  385. if tt.wpanic != true {
  386. t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic)
  387. }
  388. }
  389. }()
  390. sm := &raft{
  391. state: StateLeader,
  392. raftLog: &raftLog{
  393. committed: 2,
  394. applied: 2,
  395. ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
  396. },
  397. }
  398. sm.compact(tt.compacti, tt.nodes, tt.snapd)
  399. sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
  400. if sm.raftLog.offset != tt.compacti {
  401. t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
  402. }
  403. if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
  404. t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
  405. }
  406. if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
  407. t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
  408. }
  409. }()
  410. }
  411. }
  412. func TestCommit(t *testing.T) {
  413. tests := []struct {
  414. matches []uint64
  415. logs []pb.Entry
  416. smTerm uint64
  417. w uint64
  418. }{
  419. // single
  420. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
  421. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
  422. {[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  423. {[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
  424. // odd
  425. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  426. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  427. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  428. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  429. // even
  430. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  431. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  432. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  433. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  434. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  435. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  436. }
  437. for i, tt := range tests {
  438. prs := make(map[uint64]*progress)
  439. for j := 0; j < len(tt.matches); j++ {
  440. prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
  441. }
  442. sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}}
  443. sm.maybeCommit()
  444. if g := sm.raftLog.committed; g != tt.w {
  445. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  446. }
  447. }
  448. }
  449. func TestIsElectionTimeout(t *testing.T) {
  450. tests := []struct {
  451. elapse int
  452. wprobability float64
  453. round bool
  454. }{
  455. {5, 0, false},
  456. {13, 0.3, true},
  457. {15, 0.5, true},
  458. {18, 0.8, true},
  459. {20, 1, false},
  460. }
  461. for i, tt := range tests {
  462. sm := newRaft(1, []uint64{1}, 10, 1)
  463. sm.elapsed = tt.elapse
  464. c := 0
  465. for j := 0; j < 10000; j++ {
  466. if sm.isElectionTimeout() {
  467. c++
  468. }
  469. }
  470. got := float64(c) / 10000.0
  471. if tt.round {
  472. got = math.Floor(got*10+0.5) / 10.0
  473. }
  474. if got != tt.wprobability {
  475. t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability)
  476. }
  477. }
  478. }
  479. // ensure that the Step function ignores the message from old term and does not pass it to the
  480. // acutal stepX function.
  481. func TestStepIgnoreOldTermMsg(t *testing.T) {
  482. called := false
  483. fakeStep := func(r *raft, m pb.Message) {
  484. called = true
  485. }
  486. sm := newRaft(1, []uint64{1}, 10, 1)
  487. sm.step = fakeStep
  488. sm.Term = 2
  489. sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
  490. if called == true {
  491. t.Errorf("stepFunc called = %v , want %v", called, false)
  492. }
  493. }
  494. // TestHandleMsgApp ensures:
  495. // 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
  496. // 2. If an existing entry conflicts with a new one (same index but different terms),
  497. // delete the existing entry and all that follow it; append any new entries not already in the log.
  498. // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
  499. func TestHandleMsgApp(t *testing.T) {
  500. tests := []struct {
  501. m pb.Message
  502. wIndex uint64
  503. wCommit uint64
  504. wReject bool
  505. }{
  506. // Ensure 1
  507. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch
  508. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist
  509. // Ensure 2
  510. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
  511. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false},
  512. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false},
  513. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false},
  514. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false},
  515. // Ensure 3
  516. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1
  517. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2
  518. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2
  519. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last()
  520. }
  521. for i, tt := range tests {
  522. sm := &raft{
  523. state: StateFollower,
  524. HardState: pb.HardState{Term: 2},
  525. raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
  526. }
  527. sm.handleAppendEntries(tt.m)
  528. if sm.raftLog.lastIndex() != tt.wIndex {
  529. t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
  530. }
  531. if sm.raftLog.committed != tt.wCommit {
  532. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  533. }
  534. m := sm.readMessages()
  535. if len(m) != 1 {
  536. t.Fatalf("#%d: msg = nil, want 1", i)
  537. }
  538. if m[0].Reject != tt.wReject {
  539. t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
  540. }
  541. }
  542. }
  543. func TestRecvMsgVote(t *testing.T) {
  544. tests := []struct {
  545. state StateType
  546. i, term uint64
  547. voteFor uint64
  548. wreject bool
  549. }{
  550. {StateFollower, 0, 0, None, true},
  551. {StateFollower, 0, 1, None, true},
  552. {StateFollower, 0, 2, None, true},
  553. {StateFollower, 0, 3, None, false},
  554. {StateFollower, 1, 0, None, true},
  555. {StateFollower, 1, 1, None, true},
  556. {StateFollower, 1, 2, None, true},
  557. {StateFollower, 1, 3, None, false},
  558. {StateFollower, 2, 0, None, true},
  559. {StateFollower, 2, 1, None, true},
  560. {StateFollower, 2, 2, None, false},
  561. {StateFollower, 2, 3, None, false},
  562. {StateFollower, 3, 0, None, true},
  563. {StateFollower, 3, 1, None, true},
  564. {StateFollower, 3, 2, None, false},
  565. {StateFollower, 3, 3, None, false},
  566. {StateFollower, 3, 2, 2, false},
  567. {StateFollower, 3, 2, 1, true},
  568. {StateLeader, 3, 3, 1, true},
  569. {StateCandidate, 3, 3, 1, true},
  570. }
  571. for i, tt := range tests {
  572. sm := newRaft(1, []uint64{1}, 10, 1)
  573. sm.state = tt.state
  574. switch tt.state {
  575. case StateFollower:
  576. sm.step = stepFollower
  577. case StateCandidate:
  578. sm.step = stepCandidate
  579. case StateLeader:
  580. sm.step = stepLeader
  581. }
  582. sm.HardState = pb.HardState{Vote: tt.voteFor}
  583. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}
  584. sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
  585. msgs := sm.readMessages()
  586. if g := len(msgs); g != 1 {
  587. t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
  588. continue
  589. }
  590. if g := msgs[0].Reject; g != tt.wreject {
  591. t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
  592. }
  593. }
  594. }
  595. func TestStateTransition(t *testing.T) {
  596. tests := []struct {
  597. from StateType
  598. to StateType
  599. wallow bool
  600. wterm uint64
  601. wlead uint64
  602. }{
  603. {StateFollower, StateFollower, true, 1, None},
  604. {StateFollower, StateCandidate, true, 1, None},
  605. {StateFollower, StateLeader, false, 0, None},
  606. {StateCandidate, StateFollower, true, 0, None},
  607. {StateCandidate, StateCandidate, true, 1, None},
  608. {StateCandidate, StateLeader, true, 0, 1},
  609. {StateLeader, StateFollower, true, 1, None},
  610. {StateLeader, StateCandidate, false, 1, None},
  611. {StateLeader, StateLeader, true, 0, 1},
  612. }
  613. for i, tt := range tests {
  614. func() {
  615. defer func() {
  616. if r := recover(); r != nil {
  617. if tt.wallow == true {
  618. t.Errorf("%d: allow = %v, want %v", i, false, true)
  619. }
  620. }
  621. }()
  622. sm := newRaft(1, []uint64{1}, 10, 1)
  623. sm.state = tt.from
  624. switch tt.to {
  625. case StateFollower:
  626. sm.becomeFollower(tt.wterm, tt.wlead)
  627. case StateCandidate:
  628. sm.becomeCandidate()
  629. case StateLeader:
  630. sm.becomeLeader()
  631. }
  632. if sm.Term != tt.wterm {
  633. t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
  634. }
  635. if sm.lead != tt.wlead {
  636. t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  637. }
  638. }()
  639. }
  640. }
  641. func TestAllServerStepdown(t *testing.T) {
  642. tests := []struct {
  643. state StateType
  644. wstate StateType
  645. wterm uint64
  646. windex uint64
  647. }{
  648. {StateFollower, StateFollower, 3, 1},
  649. {StateCandidate, StateFollower, 3, 1},
  650. {StateLeader, StateFollower, 3, 2},
  651. }
  652. tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
  653. tterm := uint64(3)
  654. for i, tt := range tests {
  655. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  656. switch tt.state {
  657. case StateFollower:
  658. sm.becomeFollower(1, None)
  659. case StateCandidate:
  660. sm.becomeCandidate()
  661. case StateLeader:
  662. sm.becomeCandidate()
  663. sm.becomeLeader()
  664. }
  665. for j, msgType := range tmsgTypes {
  666. sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
  667. if sm.state != tt.wstate {
  668. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
  669. }
  670. if sm.Term != tt.wterm {
  671. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
  672. }
  673. if uint64(len(sm.raftLog.ents)) != tt.windex {
  674. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.ents), tt.windex)
  675. }
  676. wlead := uint64(2)
  677. if msgType == pb.MsgVote {
  678. wlead = None
  679. }
  680. if sm.lead != wlead {
  681. t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
  682. }
  683. }
  684. }
  685. }
  686. func TestLeaderAppResp(t *testing.T) {
  687. // initial progress: match = 0; netx = 3
  688. tests := []struct {
  689. index uint64
  690. reject bool
  691. // progress
  692. wmatch uint64
  693. wnext uint64
  694. // message
  695. wmsgNum int
  696. windex uint64
  697. wcommitted uint64
  698. }{
  699. {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies
  700. {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg
  701. {2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
  702. {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
  703. }
  704. for i, tt := range tests {
  705. // sm term is 1 after it becomes the leader.
  706. // thus the last log term must be 1 to be committed.
  707. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  708. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  709. sm.becomeCandidate()
  710. sm.becomeLeader()
  711. sm.readMessages()
  712. sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject})
  713. p := sm.prs[2]
  714. if p.match != tt.wmatch {
  715. t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch)
  716. }
  717. if p.next != tt.wnext {
  718. t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
  719. }
  720. msgs := sm.readMessages()
  721. if len(msgs) != tt.wmsgNum {
  722. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  723. }
  724. for j, msg := range msgs {
  725. if msg.Index != tt.windex {
  726. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  727. }
  728. if msg.Commit != tt.wcommitted {
  729. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  730. }
  731. }
  732. }
  733. }
  734. // When the leader receives a heartbeat tick, it should
  735. // send a msgApp with m.Index = 0, m.LogTerm=0 and empty entries.
  736. func TestBcastBeat(t *testing.T) {
  737. offset := uint64(1000)
  738. // make a state machine with log.offset = 1000
  739. s := pb.Snapshot{
  740. Index: offset,
  741. Term: 1,
  742. Nodes: []uint64{1, 2, 3},
  743. }
  744. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  745. sm.Term = 1
  746. sm.restore(s)
  747. sm.becomeCandidate()
  748. sm.becomeLeader()
  749. for i := 0; i < 10; i++ {
  750. sm.appendEntry(pb.Entry{})
  751. }
  752. sm.Step(pb.Message{Type: pb.MsgBeat})
  753. msgs := sm.readMessages()
  754. if len(msgs) != 2 {
  755. t.Fatalf("len(msgs) = %v, want 1", len(msgs))
  756. }
  757. tomap := map[uint64]bool{2: true, 3: true}
  758. for i, m := range msgs {
  759. if m.Type != pb.MsgApp {
  760. t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
  761. }
  762. if m.Index != 0 {
  763. t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
  764. }
  765. if m.LogTerm != 0 {
  766. t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
  767. }
  768. if !tomap[m.To] {
  769. t.Fatalf("#%d: unexpected to %d", i, m.To)
  770. } else {
  771. delete(tomap, m.To)
  772. }
  773. if len(m.Entries) != 0 {
  774. t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
  775. }
  776. }
  777. }
  778. // tests the output of the statemachine when receiving msgBeat
  779. func TestRecvMsgBeat(t *testing.T) {
  780. tests := []struct {
  781. state StateType
  782. wMsg int
  783. }{
  784. {StateLeader, 2},
  785. // candidate and follower should ignore msgBeat
  786. {StateCandidate, 0},
  787. {StateFollower, 0},
  788. }
  789. for i, tt := range tests {
  790. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  791. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  792. sm.Term = 1
  793. sm.state = tt.state
  794. switch tt.state {
  795. case StateFollower:
  796. sm.step = stepFollower
  797. case StateCandidate:
  798. sm.step = stepCandidate
  799. case StateLeader:
  800. sm.step = stepLeader
  801. }
  802. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  803. msgs := sm.readMessages()
  804. if len(msgs) != tt.wMsg {
  805. t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  806. }
  807. for _, m := range msgs {
  808. if m.Type != pb.MsgApp {
  809. t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp)
  810. }
  811. }
  812. }
  813. }
  814. func TestRestore(t *testing.T) {
  815. s := pb.Snapshot{
  816. Index: 11, // magic number
  817. Term: 11, // magic number
  818. Nodes: []uint64{1, 2, 3},
  819. }
  820. sm := newRaft(1, []uint64{1, 2}, 10, 1)
  821. if ok := sm.restore(s); !ok {
  822. t.Fatal("restore fail, want succeed")
  823. }
  824. if sm.raftLog.lastIndex() != s.Index {
  825. t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Index)
  826. }
  827. if sm.raftLog.term(s.Index) != s.Term {
  828. t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
  829. }
  830. sg := sm.nodes()
  831. sort.Sort(uint64Slice(sg))
  832. if !reflect.DeepEqual(sg, s.Nodes) {
  833. t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
  834. }
  835. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  836. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  837. }
  838. if ok := sm.restore(s); ok {
  839. t.Fatal("restore succeed, want fail")
  840. }
  841. }
  842. func TestProvideSnap(t *testing.T) {
  843. s := pb.Snapshot{
  844. Index: 11, // magic number
  845. Term: 11, // magic number
  846. Nodes: []uint64{1, 2},
  847. }
  848. sm := newRaft(1, []uint64{1}, 10, 1)
  849. // restore the statemachin from a snapshot
  850. // so it has a compacted log and a snapshot
  851. sm.restore(s)
  852. sm.becomeCandidate()
  853. sm.becomeLeader()
  854. // force set the next of node 1, so that
  855. // node 1 needs a snapshot
  856. sm.prs[2].next = sm.raftLog.offset
  857. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
  858. msgs := sm.readMessages()
  859. if len(msgs) != 1 {
  860. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  861. }
  862. m := msgs[0]
  863. if m.Type != pb.MsgSnap {
  864. t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
  865. }
  866. }
  867. func TestRestoreFromSnapMsg(t *testing.T) {
  868. s := pb.Snapshot{
  869. Index: 11, // magic number
  870. Term: 11, // magic number
  871. Nodes: []uint64{1, 2},
  872. }
  873. m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
  874. sm := newRaft(2, []uint64{1, 2}, 10, 1)
  875. sm.Step(m)
  876. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  877. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  878. }
  879. }
  880. func TestSlowNodeRestore(t *testing.T) {
  881. nt := newNetwork(nil, nil, nil)
  882. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  883. nt.isolate(3)
  884. for j := 0; j <= 100; j++ {
  885. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  886. }
  887. lead := nt.peers[1].(*raft)
  888. nextEnts(lead)
  889. lead.compact(lead.raftLog.applied, lead.nodes(), nil)
  890. nt.recover()
  891. // trigger a snapshot
  892. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  893. follower := nt.peers[3].(*raft)
  894. if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
  895. t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
  896. }
  897. // trigger a commit
  898. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  899. if follower.raftLog.committed != lead.raftLog.committed {
  900. t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
  901. }
  902. }
  903. // TestStepConfig tests that when raft step msgProp in EntryConfChange type,
  904. // it appends the entry to log and sets pendingConf to be true.
  905. func TestStepConfig(t *testing.T) {
  906. // a raft that cannot make progress
  907. r := newRaft(1, []uint64{1, 2}, 10, 1)
  908. r.becomeCandidate()
  909. r.becomeLeader()
  910. index := r.raftLog.lastIndex()
  911. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  912. if g := r.raftLog.lastIndex(); g != index+1 {
  913. t.Errorf("index = %d, want %d", g, index+1)
  914. }
  915. if r.pendingConf != true {
  916. t.Errorf("pendingConf = %v, want true", r.pendingConf)
  917. }
  918. }
  919. // TestStepIgnoreConfig tests that if raft step the second msgProp in
  920. // EntryConfChange type when the first one is uncommitted, the node will deny
  921. // the proposal and keep its original state.
  922. func TestStepIgnoreConfig(t *testing.T) {
  923. // a raft that cannot make progress
  924. r := newRaft(1, []uint64{1, 2}, 10, 1)
  925. r.becomeCandidate()
  926. r.becomeLeader()
  927. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  928. index := r.raftLog.lastIndex()
  929. pendingConf := r.pendingConf
  930. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  931. if g := r.raftLog.lastIndex(); g != index {
  932. t.Errorf("index = %d, want %d", g, index)
  933. }
  934. if r.pendingConf != pendingConf {
  935. t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
  936. }
  937. }
  938. // TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
  939. // based on uncommitted entries.
  940. func TestRecoverPendingConfig(t *testing.T) {
  941. tests := []struct {
  942. entType pb.EntryType
  943. wpending bool
  944. }{
  945. {pb.EntryNormal, false},
  946. {pb.EntryConfChange, true},
  947. }
  948. for i, tt := range tests {
  949. r := newRaft(1, []uint64{1, 2}, 10, 1)
  950. r.appendEntry(pb.Entry{Type: tt.entType})
  951. r.becomeCandidate()
  952. r.becomeLeader()
  953. if r.pendingConf != tt.wpending {
  954. t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
  955. }
  956. }
  957. }
  958. // TestRecoverDoublePendingConfig tests that new leader will panic if
  959. // there exist two uncommitted config entries.
  960. func TestRecoverDoublePendingConfig(t *testing.T) {
  961. func() {
  962. defer func() {
  963. if err := recover(); err == nil {
  964. t.Errorf("expect panic, but nothing happens")
  965. }
  966. }()
  967. r := newRaft(1, []uint64{1, 2}, 10, 1)
  968. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  969. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  970. r.becomeCandidate()
  971. r.becomeLeader()
  972. }()
  973. }
  974. // TestAddNode tests that addNode could update pendingConf and nodes correctly.
  975. func TestAddNode(t *testing.T) {
  976. r := newRaft(1, []uint64{1}, 10, 1)
  977. r.pendingConf = true
  978. r.addNode(2)
  979. if r.pendingConf != false {
  980. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  981. }
  982. nodes := r.nodes()
  983. sort.Sort(uint64Slice(nodes))
  984. wnodes := []uint64{1, 2}
  985. if !reflect.DeepEqual(nodes, wnodes) {
  986. t.Errorf("nodes = %v, want %v", nodes, wnodes)
  987. }
  988. }
  989. // TestRemoveNode tests that removeNode could update pendingConf, nodes and
  990. // and removed list correctly.
  991. func TestRemoveNode(t *testing.T) {
  992. r := newRaft(1, []uint64{1, 2}, 10, 1)
  993. r.pendingConf = true
  994. r.removeNode(2)
  995. if r.pendingConf != false {
  996. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  997. }
  998. w := []uint64{1}
  999. if g := r.nodes(); !reflect.DeepEqual(g, w) {
  1000. t.Errorf("nodes = %v, want %v", g, w)
  1001. }
  1002. }
  1003. func TestPromotable(t *testing.T) {
  1004. id := uint64(1)
  1005. tests := []struct {
  1006. peers []uint64
  1007. wp bool
  1008. }{
  1009. {[]uint64{1}, true},
  1010. {[]uint64{1, 2, 3}, true},
  1011. {[]uint64{}, false},
  1012. {[]uint64{2, 3}, false},
  1013. }
  1014. for i, tt := range tests {
  1015. r := &raft{id: id, prs: make(map[uint64]*progress)}
  1016. for _, id := range tt.peers {
  1017. r.prs[id] = &progress{}
  1018. }
  1019. if g := r.promotable(); g != tt.wp {
  1020. t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
  1021. }
  1022. }
  1023. }
  1024. func ents(terms ...uint64) *raft {
  1025. ents := []pb.Entry{{}}
  1026. for _, term := range terms {
  1027. ents = append(ents, pb.Entry{Term: term})
  1028. }
  1029. sm := &raft{raftLog: &raftLog{ents: ents}}
  1030. sm.reset(0)
  1031. return sm
  1032. }
  1033. type network struct {
  1034. peers map[uint64]Interface
  1035. dropm map[connem]float64
  1036. ignorem map[pb.MessageType]bool
  1037. }
  1038. // newNetwork initializes a network from peers.
  1039. // A nil node will be replaced with a new *stateMachine.
  1040. // A *stateMachine will get its k, id.
  1041. // When using stateMachine, the address list is always [1, n].
  1042. func newNetwork(peers ...Interface) *network {
  1043. size := len(peers)
  1044. peerAddrs := idsBySize(size)
  1045. npeers := make(map[uint64]Interface, size)
  1046. for i, p := range peers {
  1047. id := peerAddrs[i]
  1048. switch v := p.(type) {
  1049. case nil:
  1050. sm := newRaft(id, peerAddrs, 10, 1)
  1051. npeers[id] = sm
  1052. case *raft:
  1053. v.id = id
  1054. v.prs = make(map[uint64]*progress)
  1055. for i := 0; i < size; i++ {
  1056. v.prs[peerAddrs[i]] = &progress{}
  1057. }
  1058. v.reset(0)
  1059. npeers[id] = v
  1060. case *blackHole:
  1061. npeers[id] = v
  1062. default:
  1063. panic(fmt.Sprintf("unexpected state machine type: %T", p))
  1064. }
  1065. }
  1066. return &network{
  1067. peers: npeers,
  1068. dropm: make(map[connem]float64),
  1069. ignorem: make(map[pb.MessageType]bool),
  1070. }
  1071. }
  1072. func (nw *network) send(msgs ...pb.Message) {
  1073. for len(msgs) > 0 {
  1074. m := msgs[0]
  1075. p := nw.peers[m.To]
  1076. p.Step(m)
  1077. msgs = append(msgs[1:], nw.filter(p.readMessages())...)
  1078. }
  1079. }
  1080. func (nw *network) drop(from, to uint64, perc float64) {
  1081. nw.dropm[connem{from, to}] = perc
  1082. }
  1083. func (nw *network) cut(one, other uint64) {
  1084. nw.drop(one, other, 1)
  1085. nw.drop(other, one, 1)
  1086. }
  1087. func (nw *network) isolate(id uint64) {
  1088. for i := 0; i < len(nw.peers); i++ {
  1089. nid := uint64(i) + 1
  1090. if nid != id {
  1091. nw.drop(id, nid, 1.0)
  1092. nw.drop(nid, id, 1.0)
  1093. }
  1094. }
  1095. }
  1096. func (nw *network) ignore(t pb.MessageType) {
  1097. nw.ignorem[t] = true
  1098. }
  1099. func (nw *network) recover() {
  1100. nw.dropm = make(map[connem]float64)
  1101. nw.ignorem = make(map[pb.MessageType]bool)
  1102. }
  1103. func (nw *network) filter(msgs []pb.Message) []pb.Message {
  1104. mm := []pb.Message{}
  1105. for _, m := range msgs {
  1106. if nw.ignorem[m.Type] {
  1107. continue
  1108. }
  1109. switch m.Type {
  1110. case pb.MsgHup:
  1111. // hups never go over the network, so don't drop them but panic
  1112. panic("unexpected msgHup")
  1113. default:
  1114. perc := nw.dropm[connem{m.From, m.To}]
  1115. if n := rand.Float64(); n < perc {
  1116. continue
  1117. }
  1118. }
  1119. mm = append(mm, m)
  1120. }
  1121. return mm
  1122. }
  1123. type connem struct {
  1124. from, to uint64
  1125. }
  1126. type blackHole struct{}
  1127. func (blackHole) Step(pb.Message) error { return nil }
  1128. func (blackHole) readMessages() []pb.Message { return nil }
  1129. var nopStepper = &blackHole{}
  1130. func idsBySize(size int) []uint64 {
  1131. ids := make([]uint64, size)
  1132. for i := 0; i < size; i++ {
  1133. ids[i] = 1 + uint64(i)
  1134. }
  1135. return ids
  1136. }