raft_test.go 14 KB


  1. package raft
  2. import (
  3. "bytes"
  4. "math/rand"
  5. "testing"
  6. )
  7. func TestLeaderElection(t *testing.T) {
  8. tests := []struct {
  9. *network
  10. state stateType
  11. }{
  12. {newNetwork(nil, nil, nil), stateLeader},
  13. {newNetwork(nil, nil, nopStepper), stateLeader},
  14. {newNetwork(nil, nopStepper, nopStepper), stateCandidate},
  15. {newNetwork(nil, nopStepper, nopStepper, nil), stateCandidate},
  16. {newNetwork(nil, nopStepper, nopStepper, nil, nil), stateLeader},
  17. // three logs further along than 0
  18. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), stateFollower},
  19. // logs converge
  20. {newNetwork(ents(1), nil, ents(2), ents(1), nil), stateLeader},
  21. }
  22. for i, tt := range tests {
  23. tt.send(Message{To: 0, Type: msgHup})
  24. sm := tt.network.peers[0].(*stateMachine)
  25. if sm.state != tt.state {
  26. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  27. }
  28. if g := sm.term; g != 1 {
  29. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  30. }
  31. }
  32. }
  33. func TestLogReplication(t *testing.T) {
  34. tests := []struct {
  35. *network
  36. msgs []Message
  37. wcommitted int
  38. }{
  39. {
  40. newNetwork(nil, nil, nil),
  41. []Message{
  42. {To: 0, Type: msgProp, Data: []byte("somedata")},
  43. },
  44. 1,
  45. },
  46. {
  47. newNetwork(nil, nil, nil),
  48. []Message{
  49. {To: 0, Type: msgProp, Data: []byte("somedata")},
  50. {To: 1, Type: msgHup},
  51. {To: 1, Type: msgProp, Data: []byte("somedata")},
  52. },
  53. 2,
  54. },
  55. }
  56. for i, tt := range tests {
  57. tt.send(Message{To: 0, Type: msgHup})
  58. for _, m := range tt.msgs {
  59. tt.send(m)
  60. }
  61. for j, x := range tt.network.peers {
  62. sm := x.(*stateMachine)
  63. if sm.log.committed != tt.wcommitted {
  64. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.log.committed, tt.wcommitted)
  65. }
  66. ents := sm.nextEnts()
  67. props := make([]Message, 0)
  68. for _, m := range tt.msgs {
  69. if m.Type == msgProp {
  70. props = append(props, m)
  71. }
  72. }
  73. for k, m := range props {
  74. if !bytes.Equal(ents[k].Data, m.Data) {
  75. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Data)
  76. }
  77. }
  78. }
  79. }
  80. }
  81. func TestSingleNodeCommit(t *testing.T) {
  82. tt := newNetwork(nil)
  83. tt.send(Message{To: 0, Type: msgHup})
  84. tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")})
  85. tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")})
  86. sm := tt.peers[0].(*stateMachine)
  87. if sm.log.committed != 2 {
  88. t.Errorf("committed = %d, want %d", sm.log.committed, 2)
  89. }
  90. }
  91. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  92. tt := newNetwork(nil, nil, nil, nil, nil)
  93. tt.send(Message{To: 0, Type: msgHup})
  94. // 0 cannot reach 2,3,4
  95. tt.cut(0, 2)
  96. tt.cut(0, 3)
  97. tt.cut(0, 4)
  98. tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")})
  99. tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")})
  100. sm := tt.peers[0].(*stateMachine)
  101. if sm.log.committed != 0 {
  102. t.Errorf("committed = %d, want %d", sm.log.committed, 0)
  103. }
  104. // network recovery
  105. tt.recover()
  106. // elect 1 as the new leader with term 2
  107. tt.send(Message{To: 1, Type: msgHup})
  108. // send out a heartbeat
  109. tt.send(Message{To: 1, Type: msgBeat})
  110. // no log entries from previous term should be committed
  111. sm = tt.peers[1].(*stateMachine)
  112. if sm.log.committed != 0 {
  113. t.Errorf("committed = %d, want %d", sm.log.committed, 0)
  114. }
  115. // after append a entry from the current term, all entries
  116. // should be committed
  117. tt.send(Message{To: 1, Type: msgProp, Data: []byte("some data")})
  118. if sm.log.committed != 3 {
  119. t.Errorf("committed = %d, want %d", sm.log.committed, 3)
  120. }
  121. }
  122. func TestDuelingCandidates(t *testing.T) {
  123. a := newStateMachine(0, 0) // k, addr are set later
  124. c := newStateMachine(0, 0)
  125. tt := newNetwork(a, nil, c)
  126. tt.cut(0, 2)
  127. tt.send(Message{To: 0, Type: msgHup})
  128. tt.send(Message{To: 2, Type: msgHup})
  129. tt.recover()
  130. tt.send(Message{To: 2, Type: msgHup})
  131. tests := []struct {
  132. sm *stateMachine
  133. state stateType
  134. term int
  135. }{
  136. {a, stateFollower, 2},
  137. {c, stateLeader, 2},
  138. }
  139. for i, tt := range tests {
  140. if g := tt.sm.state; g != tt.state {
  141. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  142. }
  143. if g := tt.sm.term; g != tt.term {
  144. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  145. }
  146. }
  147. base := ltoa(newLog())
  148. for i, p := range tt.peers {
  149. if sm, ok := p.(*stateMachine); ok {
  150. l := ltoa(sm.log)
  151. if g := diffu(base, l); g != "" {
  152. t.Errorf("#%d: diff:\n%s", i, g)
  153. }
  154. } else {
  155. t.Logf("#%d: empty log", i)
  156. }
  157. }
  158. }
  159. func TestCandidateConcede(t *testing.T) {
  160. tt := newNetwork(nil, nil, nil)
  161. tt.isolate(0)
  162. tt.send(Message{To: 0, Type: msgHup})
  163. tt.send(Message{To: 2, Type: msgHup})
  164. // heal the partition
  165. tt.recover()
  166. data := []byte("force follower")
  167. // send a proposal to 2 to flush out a msgApp to 0
  168. tt.send(Message{To: 2, Type: msgProp, Data: data})
  169. a := tt.peers[0].(*stateMachine)
  170. if g := a.state; g != stateFollower {
  171. t.Errorf("state = %s, want %s", g, stateFollower)
  172. }
  173. if g := a.term; g != 1 {
  174. t.Errorf("term = %d, want %d", g, 1)
  175. }
  176. wantLog := ltoa(&log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1})
  177. for i, p := range tt.peers {
  178. if sm, ok := p.(*stateMachine); ok {
  179. l := ltoa(sm.log)
  180. if g := diffu(wantLog, l); g != "" {
  181. t.Errorf("#%d: diff:\n%s", i, g)
  182. }
  183. } else {
  184. t.Logf("#%d: empty log", i)
  185. }
  186. }
  187. }
  188. func TestSingleNodeCandidate(t *testing.T) {
  189. tt := newNetwork(nil)
  190. tt.send(Message{To: 0, Type: msgHup})
  191. sm := tt.peers[0].(*stateMachine)
  192. if sm.state != stateLeader {
  193. t.Errorf("state = %d, want %d", sm.state, stateLeader)
  194. }
  195. }
  196. func TestOldMessages(t *testing.T) {
  197. tt := newNetwork(nil, nil, nil)
  198. // make 0 leader @ term 3
  199. tt.send(Message{To: 0, Type: msgHup})
  200. tt.send(Message{To: 1, Type: msgHup})
  201. tt.send(Message{To: 0, Type: msgHup})
  202. // pretend we're an old leader trying to make progress
  203. tt.send(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
  204. base := ltoa(newLog())
  205. for i, p := range tt.peers {
  206. if sm, ok := p.(*stateMachine); ok {
  207. l := ltoa(sm.log)
  208. if g := diffu(base, l); g != "" {
  209. t.Errorf("#%d: diff:\n%s", i, g)
  210. }
  211. } else {
  212. t.Logf("#%d: empty log", i)
  213. }
  214. }
  215. }
// TODO: TestOldMessagesReply - optimization - reply with new term.
  217. func TestProposal(t *testing.T) {
  218. tests := []struct {
  219. *network
  220. success bool
  221. }{
  222. {newNetwork(nil, nil, nil), true},
  223. {newNetwork(nil, nil, nopStepper), true},
  224. {newNetwork(nil, nopStepper, nopStepper), false},
  225. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  226. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  227. }
  228. for i, tt := range tests {
  229. send := func(m Message) {
  230. defer func() {
  231. // only recover is we expect it to panic so
  232. // panics we don't expect go up.
  233. if !tt.success {
  234. e := recover()
  235. if e != nil {
  236. t.Logf("#%d: err: %s", i, e)
  237. }
  238. }
  239. }()
  240. tt.send(m)
  241. }
  242. data := []byte("somedata")
  243. // promote 0 the leader
  244. send(Message{To: 0, Type: msgHup})
  245. send(Message{To: 0, Type: msgProp, Data: data})
  246. wantLog := newLog()
  247. if tt.success {
  248. wantLog = &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1}
  249. }
  250. base := ltoa(wantLog)
  251. for i, p := range tt.peers {
  252. if sm, ok := p.(*stateMachine); ok {
  253. l := ltoa(sm.log)
  254. if g := diffu(base, l); g != "" {
  255. t.Errorf("#%d: diff:\n%s", i, g)
  256. }
  257. } else {
  258. t.Logf("#%d: empty log", i)
  259. }
  260. }
  261. sm := tt.network.peers[0].(*stateMachine)
  262. if g := sm.term; g != 1 {
  263. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  264. }
  265. }
  266. }
  267. func TestProposalByProxy(t *testing.T) {
  268. data := []byte("somedata")
  269. tests := []*network{
  270. newNetwork(nil, nil, nil),
  271. newNetwork(nil, nil, nopStepper),
  272. }
  273. for i, tt := range tests {
  274. // promote 0 the leader
  275. tt.send(Message{To: 0, Type: msgHup})
  276. // propose via follower
  277. tt.send(Message{To: 1, Type: msgProp, Data: []byte("somedata")})
  278. wantLog := &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1}
  279. base := ltoa(wantLog)
  280. for i, p := range tt.peers {
  281. if sm, ok := p.(*stateMachine); ok {
  282. l := ltoa(sm.log)
  283. if g := diffu(base, l); g != "" {
  284. t.Errorf("#%d: diff:\n%s", i, g)
  285. }
  286. } else {
  287. t.Logf("#%d: empty log", i)
  288. }
  289. }
  290. sm := tt.peers[0].(*stateMachine)
  291. if g := sm.term; g != 1 {
  292. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  293. }
  294. }
  295. }
  296. func TestCommit(t *testing.T) {
  297. tests := []struct {
  298. matches []int
  299. logs []Entry
  300. smTerm int
  301. w int
  302. }{
  303. // odd
  304. {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  305. {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  306. {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  307. {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  308. // even
  309. {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  310. {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  311. {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  312. {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  313. {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  314. {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  315. }
  316. for i, tt := range tests {
  317. ins := make([]index, len(tt.matches))
  318. for j := 0; j < len(ins); j++ {
  319. ins[j] = index{tt.matches[j], tt.matches[j] + 1}
  320. }
  321. sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, k: len(ins), term: tt.smTerm}
  322. sm.maybeCommit()
  323. if g := sm.log.committed; g != tt.w {
  324. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  325. }
  326. }
  327. }
  328. func TestVote(t *testing.T) {
  329. tests := []struct {
  330. state stateType
  331. i, term int
  332. voteFor int
  333. w int
  334. }{
  335. {stateFollower, 0, 0, none, -1},
  336. {stateFollower, 0, 1, none, -1},
  337. {stateFollower, 0, 2, none, -1},
  338. {stateFollower, 0, 3, none, 2},
  339. {stateFollower, 1, 0, none, -1},
  340. {stateFollower, 1, 1, none, -1},
  341. {stateFollower, 1, 2, none, -1},
  342. {stateFollower, 1, 3, none, 2},
  343. {stateFollower, 2, 0, none, -1},
  344. {stateFollower, 2, 1, none, -1},
  345. {stateFollower, 2, 2, none, 2},
  346. {stateFollower, 2, 3, none, 2},
  347. {stateFollower, 3, 0, none, -1},
  348. {stateFollower, 3, 1, none, -1},
  349. {stateFollower, 3, 2, none, 2},
  350. {stateFollower, 3, 3, none, 2},
  351. {stateFollower, 3, 2, 1, 2},
  352. {stateFollower, 3, 2, 0, -1},
  353. {stateLeader, 3, 3, 0, -1},
  354. {stateCandidate, 3, 3, 0, -1},
  355. }
  356. for i, tt := range tests {
  357. called := false
  358. sm := &stateMachine{
  359. state: tt.state,
  360. vote: tt.voteFor,
  361. log: &log{ents: []Entry{{}, {Term: 2}, {Term: 2}}},
  362. }
  363. sm.Step(Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term})
  364. for _, m := range sm.Msgs() {
  365. called = true
  366. if m.Index != tt.w {
  367. t.Errorf("#%d, m.Index = %d, want %d", i, m.Index, tt.w)
  368. }
  369. }
  370. if !called {
  371. t.Fatal("#%d: not called", i)
  372. }
  373. }
  374. }
  375. func TestAllServerStepdown(t *testing.T) {
  376. tests := []stateType{stateFollower, stateCandidate, stateLeader}
  377. want := struct {
  378. state stateType
  379. term int
  380. index int
  381. }{stateFollower, 3, 1}
  382. tmsgTypes := [...]messageType{msgVote, msgApp}
  383. tterm := 3
  384. for i, tt := range tests {
  385. sm := newStateMachine(3, 0)
  386. switch tt {
  387. case stateFollower:
  388. sm.becomeFollower(1, 0)
  389. case stateCandidate:
  390. sm.becomeCandidate()
  391. case stateLeader:
  392. sm.becomeCandidate()
  393. sm.becomeLeader()
  394. }
  395. for j, msgType := range tmsgTypes {
  396. sm.Step(Message{Type: msgType, Term: tterm, LogTerm: tterm})
  397. if sm.state != want.state {
  398. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, want.state)
  399. }
  400. if sm.term != want.term {
  401. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, want.term)
  402. }
  403. if len(sm.log.ents) != want.index {
  404. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), want.index)
  405. }
  406. }
  407. }
  408. }
  409. func TestLeaderAppResp(t *testing.T) {
  410. tests := []struct {
  411. index int
  412. wmsgNum int
  413. windex int
  414. wcommitted int
  415. }{
  416. {-1, 1, 1, 0}, // bad resp; leader does not commit; reply with log entries
  417. {2, 2, 2, 2}, // good resp; leader commits; broadcast with commit index
  418. }
  419. for i, tt := range tests {
  420. // sm term is 1 after it becomes the leader.
  421. // thus the last log term must be 1 to be committed.
  422. sm := &stateMachine{addr: 0, k: 3, log: &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}}
  423. sm.becomeCandidate()
  424. sm.becomeLeader()
  425. sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term})
  426. msgs := sm.Msgs()
  427. if len(msgs) != tt.wmsgNum {
  428. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  429. }
  430. for j, msg := range msgs {
  431. if msg.Index != tt.windex {
  432. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  433. }
  434. if msg.Commit != tt.wcommitted {
  435. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  436. }
  437. }
  438. }
  439. }
  440. func ents(terms ...int) *stateMachine {
  441. ents := []Entry{{}}
  442. for _, term := range terms {
  443. ents = append(ents, Entry{Term: term})
  444. }
  445. sm := &stateMachine{log: &log{ents: ents}}
  446. sm.reset()
  447. return sm
  448. }
  449. type network struct {
  450. peers []Interface
  451. dropm map[connem]float64
  452. }
  453. // newNetwork initializes a network from peers. A nil node will be replaced
  454. // with a new *stateMachine. A *stateMachine will get its k, addr.
  455. func newNetwork(peers ...Interface) *network {
  456. for addr, p := range peers {
  457. switch v := p.(type) {
  458. case nil:
  459. sm := newStateMachine(len(peers), addr)
  460. peers[addr] = sm
  461. case *stateMachine:
  462. v.k = len(peers)
  463. v.addr = addr
  464. }
  465. }
  466. return &network{peers: peers, dropm: make(map[connem]float64)}
  467. }
  468. func (nw *network) send(msgs ...Message) {
  469. for len(msgs) > 0 {
  470. m := msgs[0]
  471. p := nw.peers[m.To]
  472. p.Step(m)
  473. msgs = append(msgs[1:], nw.filter(p.Msgs())...)
  474. }
  475. }
  476. func (nw *network) drop(from, to int, perc float64) {
  477. nw.dropm[connem{from, to}] = perc
  478. }
  479. func (nw *network) cut(one, other int) {
  480. nw.drop(one, other, 1)
  481. nw.drop(other, one, 1)
  482. }
  483. func (nw *network) isolate(addr int) {
  484. for i := 0; i < len(nw.peers); i++ {
  485. if i != addr {
  486. nw.drop(addr, i, 1.0)
  487. nw.drop(i, addr, 1.0)
  488. }
  489. }
  490. }
  491. func (nw *network) recover() {
  492. nw.dropm = make(map[connem]float64)
  493. }
  494. func (nw *network) filter(msgs []Message) []Message {
  495. mm := make([]Message, 0)
  496. for _, m := range msgs {
  497. switch m.Type {
  498. case msgHup:
  499. // hups never go over the network, so don't drop them but panic
  500. panic("unexpected msgHup")
  501. default:
  502. perc := nw.dropm[connem{m.From, m.To}]
  503. if n := rand.Float64(); n < perc {
  504. continue
  505. }
  506. }
  507. mm = append(mm, m)
  508. }
  509. return mm
  510. }
  511. type connem struct {
  512. from, to int
  513. }
  514. type blackHole struct{}
  515. func (blackHole) Step(Message) {}
  516. func (blackHole) Msgs() []Message { return nil }
  517. var nopStepper = &blackHole{}