raft_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723
  1. package raft
  2. import (
  3. "bytes"
  4. "math/rand"
  5. "testing"
  6. )
  7. func TestLeaderElection(t *testing.T) {
  8. tests := []struct {
  9. *network
  10. state stateType
  11. }{
  12. {newNetwork(nil, nil, nil), stateLeader},
  13. {newNetwork(nil, nil, nopStepper), stateLeader},
  14. {newNetwork(nil, nopStepper, nopStepper), stateCandidate},
  15. {newNetwork(nil, nopStepper, nopStepper, nil), stateCandidate},
  16. {newNetwork(nil, nopStepper, nopStepper, nil, nil), stateLeader},
  17. // three logs further along than 0
  18. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), stateFollower},
  19. // logs converge
  20. {newNetwork(ents(1), nil, ents(2), ents(1), nil), stateLeader},
  21. }
  22. for i, tt := range tests {
  23. tt.send(Message{To: 0, Type: msgHup})
  24. sm := tt.network.peers[0].(*stateMachine)
  25. if sm.state != tt.state {
  26. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  27. }
  28. if g := sm.term; g != 1 {
  29. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  30. }
  31. }
  32. }
  33. func TestLogReplication(t *testing.T) {
  34. tests := []struct {
  35. *network
  36. msgs []Message
  37. wcommitted int
  38. }{
  39. {
  40. newNetwork(nil, nil, nil),
  41. []Message{
  42. {To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
  43. },
  44. 1,
  45. },
  46. {
  47. newNetwork(nil, nil, nil),
  48. []Message{
  49. {To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
  50. {To: 1, Type: msgHup},
  51. {To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
  52. },
  53. 2,
  54. },
  55. }
  56. for i, tt := range tests {
  57. tt.send(Message{To: 0, Type: msgHup})
  58. for _, m := range tt.msgs {
  59. tt.send(m)
  60. }
  61. for j, x := range tt.network.peers {
  62. sm := x.(*stateMachine)
  63. if sm.log.committed != tt.wcommitted {
  64. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.log.committed, tt.wcommitted)
  65. }
  66. ents := sm.nextEnts()
  67. props := make([]Message, 0)
  68. for _, m := range tt.msgs {
  69. if m.Type == msgProp {
  70. props = append(props, m)
  71. }
  72. }
  73. for k, m := range props {
  74. if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
  75. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
  76. }
  77. }
  78. }
  79. }
  80. }
  81. func TestSingleNodeCommit(t *testing.T) {
  82. tt := newNetwork(nil)
  83. tt.send(Message{To: 0, Type: msgHup})
  84. tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
  85. tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
  86. sm := tt.peers[0].(*stateMachine)
  87. if sm.log.committed != 2 {
  88. t.Errorf("committed = %d, want %d", sm.log.committed, 2)
  89. }
  90. }
  91. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  92. tt := newNetwork(nil, nil, nil, nil, nil)
  93. tt.send(Message{To: 0, Type: msgHup})
  94. // 0 cannot reach 2,3,4
  95. tt.cut(0, 2)
  96. tt.cut(0, 3)
  97. tt.cut(0, 4)
  98. tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
  99. tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
  100. sm := tt.peers[0].(*stateMachine)
  101. if sm.log.committed != 0 {
  102. t.Errorf("committed = %d, want %d", sm.log.committed, 0)
  103. }
  104. // network recovery
  105. tt.recover()
  106. // elect 1 as the new leader with term 2
  107. tt.send(Message{To: 1, Type: msgHup})
  108. // send out a heartbeat
  109. tt.send(Message{To: 1, Type: msgBeat})
  110. // no log entries from previous term should be committed
  111. sm = tt.peers[1].(*stateMachine)
  112. if sm.log.committed != 0 {
  113. t.Errorf("committed = %d, want %d", sm.log.committed, 0)
  114. }
  115. // after append a entry from the current term, all entries
  116. // should be committed
  117. tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
  118. if sm.log.committed != 3 {
  119. t.Errorf("committed = %d, want %d", sm.log.committed, 3)
  120. }
  121. }
  122. func TestDuelingCandidates(t *testing.T) {
  123. a := newStateMachine(0, nil) // k, addr are set later
  124. c := newStateMachine(0, nil)
  125. tt := newNetwork(a, nil, c)
  126. tt.cut(0, 2)
  127. tt.send(Message{To: 0, Type: msgHup})
  128. tt.send(Message{To: 2, Type: msgHup})
  129. tt.recover()
  130. tt.send(Message{To: 2, Type: msgHup})
  131. tests := []struct {
  132. sm *stateMachine
  133. state stateType
  134. term int
  135. }{
  136. {a, stateFollower, 2},
  137. {c, stateLeader, 2},
  138. }
  139. for i, tt := range tests {
  140. if g := tt.sm.state; g != tt.state {
  141. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  142. }
  143. if g := tt.sm.term; g != tt.term {
  144. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  145. }
  146. }
  147. base := ltoa(newLog())
  148. for i, p := range tt.peers {
  149. if sm, ok := p.(*stateMachine); ok {
  150. l := ltoa(sm.log)
  151. if g := diffu(base, l); g != "" {
  152. t.Errorf("#%d: diff:\n%s", i, g)
  153. }
  154. } else {
  155. t.Logf("#%d: empty log", i)
  156. }
  157. }
  158. }
  159. func TestCandidateConcede(t *testing.T) {
  160. tt := newNetwork(nil, nil, nil)
  161. tt.isolate(0)
  162. tt.send(Message{To: 0, Type: msgHup})
  163. tt.send(Message{To: 2, Type: msgHup})
  164. // heal the partition
  165. tt.recover()
  166. data := []byte("force follower")
  167. // send a proposal to 2 to flush out a msgApp to 0
  168. tt.send(Message{To: 2, Type: msgProp, Entries: []Entry{{Data: data}}})
  169. a := tt.peers[0].(*stateMachine)
  170. if g := a.state; g != stateFollower {
  171. t.Errorf("state = %s, want %s", g, stateFollower)
  172. }
  173. if g := a.term; g != 1 {
  174. t.Errorf("term = %d, want %d", g, 1)
  175. }
  176. wantLog := ltoa(&log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1})
  177. for i, p := range tt.peers {
  178. if sm, ok := p.(*stateMachine); ok {
  179. l := ltoa(sm.log)
  180. if g := diffu(wantLog, l); g != "" {
  181. t.Errorf("#%d: diff:\n%s", i, g)
  182. }
  183. } else {
  184. t.Logf("#%d: empty log", i)
  185. }
  186. }
  187. }
  188. func TestSingleNodeCandidate(t *testing.T) {
  189. tt := newNetwork(nil)
  190. tt.send(Message{To: 0, Type: msgHup})
  191. sm := tt.peers[0].(*stateMachine)
  192. if sm.state != stateLeader {
  193. t.Errorf("state = %d, want %d", sm.state, stateLeader)
  194. }
  195. }
  196. func TestOldMessages(t *testing.T) {
  197. tt := newNetwork(nil, nil, nil)
  198. // make 0 leader @ term 3
  199. tt.send(Message{To: 0, Type: msgHup})
  200. tt.send(Message{To: 1, Type: msgHup})
  201. tt.send(Message{To: 0, Type: msgHup})
  202. // pretend we're an old leader trying to make progress
  203. tt.send(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
  204. base := ltoa(newLog())
  205. for i, p := range tt.peers {
  206. if sm, ok := p.(*stateMachine); ok {
  207. l := ltoa(sm.log)
  208. if g := diffu(base, l); g != "" {
  209. t.Errorf("#%d: diff:\n%s", i, g)
  210. }
  211. } else {
  212. t.Logf("#%d: empty log", i)
  213. }
  214. }
  215. }
  216. // TestOldMessagesReply - optimization - reply with new term.
  217. func TestProposal(t *testing.T) {
  218. tests := []struct {
  219. *network
  220. success bool
  221. }{
  222. {newNetwork(nil, nil, nil), true},
  223. {newNetwork(nil, nil, nopStepper), true},
  224. {newNetwork(nil, nopStepper, nopStepper), false},
  225. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  226. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  227. }
  228. for i, tt := range tests {
  229. send := func(m Message) {
  230. defer func() {
  231. // only recover is we expect it to panic so
  232. // panics we don't expect go up.
  233. if !tt.success {
  234. e := recover()
  235. if e != nil {
  236. t.Logf("#%d: err: %s", i, e)
  237. }
  238. }
  239. }()
  240. tt.send(m)
  241. }
  242. data := []byte("somedata")
  243. // promote 0 the leader
  244. send(Message{To: 0, Type: msgHup})
  245. send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: data}}})
  246. wantLog := newLog()
  247. if tt.success {
  248. wantLog = &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1}
  249. }
  250. base := ltoa(wantLog)
  251. for i, p := range tt.peers {
  252. if sm, ok := p.(*stateMachine); ok {
  253. l := ltoa(sm.log)
  254. if g := diffu(base, l); g != "" {
  255. t.Errorf("#%d: diff:\n%s", i, g)
  256. }
  257. } else {
  258. t.Logf("#%d: empty log", i)
  259. }
  260. }
  261. sm := tt.network.peers[0].(*stateMachine)
  262. if g := sm.term; g != 1 {
  263. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  264. }
  265. }
  266. }
  267. func TestProposalByProxy(t *testing.T) {
  268. data := []byte("somedata")
  269. tests := []*network{
  270. newNetwork(nil, nil, nil),
  271. newNetwork(nil, nil, nopStepper),
  272. }
  273. for i, tt := range tests {
  274. // promote 0 the leader
  275. tt.send(Message{To: 0, Type: msgHup})
  276. // propose via follower
  277. tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}})
  278. wantLog := &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1}
  279. base := ltoa(wantLog)
  280. for i, p := range tt.peers {
  281. if sm, ok := p.(*stateMachine); ok {
  282. l := ltoa(sm.log)
  283. if g := diffu(base, l); g != "" {
  284. t.Errorf("#%d: diff:\n%s", i, g)
  285. }
  286. } else {
  287. t.Logf("#%d: empty log", i)
  288. }
  289. }
  290. sm := tt.peers[0].(*stateMachine)
  291. if g := sm.term; g != 1 {
  292. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  293. }
  294. }
  295. }
  296. func TestCommit(t *testing.T) {
  297. tests := []struct {
  298. matches []int
  299. logs []Entry
  300. smTerm int
  301. w int
  302. }{
  303. // single
  304. {[]int{1}, []Entry{{}, {Term: 1}}, 1, 1},
  305. {[]int{1}, []Entry{{}, {Term: 1}}, 2, 0},
  306. {[]int{2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  307. {[]int{1}, []Entry{{}, {Term: 2}}, 2, 1},
  308. // odd
  309. {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  310. {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  311. {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  312. {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  313. // even
  314. {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  315. {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  316. {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  317. {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  318. {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  319. {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  320. }
  321. for i, tt := range tests {
  322. ins := make(map[int]*index)
  323. for j := 0; j < len(tt.matches); j++ {
  324. ins[j] = &index{tt.matches[j], tt.matches[j] + 1}
  325. }
  326. sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, term: tt.smTerm}
  327. sm.maybeCommit()
  328. if g := sm.log.committed; g != tt.w {
  329. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  330. }
  331. }
  332. }
  333. func TestVote(t *testing.T) {
  334. tests := []struct {
  335. state stateType
  336. i, term int
  337. voteFor int
  338. w int
  339. }{
  340. {stateFollower, 0, 0, none, -1},
  341. {stateFollower, 0, 1, none, -1},
  342. {stateFollower, 0, 2, none, -1},
  343. {stateFollower, 0, 3, none, 2},
  344. {stateFollower, 1, 0, none, -1},
  345. {stateFollower, 1, 1, none, -1},
  346. {stateFollower, 1, 2, none, -1},
  347. {stateFollower, 1, 3, none, 2},
  348. {stateFollower, 2, 0, none, -1},
  349. {stateFollower, 2, 1, none, -1},
  350. {stateFollower, 2, 2, none, 2},
  351. {stateFollower, 2, 3, none, 2},
  352. {stateFollower, 3, 0, none, -1},
  353. {stateFollower, 3, 1, none, -1},
  354. {stateFollower, 3, 2, none, 2},
  355. {stateFollower, 3, 3, none, 2},
  356. {stateFollower, 3, 2, 1, 2},
  357. {stateFollower, 3, 2, 0, -1},
  358. {stateLeader, 3, 3, 0, -1},
  359. {stateCandidate, 3, 3, 0, -1},
  360. }
  361. for i, tt := range tests {
  362. called := false
  363. sm := &stateMachine{
  364. state: tt.state,
  365. vote: tt.voteFor,
  366. log: &log{ents: []Entry{{}, {Term: 2}, {Term: 2}}},
  367. }
  368. sm.Step(Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term})
  369. for _, m := range sm.Msgs() {
  370. called = true
  371. if m.Index != tt.w {
  372. t.Errorf("#%d, m.Index = %d, want %d", i, m.Index, tt.w)
  373. }
  374. }
  375. if !called {
  376. t.Fatal("#%d: not called", i)
  377. }
  378. }
  379. }
  380. func TestStateTransition(t *testing.T) {
  381. tests := []struct {
  382. from stateType
  383. to stateType
  384. wallow bool
  385. wterm int
  386. wlead int
  387. }{
  388. {stateFollower, stateFollower, true, 1, none},
  389. {stateFollower, stateCandidate, true, 1, none},
  390. {stateFollower, stateLeader, false, -1, none},
  391. {stateCandidate, stateFollower, true, 0, none},
  392. {stateCandidate, stateCandidate, true, 1, none},
  393. {stateCandidate, stateLeader, true, 0, 0},
  394. {stateLeader, stateFollower, true, 1, none},
  395. {stateLeader, stateCandidate, false, 1, none},
  396. {stateLeader, stateLeader, true, 0, 0},
  397. }
  398. for i, tt := range tests {
  399. func() {
  400. defer func() {
  401. if r := recover(); r != nil {
  402. if tt.wallow == true {
  403. t.Errorf("%d: allow = %v, want %v", i, false, true)
  404. }
  405. }
  406. }()
  407. sm := newStateMachine(0, []int{0})
  408. sm.state = tt.from
  409. switch tt.to {
  410. case stateFollower:
  411. sm.becomeFollower(tt.wterm, tt.wlead)
  412. case stateCandidate:
  413. sm.becomeCandidate()
  414. case stateLeader:
  415. sm.becomeLeader()
  416. }
  417. if sm.term != tt.wterm {
  418. t.Errorf("%d: term = %d, want %d", i, sm.term, tt.wterm)
  419. }
  420. if sm.lead != tt.wlead {
  421. t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  422. }
  423. }()
  424. }
  425. }
  426. func TestConf(t *testing.T) {
  427. sm := newStateMachine(0, []int{0})
  428. sm.becomeCandidate()
  429. sm.becomeLeader()
  430. sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: config}}})
  431. if sm.log.lastIndex() != 1 {
  432. t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1)
  433. }
  434. if !sm.pendingConf {
  435. t.Errorf("pendingConf = %v, want %v", sm.pendingConf, true)
  436. }
  437. if sm.log.ents[1].Type != config {
  438. t.Errorf("type = %d, want %d", sm.log.ents[1].Type, config)
  439. }
  440. // deny the second configuration change request if there is a pending one
  441. sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: config}}})
  442. if sm.log.lastIndex() != 1 {
  443. t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1)
  444. }
  445. }
  446. func TestAllServerStepdown(t *testing.T) {
  447. tests := []stateType{stateFollower, stateCandidate, stateLeader}
  448. want := struct {
  449. state stateType
  450. term int
  451. index int
  452. }{stateFollower, 3, 1}
  453. tmsgTypes := [...]messageType{msgVote, msgApp}
  454. tterm := 3
  455. for i, tt := range tests {
  456. sm := newStateMachine(0, []int{0, 1, 2})
  457. switch tt {
  458. case stateFollower:
  459. sm.becomeFollower(1, 0)
  460. case stateCandidate:
  461. sm.becomeCandidate()
  462. case stateLeader:
  463. sm.becomeCandidate()
  464. sm.becomeLeader()
  465. }
  466. for j, msgType := range tmsgTypes {
  467. sm.Step(Message{Type: msgType, Term: tterm, LogTerm: tterm})
  468. if sm.state != want.state {
  469. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, want.state)
  470. }
  471. if sm.term != want.term {
  472. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, want.term)
  473. }
  474. if len(sm.log.ents) != want.index {
  475. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), want.index)
  476. }
  477. }
  478. }
  479. }
  480. func TestLeaderAppResp(t *testing.T) {
  481. tests := []struct {
  482. index int
  483. wmsgNum int
  484. windex int
  485. wcommitted int
  486. }{
  487. {-1, 1, 1, 0}, // bad resp; leader does not commit; reply with log entries
  488. {2, 2, 2, 2}, // good resp; leader commits; broadcast with commit index
  489. }
  490. for i, tt := range tests {
  491. // sm term is 1 after it becomes the leader.
  492. // thus the last log term must be 1 to be committed.
  493. sm := newStateMachine(0, []int{0, 1, 2})
  494. sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
  495. sm.becomeCandidate()
  496. sm.becomeLeader()
  497. sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term})
  498. msgs := sm.Msgs()
  499. if len(msgs) != tt.wmsgNum {
  500. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  501. }
  502. for j, msg := range msgs {
  503. if msg.Index != tt.windex {
  504. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  505. }
  506. if msg.Commit != tt.wcommitted {
  507. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  508. }
  509. }
  510. }
  511. }
  512. // tests the output of the statemachine when receiving msgBeat
  513. func TestRecvMsgBeat(t *testing.T) {
  514. tests := []struct {
  515. state stateType
  516. wMsg int
  517. }{
  518. {stateLeader, 2},
  519. // candidate and follower should ignore msgBeat
  520. {stateCandidate, 0},
  521. {stateFollower, 0},
  522. }
  523. for i, tt := range tests {
  524. sm := newStateMachine(0, []int{0, 1, 2})
  525. sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
  526. sm.term = 1
  527. sm.state = tt.state
  528. sm.Step(Message{Type: msgBeat})
  529. msgs := sm.Msgs()
  530. if len(msgs) != tt.wMsg {
  531. t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  532. }
  533. for _, m := range msgs {
  534. if m.Type != msgApp {
  535. t.Errorf("%d: msg.type = %v, want %v", m.Type, msgApp)
  536. }
  537. }
  538. }
  539. }
  540. func ents(terms ...int) *stateMachine {
  541. ents := []Entry{{}}
  542. for _, term := range terms {
  543. ents = append(ents, Entry{Term: term})
  544. }
  545. sm := &stateMachine{log: &log{ents: ents}}
  546. sm.reset()
  547. return sm
  548. }
  549. type network struct {
  550. peers []Interface
  551. dropm map[connem]float64
  552. }
  553. // newNetwork initializes a network from peers. A nil node will be replaced
  554. // with a new *stateMachine. A *stateMachine will get its k, addr.
  555. func newNetwork(peers ...Interface) *network {
  556. peerAddrs := make([]int, len(peers))
  557. for i := range peers {
  558. peerAddrs[i] = i
  559. }
  560. for addr, p := range peers {
  561. switch v := p.(type) {
  562. case nil:
  563. sm := newStateMachine(addr, peerAddrs)
  564. peers[addr] = sm
  565. case *stateMachine:
  566. v.addr = addr
  567. v.ins = make(map[int]*index)
  568. for i := range peerAddrs {
  569. v.ins[i] = &index{}
  570. }
  571. v.reset()
  572. }
  573. }
  574. return &network{peers: peers, dropm: make(map[connem]float64)}
  575. }
  576. func (nw *network) send(msgs ...Message) {
  577. for len(msgs) > 0 {
  578. m := msgs[0]
  579. p := nw.peers[m.To]
  580. p.Step(m)
  581. msgs = append(msgs[1:], nw.filter(p.Msgs())...)
  582. }
  583. }
  584. func (nw *network) drop(from, to int, perc float64) {
  585. nw.dropm[connem{from, to}] = perc
  586. }
  587. func (nw *network) cut(one, other int) {
  588. nw.drop(one, other, 1)
  589. nw.drop(other, one, 1)
  590. }
  591. func (nw *network) isolate(addr int) {
  592. for i := 0; i < len(nw.peers); i++ {
  593. if i != addr {
  594. nw.drop(addr, i, 1.0)
  595. nw.drop(i, addr, 1.0)
  596. }
  597. }
  598. }
  599. func (nw *network) recover() {
  600. nw.dropm = make(map[connem]float64)
  601. }
  602. func (nw *network) filter(msgs []Message) []Message {
  603. mm := make([]Message, 0)
  604. for _, m := range msgs {
  605. switch m.Type {
  606. case msgHup:
  607. // hups never go over the network, so don't drop them but panic
  608. panic("unexpected msgHup")
  609. default:
  610. perc := nw.dropm[connem{m.From, m.To}]
  611. if n := rand.Float64(); n < perc {
  612. continue
  613. }
  614. }
  615. mm = append(mm, m)
  616. }
  617. return mm
  618. }
  619. type connem struct {
  620. from, to int
  621. }
  622. type blackHole struct{}
  623. func (blackHole) Step(Message) {}
  624. func (blackHole) Msgs() []Message { return nil }
  625. var nopStepper = &blackHole{}