raft_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. package raft
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "testing"
  7. )
  8. func TestLeaderElection(t *testing.T) {
  9. tests := []struct {
  10. *network
  11. state stateType
  12. }{
  13. {newNetwork(nil, nil, nil), stateLeader},
  14. {newNetwork(nil, nil, nopStepper), stateLeader},
  15. {newNetwork(nil, nopStepper, nopStepper), stateCandidate},
  16. {newNetwork(nil, nopStepper, nopStepper, nil), stateCandidate},
  17. {newNetwork(nil, nopStepper, nopStepper, nil, nil), stateLeader},
  18. // three nodes are have logs further along than 0
  19. {
  20. newNetwork(
  21. nil,
  22. &nsm{stateMachine{log: &log{ents: []Entry{{}, {Term: 1}}}}, nil},
  23. &nsm{stateMachine{log: &log{ents: []Entry{{}, {Term: 2}}}}, nil},
  24. &nsm{stateMachine{log: &log{ents: []Entry{{}, {Term: 1}, {Term: 3}}}}, nil},
  25. nil,
  26. ),
  27. stateFollower,
  28. },
  29. // logs converge
  30. {
  31. newNetwork(
  32. &nsm{stateMachine{log: &log{ents: []Entry{{}, {Term: 1}}}}, nil},
  33. nil,
  34. &nsm{stateMachine{log: &log{ents: []Entry{{}, {Term: 2}}}}, nil},
  35. &nsm{stateMachine{log: &log{ents: []Entry{{}, {Term: 1}}}}, nil},
  36. nil,
  37. ),
  38. stateLeader,
  39. },
  40. }
  41. for i, tt := range tests {
  42. tt.Step(Message{To: 0, Type: msgHup})
  43. sm := tt.network.ss[0].(*nsm)
  44. if sm.state != tt.state {
  45. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  46. }
  47. if g := sm.term; g != 1 {
  48. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  49. }
  50. }
  51. }
  52. func TestLogReplication(t *testing.T) {
  53. tests := []struct {
  54. *network
  55. msgs []Message
  56. wcommit int
  57. }{
  58. {
  59. newNetwork(nil, nil, nil),
  60. []Message{
  61. Message{To: 0, Type: msgProp, Data: []byte("somedata")},
  62. },
  63. 1,
  64. },
  65. {
  66. newNetwork(nil, nil, nil),
  67. []Message{
  68. Message{To: 0, Type: msgProp, Data: []byte("somedata")},
  69. Message{To: 1, Type: msgHup},
  70. Message{To: 1, Type: msgProp, Data: []byte("somedata")},
  71. },
  72. 2,
  73. },
  74. }
  75. for i, tt := range tests {
  76. tt.tee = stepperFunc(func(m Message) {
  77. t.Logf("#%d: m = %+v", i, m)
  78. })
  79. tt.Step(Message{To: 0, Type: msgHup})
  80. for _, m := range tt.msgs {
  81. tt.Step(m)
  82. }
  83. for j, ism := range tt.ss {
  84. sm := ism.(*nsm)
  85. if sm.log.commit != tt.wcommit {
  86. t.Errorf("#%d.%d: commit = %d, want %d", i, j, sm.log.commit, tt.wcommit)
  87. }
  88. ents := sm.nextEnts()
  89. props := make([]Message, 0)
  90. for _, m := range tt.msgs {
  91. if m.Type == msgProp {
  92. props = append(props, m)
  93. }
  94. }
  95. for k, m := range props {
  96. if !bytes.Equal(ents[k].Data, m.Data) {
  97. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Data)
  98. }
  99. }
  100. }
  101. }
  102. }
  103. func TestDualingCandidates(t *testing.T) {
  104. a := &nsm{stateMachine{log: defaultLog()}, nil}
  105. c := &nsm{stateMachine{log: defaultLog()}, nil}
  106. tt := newNetwork(a, nil, c)
  107. heal := false
  108. next := stepperFunc(func(m Message) {
  109. if heal {
  110. tt.Step(m)
  111. }
  112. })
  113. a.next = next
  114. c.next = next
  115. tt.tee = stepperFunc(func(m Message) {
  116. t.Logf("m = %+v", m)
  117. })
  118. tt.Step(Message{To: 0, Type: msgHup})
  119. tt.Step(Message{To: 2, Type: msgHup})
  120. t.Log("healing")
  121. heal = true
  122. tt.Step(Message{To: 2, Type: msgHup})
  123. tests := []struct {
  124. sm *nsm
  125. state stateType
  126. term int
  127. }{
  128. {a, stateFollower, 2},
  129. {c, stateLeader, 2},
  130. }
  131. for i, tt := range tests {
  132. if g := tt.sm.state; g != tt.state {
  133. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  134. }
  135. if g := tt.sm.term; g != tt.term {
  136. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  137. }
  138. }
  139. if g := diffLogs(defaultLog().ents, tt.logs()); g != nil {
  140. for _, diff := range g {
  141. t.Errorf("bag log:\n%s", diff)
  142. }
  143. }
  144. }
  145. func TestCandidateConcede(t *testing.T) {
  146. a := &nsm{stateMachine{log: defaultLog()}, nil}
  147. tt := newNetwork(a, nil, nil)
  148. tt.tee = stepperFunc(func(m Message) {
  149. t.Logf("m = %+v", m)
  150. })
  151. a.next = nopStepper
  152. tt.Step(Message{To: 0, Type: msgHup})
  153. tt.Step(Message{To: 2, Type: msgHup})
  154. // heal the partition
  155. a.next = tt
  156. data := []byte("force follower")
  157. // send a proposal to 2 to flush out a msgApp to 0
  158. tt.Step(Message{To: 2, Type: msgProp, Data: data})
  159. if g := a.state; g != stateFollower {
  160. t.Errorf("state = %s, want %s", g, stateFollower)
  161. }
  162. if g := a.term; g != 1 {
  163. t.Errorf("term = %d, want %d", g, 1)
  164. }
  165. wantLog := []Entry{{}, {Term: 1, Data: data}}
  166. if g := diffLogs(wantLog, tt.logs()); g != nil {
  167. for _, diff := range g {
  168. t.Errorf("bag log:\n%s", diff)
  169. }
  170. }
  171. }
  172. func TestOldMessages(t *testing.T) {
  173. tt := newNetwork(nil, nil, nil)
  174. // make 0 leader @ term 3
  175. tt.Step(Message{To: 0, Type: msgHup})
  176. tt.Step(Message{To: 1, Type: msgHup})
  177. tt.Step(Message{To: 0, Type: msgHup})
  178. // pretend we're an old leader trying to make progress
  179. tt.Step(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
  180. if g := diffLogs(defaultLog().ents, tt.logs()); g != nil {
  181. for _, diff := range g {
  182. t.Errorf("bag log:\n%s", diff)
  183. }
  184. }
  185. }
// TODO: add TestOldMessagesReply - optimization - reply with new term.

  187. func TestProposal(t *testing.T) {
  188. tests := []struct {
  189. *network
  190. success bool
  191. }{
  192. {newNetwork(nil, nil, nil), true},
  193. {newNetwork(nil, nil, nopStepper), true},
  194. {newNetwork(nil, nopStepper, nopStepper), false},
  195. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  196. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  197. }
  198. for i, tt := range tests {
  199. tt.tee = stepperFunc(func(m Message) {
  200. t.Logf("#%d: m = %+v", i, m)
  201. })
  202. step := stepperFunc(func(m Message) {
  203. defer func() {
  204. // only recover is we expect it to panic so
  205. // panics we don't expect go up.
  206. if !tt.success {
  207. e := recover()
  208. if e != nil {
  209. t.Logf("#%d: err: %s", i, e)
  210. }
  211. }
  212. }()
  213. tt.Step(m)
  214. })
  215. data := []byte("somedata")
  216. // promote 0 the leader
  217. step(Message{To: 0, Type: msgHup})
  218. step(Message{To: 0, Type: msgProp, Data: data})
  219. var wantLog []Entry
  220. if tt.success {
  221. wantLog = []Entry{{}, {Term: 1, Data: data}}
  222. } else {
  223. wantLog = defaultLog().ents
  224. }
  225. if g := diffLogs(wantLog, tt.logs()); g != nil {
  226. for _, diff := range g {
  227. t.Errorf("#%d: diff:%s", i, diff)
  228. }
  229. }
  230. sm := tt.network.ss[0].(*nsm)
  231. if g := sm.term; g != 1 {
  232. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  233. }
  234. }
  235. }
  236. func TestProposalByProxy(t *testing.T) {
  237. data := []byte("somedata")
  238. tests := []*network{
  239. newNetwork(nil, nil, nil),
  240. newNetwork(nil, nil, nopStepper),
  241. }
  242. for i, tt := range tests {
  243. tt.tee = stepperFunc(func(m Message) {
  244. t.Logf("#%d: m = %+v", i, m)
  245. })
  246. // promote 0 the leader
  247. tt.Step(Message{To: 0, Type: msgHup})
  248. // propose via follower
  249. tt.Step(Message{To: 1, Type: msgProp, Data: []byte("somedata")})
  250. wantLog := []Entry{{}, {Term: 1, Data: data}}
  251. if g := diffLogs(wantLog, tt.logs()); g != nil {
  252. for _, diff := range g {
  253. t.Errorf("#%d: bad entry: %s", i, diff)
  254. }
  255. }
  256. sm := tt.ss[0].(*nsm)
  257. if g := sm.term; g != 1 {
  258. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  259. }
  260. }
  261. }
  262. func TestCommit(t *testing.T) {
  263. tests := []struct {
  264. matches []int
  265. logs []Entry
  266. smTerm int
  267. w int
  268. }{
  269. // odd
  270. {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  271. {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  272. {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  273. {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  274. // even
  275. {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  276. {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  277. {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  278. {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  279. {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  280. {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  281. }
  282. for i, tt := range tests {
  283. ins := make([]*index, len(tt.matches))
  284. for j := 0; j < len(ins); j++ {
  285. ins[j] = &index{tt.matches[j], tt.matches[j] + 1}
  286. }
  287. sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, k: len(ins), term: tt.smTerm}
  288. sm.maybeCommit()
  289. if g := sm.log.commit; g != tt.w {
  290. t.Errorf("#%d: commit = %d, want %d", i, g, tt.w)
  291. }
  292. }
  293. }
  294. func TestVote(t *testing.T) {
  295. tests := []struct {
  296. i, term int
  297. w int
  298. }{
  299. {0, 0, -1},
  300. {0, 1, -1},
  301. {0, 2, -1},
  302. {0, 3, 2},
  303. {1, 0, -1},
  304. {1, 1, -1},
  305. {1, 2, -1},
  306. {1, 3, 2},
  307. {2, 0, -1},
  308. {2, 1, -1},
  309. {2, 2, 2},
  310. {2, 3, 2},
  311. {3, 0, -1},
  312. {3, 1, -1},
  313. {3, 2, 2},
  314. {3, 3, 2},
  315. }
  316. for i, tt := range tests {
  317. called := false
  318. sm := &nsm{stateMachine{log: &log{ents: []Entry{{}, {Term: 2}, {Term: 2}}}}, nil}
  319. sm.next = stepperFunc(func(m Message) {
  320. called = true
  321. if m.Index != tt.w {
  322. t.Errorf("#%d, m.Index = %d, want %d", i, m.Index, tt.w)
  323. }
  324. })
  325. sm.Step(Message{Type: msgVote, Index: tt.i, LogTerm: tt.term})
  326. if !called {
  327. t.Fatal("#%d: not called", i)
  328. }
  329. }
  330. }
  331. func TestAllServerStepdown(t *testing.T) {
  332. tests := []stateType{stateFollower, stateCandidate, stateLeader}
  333. want := struct {
  334. state stateType
  335. term int
  336. index int
  337. }{stateFollower, 3, 1}
  338. tmsgTypes := [...]messageType{msgVote, msgApp}
  339. tterm := 3
  340. for i, tt := range tests {
  341. sm := newStateMachine(3, 0)
  342. switch tt {
  343. case stateFollower:
  344. sm.becomeFollower(1, 0)
  345. case stateCandidate:
  346. sm.becomeCandidate()
  347. case stateLeader:
  348. sm.becomeCandidate()
  349. sm.becomeLeader()
  350. }
  351. for j, msgType := range tmsgTypes {
  352. sm.Step(Message{Type: msgType, Term: tterm, LogTerm: tterm})
  353. if sm.state != want.state {
  354. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, want.state)
  355. }
  356. if sm.term != want.term {
  357. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, want.term)
  358. }
  359. if len(sm.log.ents) != want.index {
  360. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), want.index)
  361. }
  362. }
  363. }
  364. }
  365. func TestLogDiff(t *testing.T) {
  366. a := []Entry{{}, {Term: 1}, {Term: 2}}
  367. b := []Entry{{}, {Term: 1}, {Term: 2}}
  368. c := []Entry{{}, {Term: 2}}
  369. d := []Entry(nil)
  370. w := []diff{
  371. diff{1, []*Entry{{Term: 1}, {Term: 1}, {Term: 2}, nilLogEntry}},
  372. diff{2, []*Entry{{Term: 2}, {Term: 2}, noEntry, nilLogEntry}},
  373. }
  374. if g := diffLogs(a, [][]Entry{b, c, d}); !reflect.DeepEqual(w, g) {
  375. t.Errorf("g = %s", g)
  376. t.Errorf("want %s", w)
  377. }
  378. }
  379. type network struct {
  380. tee Interface
  381. ss []Interface
  382. }
  383. // newNetwork initializes a network from nodes. A nil node will be replaced
  384. // with a new *stateMachine. A *stateMachine will get its k, addr, and next
  385. // fields set.
  386. func newNetwork(nodes ...Interface) *network {
  387. nt := &network{ss: nodes}
  388. for i, n := range nodes {
  389. switch v := n.(type) {
  390. case nil:
  391. nt.ss[i] = &nsm{*newStateMachine(len(nodes), i), nt}
  392. case *nsm:
  393. v.k = len(nodes)
  394. v.addr = i
  395. if v.next == nil {
  396. v.next = nt
  397. }
  398. default:
  399. nt.ss[i] = v
  400. }
  401. }
  402. return nt
  403. }
  404. func (nt network) Step(m Message) {
  405. if nt.tee != nil {
  406. nt.tee.Step(m)
  407. }
  408. nt.ss[m.To].Step(m)
  409. }
  410. // logs returns all logs in nt prepended with want. If a node is not a
  411. // *stateMachine, its log will be nil.
  412. func (nt network) logs() [][]Entry {
  413. ls := make([][]Entry, len(nt.ss))
  414. for i, node := range nt.ss {
  415. if sm, ok := node.(*nsm); ok {
  416. ls[i] = sm.log.ents
  417. }
  418. }
  419. return ls
  420. }
  421. type diff struct {
  422. i int
  423. ents []*Entry // pointers so they can be nil for N/A
  424. }
  425. var noEntry = &Entry{}
  426. var nilLogEntry = &Entry{}
  427. func (d diff) String() string {
  428. s := fmt.Sprintf("[%d] ", d.i)
  429. for i, e := range d.ents {
  430. switch e {
  431. case nilLogEntry:
  432. s += fmt.Sprintf("o")
  433. case noEntry:
  434. s += fmt.Sprintf("-")
  435. case nil:
  436. s += fmt.Sprintf("<nil>")
  437. default:
  438. s += fmt.Sprintf("<%d:%q>", e.Term, string(e.Data))
  439. }
  440. if i != len(d.ents)-1 {
  441. s += "\t\t"
  442. }
  443. }
  444. return s
  445. }
  446. func diffLogs(base []Entry, logs [][]Entry) []diff {
  447. var (
  448. d []diff
  449. max int
  450. )
  451. logs = append([][]Entry{base}, logs...)
  452. for _, log := range logs {
  453. if l := len(log); l > max {
  454. max = l
  455. }
  456. }
  457. ediff := func(i int) (result []*Entry) {
  458. e := make([]*Entry, len(logs))
  459. found := false
  460. for j, log := range logs {
  461. if log == nil {
  462. e[j] = nilLogEntry
  463. continue
  464. }
  465. if len(log) <= i {
  466. e[j] = noEntry
  467. found = true
  468. continue
  469. }
  470. e[j] = &log[i]
  471. if j > 0 {
  472. switch prev := e[j-1]; {
  473. case prev == nilLogEntry:
  474. case prev == noEntry:
  475. case !reflect.DeepEqual(prev, e[j]):
  476. found = true
  477. }
  478. }
  479. }
  480. if found {
  481. return e
  482. }
  483. return nil
  484. }
  485. for i := 0; i < max; i++ {
  486. if e := ediff(i); e != nil {
  487. d = append(d, diff{i, e})
  488. }
  489. }
  490. return d
  491. }
  492. type stepperFunc func(Message)
  493. func (f stepperFunc) Step(m Message) { f(m) }
  494. var nopStepper = stepperFunc(func(Message) {})
  495. type nsm struct {
  496. stateMachine
  497. next Interface
  498. }
  499. func (n *nsm) Step(m Message) {
  500. (&n.stateMachine).Step(m)
  501. ms := n.Msgs()
  502. for _, m := range ms {
  503. n.next.Step(m)
  504. }
  505. }
  506. func defaultLog() *log {
  507. return &log{ents: []Entry{{}}}
  508. }