raft_test.go 13 KB


  1. package raft
  2. import (
  3. "bytes"
  4. "math/rand"
  5. "testing"
  6. )
  7. func TestLeaderElection(t *testing.T) {
  8. tests := []struct {
  9. *network
  10. state stateType
  11. }{
  12. {newNetwork(nil, nil, nil), stateLeader},
  13. {newNetwork(nil, nil, nopStepper), stateLeader},
  14. {newNetwork(nil, nopStepper, nopStepper), stateCandidate},
  15. {newNetwork(nil, nopStepper, nopStepper, nil), stateCandidate},
  16. {newNetwork(nil, nopStepper, nopStepper, nil, nil), stateLeader},
  17. // three logs further along than 0
  18. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), stateFollower},
  19. // logs converge
  20. {newNetwork(ents(1), nil, ents(2), ents(1), nil), stateLeader},
  21. }
  22. for i, tt := range tests {
  23. tt.send(Message{To: 0, Type: msgHup})
  24. sm := tt.network.peers[0].(*stateMachine)
  25. if sm.state != tt.state {
  26. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  27. }
  28. if g := sm.term; g != 1 {
  29. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  30. }
  31. }
  32. }
  33. func TestLogReplication(t *testing.T) {
  34. tests := []struct {
  35. *network
  36. msgs []Message
  37. wcommitted int
  38. }{
  39. {
  40. newNetwork(nil, nil, nil),
  41. []Message{
  42. {To: 0, Type: msgProp, Data: []byte("somedata")},
  43. },
  44. 1,
  45. },
  46. {
  47. newNetwork(nil, nil, nil),
  48. []Message{
  49. {To: 0, Type: msgProp, Data: []byte("somedata")},
  50. {To: 1, Type: msgHup},
  51. {To: 1, Type: msgProp, Data: []byte("somedata")},
  52. },
  53. 2,
  54. },
  55. }
  56. for i, tt := range tests {
  57. tt.send(Message{To: 0, Type: msgHup})
  58. for _, m := range tt.msgs {
  59. tt.send(m)
  60. }
  61. for j, x := range tt.network.peers {
  62. sm := x.(*stateMachine)
  63. if sm.log.committed != tt.wcommitted {
  64. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.log.committed, tt.wcommitted)
  65. }
  66. ents := sm.nextEnts()
  67. props := make([]Message, 0)
  68. for _, m := range tt.msgs {
  69. if m.Type == msgProp {
  70. props = append(props, m)
  71. }
  72. }
  73. for k, m := range props {
  74. if !bytes.Equal(ents[k].Data, m.Data) {
  75. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Data)
  76. }
  77. }
  78. }
  79. }
  80. }
  81. func TestSingleNodeCommit(t *testing.T) {
  82. tt := newNetwork(nil)
  83. tt.send(Message{To: 0, Type: msgHup})
  84. tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")})
  85. tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")})
  86. sm := tt.peers[0].(*stateMachine)
  87. if sm.log.committed != 2 {
  88. t.Errorf("committed = %d, want %d", sm.log.committed, 2)
  89. }
  90. }
  91. func TestDuelingCandidates(t *testing.T) {
  92. a := newStateMachine(0, 0) // k, addr are set later
  93. c := newStateMachine(0, 0)
  94. tt := newNetwork(a, nil, c)
  95. tt.drop(0, 2, 1.0)
  96. tt.drop(2, 0, 1.0)
  97. tt.send(Message{To: 0, Type: msgHup})
  98. tt.send(Message{To: 2, Type: msgHup})
  99. tt.drop(0, 2, 0)
  100. tt.drop(2, 0, 0)
  101. tt.send(Message{To: 2, Type: msgHup})
  102. tests := []struct {
  103. sm *stateMachine
  104. state stateType
  105. term int
  106. }{
  107. {a, stateFollower, 2},
  108. {c, stateLeader, 2},
  109. }
  110. for i, tt := range tests {
  111. if g := tt.sm.state; g != tt.state {
  112. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  113. }
  114. if g := tt.sm.term; g != tt.term {
  115. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  116. }
  117. }
  118. base := ltoa(newLog())
  119. for i, p := range tt.peers {
  120. if sm, ok := p.(*stateMachine); ok {
  121. l := ltoa(sm.log)
  122. if g := diffu(base, l); g != "" {
  123. t.Errorf("#%d: diff:\n%s", i, g)
  124. }
  125. } else {
  126. t.Logf("#%d: empty log", i)
  127. }
  128. }
  129. }
  130. func TestCandidateConcede(t *testing.T) {
  131. tt := newNetwork(nil, nil, nil)
  132. tt.isolate(0)
  133. tt.send(Message{To: 0, Type: msgHup})
  134. tt.send(Message{To: 2, Type: msgHup})
  135. // heal the partition
  136. tt.recover()
  137. data := []byte("force follower")
  138. // send a proposal to 2 to flush out a msgApp to 0
  139. tt.send(Message{To: 2, Type: msgProp, Data: data})
  140. a := tt.peers[0].(*stateMachine)
  141. if g := a.state; g != stateFollower {
  142. t.Errorf("state = %s, want %s", g, stateFollower)
  143. }
  144. if g := a.term; g != 1 {
  145. t.Errorf("term = %d, want %d", g, 1)
  146. }
  147. wantLog := ltoa(&log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1})
  148. for i, p := range tt.peers {
  149. if sm, ok := p.(*stateMachine); ok {
  150. l := ltoa(sm.log)
  151. if g := diffu(wantLog, l); g != "" {
  152. t.Errorf("#%d: diff:\n%s", i, g)
  153. }
  154. } else {
  155. t.Logf("#%d: empty log", i)
  156. }
  157. }
  158. }
  159. func TestSingleNodeCandidate(t *testing.T) {
  160. tt := newNetwork(nil)
  161. tt.send(Message{To: 0, Type: msgHup})
  162. sm := tt.peers[0].(*stateMachine)
  163. if sm.state != stateLeader {
  164. t.Errorf("state = %d, want %d", sm.state, stateLeader)
  165. }
  166. }
  167. func TestOldMessages(t *testing.T) {
  168. tt := newNetwork(nil, nil, nil)
  169. // make 0 leader @ term 3
  170. tt.send(Message{To: 0, Type: msgHup})
  171. tt.send(Message{To: 1, Type: msgHup})
  172. tt.send(Message{To: 0, Type: msgHup})
  173. // pretend we're an old leader trying to make progress
  174. tt.send(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
  175. base := ltoa(newLog())
  176. for i, p := range tt.peers {
  177. if sm, ok := p.(*stateMachine); ok {
  178. l := ltoa(sm.log)
  179. if g := diffu(base, l); g != "" {
  180. t.Errorf("#%d: diff:\n%s", i, g)
  181. }
  182. } else {
  183. t.Logf("#%d: empty log", i)
  184. }
  185. }
  186. }
  // TODO: add TestOldMessagesReply, covering the optimization in which a node answers an old message by replying with its newer term.
  188. func TestProposal(t *testing.T) {
  189. tests := []struct {
  190. *network
  191. success bool
  192. }{
  193. {newNetwork(nil, nil, nil), true},
  194. {newNetwork(nil, nil, nopStepper), true},
  195. {newNetwork(nil, nopStepper, nopStepper), false},
  196. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  197. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  198. }
  199. for i, tt := range tests {
  200. send := func(m Message) {
  201. defer func() {
  202. // only recover is we expect it to panic so
  203. // panics we don't expect go up.
  204. if !tt.success {
  205. e := recover()
  206. if e != nil {
  207. t.Logf("#%d: err: %s", i, e)
  208. }
  209. }
  210. }()
  211. tt.send(m)
  212. }
  213. data := []byte("somedata")
  214. // promote 0 the leader
  215. send(Message{To: 0, Type: msgHup})
  216. send(Message{To: 0, Type: msgProp, Data: data})
  217. wantLog := newLog()
  218. if tt.success {
  219. wantLog = &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1}
  220. }
  221. base := ltoa(wantLog)
  222. for i, p := range tt.peers {
  223. if sm, ok := p.(*stateMachine); ok {
  224. l := ltoa(sm.log)
  225. if g := diffu(base, l); g != "" {
  226. t.Errorf("#%d: diff:\n%s", i, g)
  227. }
  228. } else {
  229. t.Logf("#%d: empty log", i)
  230. }
  231. }
  232. sm := tt.network.peers[0].(*stateMachine)
  233. if g := sm.term; g != 1 {
  234. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  235. }
  236. }
  237. }
  238. func TestProposalByProxy(t *testing.T) {
  239. data := []byte("somedata")
  240. tests := []*network{
  241. newNetwork(nil, nil, nil),
  242. newNetwork(nil, nil, nopStepper),
  243. }
  244. for i, tt := range tests {
  245. // promote 0 the leader
  246. tt.send(Message{To: 0, Type: msgHup})
  247. // propose via follower
  248. tt.send(Message{To: 1, Type: msgProp, Data: []byte("somedata")})
  249. wantLog := &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1}
  250. base := ltoa(wantLog)
  251. for i, p := range tt.peers {
  252. if sm, ok := p.(*stateMachine); ok {
  253. l := ltoa(sm.log)
  254. if g := diffu(base, l); g != "" {
  255. t.Errorf("#%d: diff:\n%s", i, g)
  256. }
  257. } else {
  258. t.Logf("#%d: empty log", i)
  259. }
  260. }
  261. sm := tt.peers[0].(*stateMachine)
  262. if g := sm.term; g != 1 {
  263. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  264. }
  265. }
  266. }
  267. func TestCommit(t *testing.T) {
  268. tests := []struct {
  269. matches []int
  270. logs []Entry
  271. smTerm int
  272. w int
  273. }{
  274. // odd
  275. {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  276. {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  277. {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  278. {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  279. // even
  280. {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  281. {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  282. {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  283. {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  284. {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  285. {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  286. }
  287. for i, tt := range tests {
  288. ins := make([]index, len(tt.matches))
  289. for j := 0; j < len(ins); j++ {
  290. ins[j] = index{tt.matches[j], tt.matches[j] + 1}
  291. }
  292. sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, k: len(ins), term: tt.smTerm}
  293. sm.maybeCommit()
  294. if g := sm.log.committed; g != tt.w {
  295. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  296. }
  297. }
  298. }
  299. func TestVote(t *testing.T) {
  300. tests := []struct {
  301. state stateType
  302. i, term int
  303. voteFor int
  304. w int
  305. }{
  306. {stateFollower, 0, 0, none, -1},
  307. {stateFollower, 0, 1, none, -1},
  308. {stateFollower, 0, 2, none, -1},
  309. {stateFollower, 0, 3, none, 2},
  310. {stateFollower, 1, 0, none, -1},
  311. {stateFollower, 1, 1, none, -1},
  312. {stateFollower, 1, 2, none, -1},
  313. {stateFollower, 1, 3, none, 2},
  314. {stateFollower, 2, 0, none, -1},
  315. {stateFollower, 2, 1, none, -1},
  316. {stateFollower, 2, 2, none, 2},
  317. {stateFollower, 2, 3, none, 2},
  318. {stateFollower, 3, 0, none, -1},
  319. {stateFollower, 3, 1, none, -1},
  320. {stateFollower, 3, 2, none, 2},
  321. {stateFollower, 3, 3, none, 2},
  322. {stateFollower, 3, 2, 1, 2},
  323. {stateFollower, 3, 2, 0, -1},
  324. {stateLeader, 3, 3, 0, -1},
  325. {stateCandidate, 3, 3, 0, -1},
  326. }
  327. for i, tt := range tests {
  328. called := false
  329. sm := &stateMachine{
  330. state: tt.state,
  331. vote: tt.voteFor,
  332. log: &log{ents: []Entry{{}, {Term: 2}, {Term: 2}}},
  333. }
  334. sm.Step(Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term})
  335. for _, m := range sm.Msgs() {
  336. called = true
  337. if m.Index != tt.w {
  338. t.Errorf("#%d, m.Index = %d, want %d", i, m.Index, tt.w)
  339. }
  340. }
  341. if !called {
  342. t.Fatal("#%d: not called", i)
  343. }
  344. }
  345. }
  346. func TestAllServerStepdown(t *testing.T) {
  347. tests := []stateType{stateFollower, stateCandidate, stateLeader}
  348. want := struct {
  349. state stateType
  350. term int
  351. index int
  352. }{stateFollower, 3, 1}
  353. tmsgTypes := [...]messageType{msgVote, msgApp}
  354. tterm := 3
  355. for i, tt := range tests {
  356. sm := newStateMachine(3, 0)
  357. switch tt {
  358. case stateFollower:
  359. sm.becomeFollower(1, 0)
  360. case stateCandidate:
  361. sm.becomeCandidate()
  362. case stateLeader:
  363. sm.becomeCandidate()
  364. sm.becomeLeader()
  365. }
  366. for j, msgType := range tmsgTypes {
  367. sm.Step(Message{Type: msgType, Term: tterm, LogTerm: tterm})
  368. if sm.state != want.state {
  369. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, want.state)
  370. }
  371. if sm.term != want.term {
  372. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, want.term)
  373. }
  374. if len(sm.log.ents) != want.index {
  375. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), want.index)
  376. }
  377. }
  378. }
  379. }
  380. func TestLeaderAppResp(t *testing.T) {
  381. tests := []struct {
  382. index int
  383. wmsgNum int
  384. windex int
  385. wcommitted int
  386. }{
  387. {-1, 1, 1, 0}, // bad resp; leader does not commit; reply with log entries
  388. {2, 2, 2, 2}, // good resp; leader commits; broadcast with commit index
  389. }
  390. for i, tt := range tests {
  391. // sm term is 1 after it becomes the leader.
  392. // thus the last log term must be 1 to be committed.
  393. sm := &stateMachine{addr: 0, k: 3, log: &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}}
  394. sm.becomeCandidate()
  395. sm.becomeLeader()
  396. sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term})
  397. msgs := sm.Msgs()
  398. if len(msgs) != tt.wmsgNum {
  399. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  400. }
  401. for j, msg := range msgs {
  402. if msg.Index != tt.windex {
  403. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  404. }
  405. if msg.Commit != tt.wcommitted {
  406. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  407. }
  408. }
  409. }
  410. }
  411. func ents(terms ...int) *stateMachine {
  412. ents := []Entry{{}}
  413. for _, term := range terms {
  414. ents = append(ents, Entry{Term: term})
  415. }
  416. sm := &stateMachine{log: &log{ents: ents}}
  417. sm.reset()
  418. return sm
  419. }
  420. type network struct {
  421. peers []Interface
  422. dropm map[connem]float64
  423. }
  424. // newNetwork initializes a network from peers. A nil node will be replaced
  425. // with a new *stateMachine. A *stateMachine will get its k, addr.
  426. func newNetwork(peers ...Interface) *network {
  427. for addr, p := range peers {
  428. switch v := p.(type) {
  429. case nil:
  430. sm := newStateMachine(len(peers), addr)
  431. peers[addr] = sm
  432. case *stateMachine:
  433. v.k = len(peers)
  434. v.addr = addr
  435. }
  436. }
  437. return &network{peers: peers, dropm: make(map[connem]float64)}
  438. }
  439. func (nw *network) send(msgs ...Message) {
  440. for len(msgs) > 0 {
  441. m := msgs[0]
  442. p := nw.peers[m.To]
  443. p.Step(m)
  444. msgs = append(msgs[1:], nw.filter(p.Msgs())...)
  445. }
  446. }
  447. func (nw *network) drop(from, to int, perc float64) {
  448. nw.dropm[connem{from, to}] = perc
  449. }
  450. func (nw *network) isolate(addr int) {
  451. for i := 0; i < len(nw.peers); i++ {
  452. if i != addr {
  453. nw.drop(addr, i, 1.0)
  454. nw.drop(i, addr, 1.0)
  455. }
  456. }
  457. }
  458. func (nw *network) recover() {
  459. nw.dropm = make(map[connem]float64)
  460. }
  461. func (nw *network) filter(msgs []Message) []Message {
  462. mm := make([]Message, 0)
  463. for _, m := range msgs {
  464. switch m.Type {
  465. case msgHup:
  466. // hups never go over the network, so don't drop them but panic
  467. panic("unexpected msgHup")
  468. default:
  469. perc := nw.dropm[connem{m.From, m.To}]
  470. if n := rand.Float64(); n < perc {
  471. continue
  472. }
  473. }
  474. mm = append(mm, m)
  475. }
  476. return mm
  477. }
  // connem identifies a directed link between two peer addresses; it is the
  // key type of network.dropm.
  type connem struct {
  	from int // sender address
  	to   int // recipient address
  }
  481. type blackHole struct{}
  482. func (blackHole) Step(Message) {}
  483. func (blackHole) Msgs() []Message { return nil }
  484. var nopStepper = &blackHole{}