raft_test.go 34 KB


  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "bytes"
  16. "fmt"
  17. "math"
  18. "math/rand"
  19. "reflect"
  20. "sort"
  21. "testing"
  22. pb "github.com/coreos/etcd/raft/raftpb"
  23. )
  24. // nextEnts returns the appliable entries and updates the applied index
  25. func nextEnts(r *raft) (ents []pb.Entry) {
  26. ents = r.raftLog.nextEnts()
  27. r.raftLog.resetNextEnts()
  28. return ents
  29. }
  30. type Interface interface {
  31. Step(m pb.Message) error
  32. ReadMessages() []pb.Message
  33. }
  34. func TestLeaderElection(t *testing.T) {
  35. tests := []struct {
  36. *network
  37. state StateType
  38. }{
  39. {newNetwork(nil, nil, nil), StateLeader},
  40. {newNetwork(nil, nil, nopStepper), StateLeader},
  41. {newNetwork(nil, nopStepper, nopStepper), StateCandidate},
  42. {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate},
  43. {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader},
  44. // three logs further along than 0
  45. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower},
  46. // logs converge
  47. {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader},
  48. }
  49. for i, tt := range tests {
  50. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  51. sm := tt.network.peers[1].(*raft)
  52. if sm.state != tt.state {
  53. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  54. }
  55. if g := sm.Term; g != 1 {
  56. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  57. }
  58. }
  59. }
  60. func TestLogReplication(t *testing.T) {
  61. tests := []struct {
  62. *network
  63. msgs []pb.Message
  64. wcommitted uint64
  65. }{
  66. {
  67. newNetwork(nil, nil, nil),
  68. []pb.Message{
  69. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  70. },
  71. 2,
  72. },
  73. {
  74. newNetwork(nil, nil, nil),
  75. []pb.Message{
  76. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  77. {From: 1, To: 2, Type: pb.MsgHup},
  78. {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  79. },
  80. 4,
  81. },
  82. }
  83. for i, tt := range tests {
  84. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  85. for _, m := range tt.msgs {
  86. tt.send(m)
  87. }
  88. for j, x := range tt.network.peers {
  89. sm := x.(*raft)
  90. if sm.raftLog.committed != tt.wcommitted {
  91. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
  92. }
  93. ents := []pb.Entry{}
  94. for _, e := range nextEnts(sm) {
  95. if e.Data != nil {
  96. ents = append(ents, e)
  97. }
  98. }
  99. props := []pb.Message{}
  100. for _, m := range tt.msgs {
  101. if m.Type == pb.MsgProp {
  102. props = append(props, m)
  103. }
  104. }
  105. for k, m := range props {
  106. if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
  107. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
  108. }
  109. }
  110. }
  111. }
  112. }
  113. func TestSingleNodeCommit(t *testing.T) {
  114. tt := newNetwork(nil)
  115. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  116. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  117. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  118. sm := tt.peers[1].(*raft)
  119. if sm.raftLog.committed != 3 {
  120. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
  121. }
  122. }
  123. // TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed
  124. // when leader changes, no new proposal comes in and ChangeTerm proposal is
  125. // filtered.
  126. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  127. tt := newNetwork(nil, nil, nil, nil, nil)
  128. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  129. // 0 cannot reach 2,3,4
  130. tt.cut(1, 3)
  131. tt.cut(1, 4)
  132. tt.cut(1, 5)
  133. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  134. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  135. sm := tt.peers[1].(*raft)
  136. if sm.raftLog.committed != 1 {
  137. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  138. }
  139. // network recovery
  140. tt.recover()
  141. // avoid committing ChangeTerm proposal
  142. tt.ignore(pb.MsgApp)
  143. // elect 1 as the new leader with term 2
  144. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  145. // no log entries from previous term should be committed
  146. sm = tt.peers[2].(*raft)
  147. if sm.raftLog.committed != 1 {
  148. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  149. }
  150. tt.recover()
  151. // still be able to append a entry
  152. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  153. if sm.raftLog.committed != 5 {
  154. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
  155. }
  156. }
  157. // TestCommitWithoutNewTermEntry tests the entries could be committed
  158. // when leader changes, no new proposal comes in.
  159. func TestCommitWithoutNewTermEntry(t *testing.T) {
  160. tt := newNetwork(nil, nil, nil, nil, nil)
  161. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  162. // 0 cannot reach 2,3,4
  163. tt.cut(1, 3)
  164. tt.cut(1, 4)
  165. tt.cut(1, 5)
  166. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  167. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  168. sm := tt.peers[1].(*raft)
  169. if sm.raftLog.committed != 1 {
  170. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  171. }
  172. // network recovery
  173. tt.recover()
  174. // elect 1 as the new leader with term 2
  175. // after append a ChangeTerm entry from the current term, all entries
  176. // should be committed
  177. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  178. if sm.raftLog.committed != 4 {
  179. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
  180. }
  181. }
  182. func TestDuelingCandidates(t *testing.T) {
  183. a := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  184. b := newRaft(2, []uint64{1, 2, 3}, 10, 1)
  185. c := newRaft(3, []uint64{1, 2, 3}, 10, 1)
  186. nt := newNetwork(a, b, c)
  187. nt.cut(1, 3)
  188. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  189. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  190. nt.recover()
  191. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  192. wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1}
  193. tests := []struct {
  194. sm *raft
  195. state StateType
  196. term uint64
  197. raftLog *raftLog
  198. }{
  199. {a, StateFollower, 2, wlog},
  200. {b, StateFollower, 2, wlog},
  201. {c, StateFollower, 2, newLog()},
  202. }
  203. for i, tt := range tests {
  204. if g := tt.sm.state; g != tt.state {
  205. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  206. }
  207. if g := tt.sm.Term; g != tt.term {
  208. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  209. }
  210. base := ltoa(tt.raftLog)
  211. if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
  212. l := ltoa(sm.raftLog)
  213. if g := diffu(base, l); g != "" {
  214. t.Errorf("#%d: diff:\n%s", i, g)
  215. }
  216. } else {
  217. t.Logf("#%d: empty log", i)
  218. }
  219. }
  220. }
  221. func TestCandidateConcede(t *testing.T) {
  222. tt := newNetwork(nil, nil, nil)
  223. tt.isolate(1)
  224. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  225. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  226. // heal the partition
  227. tt.recover()
  228. data := []byte("force follower")
  229. // send a proposal to 2 to flush out a msgApp to 0
  230. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  231. a := tt.peers[1].(*raft)
  232. if g := a.state; g != StateFollower {
  233. t.Errorf("state = %s, want %s", g, StateFollower)
  234. }
  235. if g := a.Term; g != 1 {
  236. t.Errorf("term = %d, want %d", g, 1)
  237. }
  238. wantLog := ltoa(&raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
  239. for i, p := range tt.peers {
  240. if sm, ok := p.(*raft); ok {
  241. l := ltoa(sm.raftLog)
  242. if g := diffu(wantLog, l); g != "" {
  243. t.Errorf("#%d: diff:\n%s", i, g)
  244. }
  245. } else {
  246. t.Logf("#%d: empty log", i)
  247. }
  248. }
  249. }
  250. func TestSingleNodeCandidate(t *testing.T) {
  251. tt := newNetwork(nil)
  252. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  253. sm := tt.peers[1].(*raft)
  254. if sm.state != StateLeader {
  255. t.Errorf("state = %d, want %d", sm.state, StateLeader)
  256. }
  257. }
  258. func TestOldMessages(t *testing.T) {
  259. tt := newNetwork(nil, nil, nil)
  260. // make 0 leader @ term 3
  261. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  262. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  263. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  264. // pretend we're an old leader trying to make progress
  265. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
  266. l := &raftLog{
  267. ents: []pb.Entry{
  268. {}, {Data: nil, Term: 1, Index: 1},
  269. {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
  270. },
  271. committed: 3,
  272. }
  273. base := ltoa(l)
  274. for i, p := range tt.peers {
  275. if sm, ok := p.(*raft); ok {
  276. l := ltoa(sm.raftLog)
  277. if g := diffu(base, l); g != "" {
  278. t.Errorf("#%d: diff:\n%s", i, g)
  279. }
  280. } else {
  281. t.Logf("#%d: empty log", i)
  282. }
  283. }
  284. }
  285. // TestOldMessagesReply - optimization - reply with new term.
  286. func TestProposal(t *testing.T) {
  287. tests := []struct {
  288. *network
  289. success bool
  290. }{
  291. {newNetwork(nil, nil, nil), true},
  292. {newNetwork(nil, nil, nopStepper), true},
  293. {newNetwork(nil, nopStepper, nopStepper), false},
  294. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  295. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  296. }
  297. for i, tt := range tests {
  298. send := func(m pb.Message) {
  299. defer func() {
  300. // only recover is we expect it to panic so
  301. // panics we don't expect go up.
  302. if !tt.success {
  303. e := recover()
  304. if e != nil {
  305. t.Logf("#%d: err: %s", i, e)
  306. }
  307. }
  308. }()
  309. tt.send(m)
  310. }
  311. data := []byte("somedata")
  312. // promote 0 the leader
  313. send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  314. send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  315. wantLog := newLog()
  316. if tt.success {
  317. wantLog = &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}
  318. }
  319. base := ltoa(wantLog)
  320. for i, p := range tt.peers {
  321. if sm, ok := p.(*raft); ok {
  322. l := ltoa(sm.raftLog)
  323. if g := diffu(base, l); g != "" {
  324. t.Errorf("#%d: diff:\n%s", i, g)
  325. }
  326. } else {
  327. t.Logf("#%d: empty log", i)
  328. }
  329. }
  330. sm := tt.network.peers[1].(*raft)
  331. if g := sm.Term; g != 1 {
  332. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  333. }
  334. }
  335. }
  336. func TestProposalByProxy(t *testing.T) {
  337. data := []byte("somedata")
  338. tests := []*network{
  339. newNetwork(nil, nil, nil),
  340. newNetwork(nil, nil, nopStepper),
  341. }
  342. for i, tt := range tests {
  343. // promote 0 the leader
  344. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  345. // propose via follower
  346. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  347. wantLog := &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
  348. base := ltoa(wantLog)
  349. for i, p := range tt.peers {
  350. if sm, ok := p.(*raft); ok {
  351. l := ltoa(sm.raftLog)
  352. if g := diffu(base, l); g != "" {
  353. t.Errorf("#%d: diff:\n%s", i, g)
  354. }
  355. } else {
  356. t.Logf("#%d: empty log", i)
  357. }
  358. }
  359. sm := tt.peers[1].(*raft)
  360. if g := sm.Term; g != 1 {
  361. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  362. }
  363. }
  364. }
  365. func TestCompact(t *testing.T) {
  366. tests := []struct {
  367. compacti uint64
  368. nodes []uint64
  369. snapd []byte
  370. wpanic bool
  371. }{
  372. {1, []uint64{1, 2, 3}, []byte("some data"), false},
  373. {2, []uint64{1, 2, 3}, []byte("some data"), false},
  374. {4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range
  375. }
  376. for i, tt := range tests {
  377. func() {
  378. defer func() {
  379. if r := recover(); r != nil {
  380. if tt.wpanic != true {
  381. t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic)
  382. }
  383. }
  384. }()
  385. sm := &raft{
  386. state: StateLeader,
  387. raftLog: &raftLog{
  388. committed: 2,
  389. applied: 2,
  390. ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
  391. },
  392. }
  393. sm.compact(tt.compacti, tt.nodes, tt.snapd)
  394. sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
  395. if sm.raftLog.offset != tt.compacti {
  396. t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
  397. }
  398. if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
  399. t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
  400. }
  401. if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
  402. t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
  403. }
  404. }()
  405. }
  406. }
  407. func TestCommit(t *testing.T) {
  408. tests := []struct {
  409. matches []uint64
  410. logs []pb.Entry
  411. smTerm uint64
  412. w uint64
  413. }{
  414. // single
  415. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
  416. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
  417. {[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  418. {[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
  419. // odd
  420. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  421. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  422. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  423. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  424. // even
  425. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  426. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  427. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  428. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  429. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  430. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  431. }
  432. for i, tt := range tests {
  433. prs := make(map[uint64]*progress)
  434. for j := 0; j < len(tt.matches); j++ {
  435. prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
  436. }
  437. sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}}
  438. sm.maybeCommit()
  439. if g := sm.raftLog.committed; g != tt.w {
  440. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  441. }
  442. }
  443. }
  444. func TestIsElectionTimeout(t *testing.T) {
  445. tests := []struct {
  446. elapse int
  447. wprobability float64
  448. round bool
  449. }{
  450. {5, 0, false},
  451. {13, 0.3, true},
  452. {15, 0.5, true},
  453. {18, 0.8, true},
  454. {20, 1, false},
  455. }
  456. for i, tt := range tests {
  457. sm := newRaft(1, []uint64{1}, 10, 1)
  458. sm.elapsed = tt.elapse
  459. c := 0
  460. for j := 0; j < 10000; j++ {
  461. if sm.isElectionTimeout() {
  462. c++
  463. }
  464. }
  465. got := float64(c) / 10000.0
  466. if tt.round {
  467. got = math.Floor(got*10+0.5) / 10.0
  468. }
  469. if got != tt.wprobability {
  470. t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability)
  471. }
  472. }
  473. }
  474. // ensure that the Step function ignores the message from old term and does not pass it to the
  475. // acutal stepX function.
  476. func TestStepIgnoreOldTermMsg(t *testing.T) {
  477. called := false
  478. fakeStep := func(r *raft, m pb.Message) {
  479. called = true
  480. }
  481. sm := newRaft(1, []uint64{1}, 10, 1)
  482. sm.step = fakeStep
  483. sm.Term = 2
  484. sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
  485. if called == true {
  486. t.Errorf("stepFunc called = %v , want %v", called, false)
  487. }
  488. }
  489. // TestHandleMsgApp ensures:
  490. // 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
  491. // 2. If an existing entry conflicts with a new one (same index but different terms),
  492. // delete the existing entry and all that follow it; append any new entries not already in the log.
  493. // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
  494. func TestHandleMsgApp(t *testing.T) {
  495. tests := []struct {
  496. m pb.Message
  497. wIndex uint64
  498. wCommit uint64
  499. wReject bool
  500. }{
  501. // Ensure 1
  502. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch
  503. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist
  504. // Ensure 2
  505. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
  506. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false},
  507. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false},
  508. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false},
  509. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false},
  510. // Ensure 3
  511. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1
  512. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2
  513. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2
  514. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last()
  515. }
  516. for i, tt := range tests {
  517. sm := &raft{
  518. state: StateFollower,
  519. HardState: pb.HardState{Term: 2},
  520. raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
  521. }
  522. sm.handleAppendEntries(tt.m)
  523. if sm.raftLog.lastIndex() != tt.wIndex {
  524. t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
  525. }
  526. if sm.raftLog.committed != tt.wCommit {
  527. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  528. }
  529. m := sm.ReadMessages()
  530. if len(m) != 1 {
  531. t.Fatalf("#%d: msg = nil, want 1", i)
  532. }
  533. if m[0].Reject != tt.wReject {
  534. t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
  535. }
  536. }
  537. }
  538. func TestRecvMsgVote(t *testing.T) {
  539. tests := []struct {
  540. state StateType
  541. i, term uint64
  542. voteFor uint64
  543. wreject bool
  544. }{
  545. {StateFollower, 0, 0, None, true},
  546. {StateFollower, 0, 1, None, true},
  547. {StateFollower, 0, 2, None, true},
  548. {StateFollower, 0, 3, None, false},
  549. {StateFollower, 1, 0, None, true},
  550. {StateFollower, 1, 1, None, true},
  551. {StateFollower, 1, 2, None, true},
  552. {StateFollower, 1, 3, None, false},
  553. {StateFollower, 2, 0, None, true},
  554. {StateFollower, 2, 1, None, true},
  555. {StateFollower, 2, 2, None, false},
  556. {StateFollower, 2, 3, None, false},
  557. {StateFollower, 3, 0, None, true},
  558. {StateFollower, 3, 1, None, true},
  559. {StateFollower, 3, 2, None, false},
  560. {StateFollower, 3, 3, None, false},
  561. {StateFollower, 3, 2, 2, false},
  562. {StateFollower, 3, 2, 1, true},
  563. {StateLeader, 3, 3, 1, true},
  564. {StateCandidate, 3, 3, 1, true},
  565. }
  566. for i, tt := range tests {
  567. sm := newRaft(1, []uint64{1}, 10, 1)
  568. sm.state = tt.state
  569. switch tt.state {
  570. case StateFollower:
  571. sm.step = stepFollower
  572. case StateCandidate:
  573. sm.step = stepCandidate
  574. case StateLeader:
  575. sm.step = stepLeader
  576. }
  577. sm.HardState = pb.HardState{Vote: tt.voteFor}
  578. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}
  579. sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
  580. msgs := sm.ReadMessages()
  581. if g := len(msgs); g != 1 {
  582. t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
  583. continue
  584. }
  585. if g := msgs[0].Reject; g != tt.wreject {
  586. t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
  587. }
  588. }
  589. }
  590. func TestStateTransition(t *testing.T) {
  591. tests := []struct {
  592. from StateType
  593. to StateType
  594. wallow bool
  595. wterm uint64
  596. wlead uint64
  597. }{
  598. {StateFollower, StateFollower, true, 1, None},
  599. {StateFollower, StateCandidate, true, 1, None},
  600. {StateFollower, StateLeader, false, 0, None},
  601. {StateCandidate, StateFollower, true, 0, None},
  602. {StateCandidate, StateCandidate, true, 1, None},
  603. {StateCandidate, StateLeader, true, 0, 1},
  604. {StateLeader, StateFollower, true, 1, None},
  605. {StateLeader, StateCandidate, false, 1, None},
  606. {StateLeader, StateLeader, true, 0, 1},
  607. }
  608. for i, tt := range tests {
  609. func() {
  610. defer func() {
  611. if r := recover(); r != nil {
  612. if tt.wallow == true {
  613. t.Errorf("%d: allow = %v, want %v", i, false, true)
  614. }
  615. }
  616. }()
  617. sm := newRaft(1, []uint64{1}, 10, 1)
  618. sm.state = tt.from
  619. switch tt.to {
  620. case StateFollower:
  621. sm.becomeFollower(tt.wterm, tt.wlead)
  622. case StateCandidate:
  623. sm.becomeCandidate()
  624. case StateLeader:
  625. sm.becomeLeader()
  626. }
  627. if sm.Term != tt.wterm {
  628. t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
  629. }
  630. if sm.lead != tt.wlead {
  631. t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  632. }
  633. }()
  634. }
  635. }
  636. func TestAllServerStepdown(t *testing.T) {
  637. tests := []struct {
  638. state StateType
  639. wstate StateType
  640. wterm uint64
  641. windex uint64
  642. }{
  643. {StateFollower, StateFollower, 3, 1},
  644. {StateCandidate, StateFollower, 3, 1},
  645. {StateLeader, StateFollower, 3, 2},
  646. }
  647. tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
  648. tterm := uint64(3)
  649. for i, tt := range tests {
  650. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  651. switch tt.state {
  652. case StateFollower:
  653. sm.becomeFollower(1, None)
  654. case StateCandidate:
  655. sm.becomeCandidate()
  656. case StateLeader:
  657. sm.becomeCandidate()
  658. sm.becomeLeader()
  659. }
  660. for j, msgType := range tmsgTypes {
  661. sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
  662. if sm.state != tt.wstate {
  663. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
  664. }
  665. if sm.Term != tt.wterm {
  666. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
  667. }
  668. if uint64(len(sm.raftLog.ents)) != tt.windex {
  669. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.ents), tt.windex)
  670. }
  671. wlead := uint64(2)
  672. if msgType == pb.MsgVote {
  673. wlead = None
  674. }
  675. if sm.lead != wlead {
  676. t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
  677. }
  678. }
  679. }
  680. }
  681. func TestLeaderAppResp(t *testing.T) {
  682. // initial progress: match = 0; netx = 3
  683. tests := []struct {
  684. index uint64
  685. reject bool
  686. // progress
  687. wmatch uint64
  688. wnext uint64
  689. // message
  690. wmsgNum int
  691. windex uint64
  692. wcommitted uint64
  693. }{
  694. {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies
  695. {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg
  696. {2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
  697. {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
  698. }
  699. for i, tt := range tests {
  700. // sm term is 1 after it becomes the leader.
  701. // thus the last log term must be 1 to be committed.
  702. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  703. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  704. sm.becomeCandidate()
  705. sm.becomeLeader()
  706. sm.ReadMessages()
  707. sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject})
  708. p := sm.prs[2]
  709. if p.match != tt.wmatch {
  710. t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch)
  711. }
  712. if p.next != tt.wnext {
  713. t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
  714. }
  715. msgs := sm.ReadMessages()
  716. if len(msgs) != tt.wmsgNum {
  717. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  718. }
  719. for j, msg := range msgs {
  720. if msg.Index != tt.windex {
  721. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  722. }
  723. if msg.Commit != tt.wcommitted {
  724. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  725. }
  726. }
  727. }
  728. }
  729. // When the leader receives a heartbeat tick, it should
  730. // send a msgApp with m.Index = 0, m.LogTerm=0 and empty entries.
  731. func TestBcastBeat(t *testing.T) {
  732. offset := uint64(1000)
  733. // make a state machine with log.offset = 1000
  734. s := pb.Snapshot{
  735. Index: offset,
  736. Term: 1,
  737. Nodes: []uint64{1, 2, 3},
  738. }
  739. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  740. sm.Term = 1
  741. sm.restore(s)
  742. sm.becomeCandidate()
  743. sm.becomeLeader()
  744. for i := 0; i < 10; i++ {
  745. sm.appendEntry(pb.Entry{})
  746. }
  747. sm.Step(pb.Message{Type: pb.MsgBeat})
  748. msgs := sm.ReadMessages()
  749. if len(msgs) != 2 {
  750. t.Fatalf("len(msgs) = %v, want 1", len(msgs))
  751. }
  752. tomap := map[uint64]bool{2: true, 3: true}
  753. for i, m := range msgs {
  754. if m.Type != pb.MsgApp {
  755. t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
  756. }
  757. if m.Index != 0 {
  758. t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
  759. }
  760. if m.LogTerm != 0 {
  761. t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
  762. }
  763. if !tomap[m.To] {
  764. t.Fatalf("#%d: unexpected to %d", i, m.To)
  765. } else {
  766. delete(tomap, m.To)
  767. }
  768. if len(m.Entries) != 0 {
  769. t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
  770. }
  771. }
  772. }
  773. // tests the output of the statemachine when receiving msgBeat
  774. func TestRecvMsgBeat(t *testing.T) {
  775. tests := []struct {
  776. state StateType
  777. wMsg int
  778. }{
  779. {StateLeader, 2},
  780. // candidate and follower should ignore msgBeat
  781. {StateCandidate, 0},
  782. {StateFollower, 0},
  783. }
  784. for i, tt := range tests {
  785. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
  786. sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
  787. sm.Term = 1
  788. sm.state = tt.state
  789. switch tt.state {
  790. case StateFollower:
  791. sm.step = stepFollower
  792. case StateCandidate:
  793. sm.step = stepCandidate
  794. case StateLeader:
  795. sm.step = stepLeader
  796. }
  797. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  798. msgs := sm.ReadMessages()
  799. if len(msgs) != tt.wMsg {
  800. t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  801. }
  802. for _, m := range msgs {
  803. if m.Type != pb.MsgApp {
  804. t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp)
  805. }
  806. }
  807. }
  808. }
  809. func TestRestore(t *testing.T) {
  810. s := pb.Snapshot{
  811. Index: 11, // magic number
  812. Term: 11, // magic number
  813. Nodes: []uint64{1, 2, 3},
  814. }
  815. sm := newRaft(1, []uint64{1, 2}, 10, 1)
  816. if ok := sm.restore(s); !ok {
  817. t.Fatal("restore fail, want succeed")
  818. }
  819. if sm.raftLog.lastIndex() != s.Index {
  820. t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Index)
  821. }
  822. if sm.raftLog.term(s.Index) != s.Term {
  823. t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
  824. }
  825. sg := sm.nodes()
  826. sort.Sort(uint64Slice(sg))
  827. if !reflect.DeepEqual(sg, s.Nodes) {
  828. t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
  829. }
  830. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  831. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  832. }
  833. if ok := sm.restore(s); ok {
  834. t.Fatal("restore succeed, want fail")
  835. }
  836. }
  837. func TestProvideSnap(t *testing.T) {
  838. s := pb.Snapshot{
  839. Index: 11, // magic number
  840. Term: 11, // magic number
  841. Nodes: []uint64{1, 2},
  842. }
  843. sm := newRaft(1, []uint64{1}, 10, 1)
  844. // restore the statemachin from a snapshot
  845. // so it has a compacted log and a snapshot
  846. sm.restore(s)
  847. sm.becomeCandidate()
  848. sm.becomeLeader()
  849. // force set the next of node 1, so that
  850. // node 1 needs a snapshot
  851. sm.prs[2].next = sm.raftLog.offset
  852. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
  853. msgs := sm.ReadMessages()
  854. if len(msgs) != 1 {
  855. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  856. }
  857. m := msgs[0]
  858. if m.Type != pb.MsgSnap {
  859. t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
  860. }
  861. }
  862. func TestRestoreFromSnapMsg(t *testing.T) {
  863. s := pb.Snapshot{
  864. Index: 11, // magic number
  865. Term: 11, // magic number
  866. Nodes: []uint64{1, 2},
  867. }
  868. m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
  869. sm := newRaft(2, []uint64{1, 2}, 10, 1)
  870. sm.Step(m)
  871. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  872. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  873. }
  874. }
  875. func TestSlowNodeRestore(t *testing.T) {
  876. nt := newNetwork(nil, nil, nil)
  877. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  878. nt.isolate(3)
  879. for j := 0; j <= 100; j++ {
  880. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  881. }
  882. lead := nt.peers[1].(*raft)
  883. nextEnts(lead)
  884. lead.compact(lead.raftLog.applied, lead.nodes(), nil)
  885. nt.recover()
  886. // trigger a snapshot
  887. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  888. follower := nt.peers[3].(*raft)
  889. if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
  890. t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
  891. }
  892. // trigger a commit
  893. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  894. if follower.raftLog.committed != lead.raftLog.committed {
  895. t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
  896. }
  897. }
  898. // TestStepConfig tests that when raft step msgProp in EntryConfChange type,
  899. // it appends the entry to log and sets pendingConf to be true.
  900. func TestStepConfig(t *testing.T) {
  901. // a raft that cannot make progress
  902. r := newRaft(1, []uint64{1, 2}, 10, 1)
  903. r.becomeCandidate()
  904. r.becomeLeader()
  905. index := r.raftLog.lastIndex()
  906. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  907. if g := r.raftLog.lastIndex(); g != index+1 {
  908. t.Errorf("index = %d, want %d", g, index+1)
  909. }
  910. if r.pendingConf != true {
  911. t.Errorf("pendingConf = %v, want true", r.pendingConf)
  912. }
  913. }
  914. // TestStepIgnoreConfig tests that if raft step the second msgProp in
  915. // EntryConfChange type when the first one is uncommitted, the node will deny
  916. // the proposal and keep its original state.
  917. func TestStepIgnoreConfig(t *testing.T) {
  918. // a raft that cannot make progress
  919. r := newRaft(1, []uint64{1, 2}, 10, 1)
  920. r.becomeCandidate()
  921. r.becomeLeader()
  922. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  923. index := r.raftLog.lastIndex()
  924. pendingConf := r.pendingConf
  925. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  926. if g := r.raftLog.lastIndex(); g != index {
  927. t.Errorf("index = %d, want %d", g, index)
  928. }
  929. if r.pendingConf != pendingConf {
  930. t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
  931. }
  932. }
  933. // TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
  934. // based on uncommitted entries.
  935. func TestRecoverPendingConfig(t *testing.T) {
  936. tests := []struct {
  937. entType pb.EntryType
  938. wpending bool
  939. }{
  940. {pb.EntryNormal, false},
  941. {pb.EntryConfChange, true},
  942. }
  943. for i, tt := range tests {
  944. r := newRaft(1, []uint64{1, 2}, 10, 1)
  945. r.appendEntry(pb.Entry{Type: tt.entType})
  946. r.becomeCandidate()
  947. r.becomeLeader()
  948. if r.pendingConf != tt.wpending {
  949. t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
  950. }
  951. }
  952. }
  953. // TestRecoverDoublePendingConfig tests that new leader will panic if
  954. // there exist two uncommitted config entries.
  955. func TestRecoverDoublePendingConfig(t *testing.T) {
  956. func() {
  957. defer func() {
  958. if err := recover(); err == nil {
  959. t.Errorf("expect panic, but nothing happens")
  960. }
  961. }()
  962. r := newRaft(1, []uint64{1, 2}, 10, 1)
  963. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  964. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  965. r.becomeCandidate()
  966. r.becomeLeader()
  967. }()
  968. }
  969. // TestAddNode tests that addNode could update pendingConf and nodes correctly.
  970. func TestAddNode(t *testing.T) {
  971. r := newRaft(1, []uint64{1}, 10, 1)
  972. r.pendingConf = true
  973. r.addNode(2)
  974. if r.pendingConf != false {
  975. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  976. }
  977. nodes := r.nodes()
  978. sort.Sort(uint64Slice(nodes))
  979. wnodes := []uint64{1, 2}
  980. if !reflect.DeepEqual(nodes, wnodes) {
  981. t.Errorf("nodes = %v, want %v", nodes, wnodes)
  982. }
  983. }
  984. // TestRemoveNode tests that removeNode could update pendingConf, nodes and
  985. // and removed list correctly.
  986. func TestRemoveNode(t *testing.T) {
  987. r := newRaft(1, []uint64{1, 2}, 10, 1)
  988. r.pendingConf = true
  989. r.removeNode(2)
  990. if r.pendingConf != false {
  991. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  992. }
  993. w := []uint64{1}
  994. if g := r.nodes(); !reflect.DeepEqual(g, w) {
  995. t.Errorf("nodes = %v, want %v", g, w)
  996. }
  997. }
  998. func TestPromotable(t *testing.T) {
  999. id := uint64(1)
  1000. tests := []struct {
  1001. peers []uint64
  1002. wp bool
  1003. }{
  1004. {[]uint64{1}, true},
  1005. {[]uint64{1, 2, 3}, true},
  1006. {[]uint64{}, false},
  1007. {[]uint64{2, 3}, false},
  1008. }
  1009. for i, tt := range tests {
  1010. r := &raft{id: id, prs: make(map[uint64]*progress)}
  1011. for _, id := range tt.peers {
  1012. r.prs[id] = &progress{}
  1013. }
  1014. if g := r.promotable(); g != tt.wp {
  1015. t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
  1016. }
  1017. }
  1018. }
  1019. func ents(terms ...uint64) *raft {
  1020. ents := []pb.Entry{{}}
  1021. for _, term := range terms {
  1022. ents = append(ents, pb.Entry{Term: term})
  1023. }
  1024. sm := &raft{raftLog: &raftLog{ents: ents}}
  1025. sm.reset(0)
  1026. return sm
  1027. }
  1028. type network struct {
  1029. peers map[uint64]Interface
  1030. dropm map[connem]float64
  1031. ignorem map[pb.MessageType]bool
  1032. }
  1033. // newNetwork initializes a network from peers.
  1034. // A nil node will be replaced with a new *stateMachine.
  1035. // A *stateMachine will get its k, id.
  1036. // When using stateMachine, the address list is always [0, n).
  1037. func newNetwork(peers ...Interface) *network {
  1038. size := len(peers)
  1039. peerAddrs := make([]uint64, size)
  1040. for i := 0; i < size; i++ {
  1041. peerAddrs[i] = 1 + uint64(i)
  1042. }
  1043. npeers := make(map[uint64]Interface, size)
  1044. for i, p := range peers {
  1045. id := peerAddrs[i]
  1046. switch v := p.(type) {
  1047. case nil:
  1048. sm := newRaft(id, peerAddrs, 10, 1)
  1049. npeers[id] = sm
  1050. case *raft:
  1051. v.id = id
  1052. v.prs = make(map[uint64]*progress)
  1053. for i := 0; i < size; i++ {
  1054. v.prs[peerAddrs[i]] = &progress{}
  1055. }
  1056. v.reset(0)
  1057. npeers[id] = v
  1058. case *blackHole:
  1059. npeers[id] = v
  1060. default:
  1061. panic(fmt.Sprintf("unexpected state machine type: %T", p))
  1062. }
  1063. }
  1064. return &network{
  1065. peers: npeers,
  1066. dropm: make(map[connem]float64),
  1067. ignorem: make(map[pb.MessageType]bool),
  1068. }
  1069. }
  1070. func (nw *network) send(msgs ...pb.Message) {
  1071. for len(msgs) > 0 {
  1072. m := msgs[0]
  1073. p := nw.peers[m.To]
  1074. p.Step(m)
  1075. msgs = append(msgs[1:], nw.filter(p.ReadMessages())...)
  1076. }
  1077. }
  1078. func (nw *network) drop(from, to uint64, perc float64) {
  1079. nw.dropm[connem{from, to}] = perc
  1080. }
  1081. func (nw *network) cut(one, other uint64) {
  1082. nw.drop(one, other, 1)
  1083. nw.drop(other, one, 1)
  1084. }
  1085. func (nw *network) isolate(id uint64) {
  1086. for i := 0; i < len(nw.peers); i++ {
  1087. nid := uint64(i) + 1
  1088. if nid != id {
  1089. nw.drop(id, nid, 1.0)
  1090. nw.drop(nid, id, 1.0)
  1091. }
  1092. }
  1093. }
  1094. func (nw *network) ignore(t pb.MessageType) {
  1095. nw.ignorem[t] = true
  1096. }
  1097. func (nw *network) recover() {
  1098. nw.dropm = make(map[connem]float64)
  1099. nw.ignorem = make(map[pb.MessageType]bool)
  1100. }
  1101. func (nw *network) filter(msgs []pb.Message) []pb.Message {
  1102. mm := []pb.Message{}
  1103. for _, m := range msgs {
  1104. if nw.ignorem[m.Type] {
  1105. continue
  1106. }
  1107. switch m.Type {
  1108. case pb.MsgHup:
  1109. // hups never go over the network, so don't drop them but panic
  1110. panic("unexpected msgHup")
  1111. default:
  1112. perc := nw.dropm[connem{m.From, m.To}]
  1113. if n := rand.Float64(); n < perc {
  1114. continue
  1115. }
  1116. }
  1117. mm = append(mm, m)
  1118. }
  1119. return mm
  1120. }
  1121. type connem struct {
  1122. from, to uint64
  1123. }
  1124. type blackHole struct{}
  1125. func (blackHole) Step(pb.Message) error { return nil }
  1126. func (blackHole) ReadMessages() []pb.Message { return nil }
  1127. var nopStepper = &blackHole{}