raft_test.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. package raft
  2. import (
  3. "fmt"
  4. "reflect"
  5. "testing"
  6. )
// defaultLog is a log holding a single zero-valued Entry. The tests in this
// file use it both as the starting log of hand-built state machines and as the
// expected log when nothing should have been appended.
var defaultLog = []Entry{{}}
  8. func TestLeaderElection(t *testing.T) {
  9. tests := []struct {
  10. *network
  11. state stateType
  12. }{
  13. {newNetwork(nil, nil, nil), stateLeader},
  14. {newNetwork(nil, nil, nopStepper), stateLeader},
  15. {newNetwork(nil, nopStepper, nopStepper), stateCandidate},
  16. {newNetwork(nil, nopStepper, nopStepper, nil), stateCandidate},
  17. {newNetwork(nil, nopStepper, nopStepper, nil, nil), stateLeader},
  18. // three nodes are have logs further along than 0
  19. {
  20. newNetwork(
  21. nil,
  22. &stateMachine{log: []Entry{{}, {Term: 1}}},
  23. &stateMachine{log: []Entry{{}, {Term: 2}}},
  24. &stateMachine{log: []Entry{{}, {Term: 1}, {Term: 3}}},
  25. nil,
  26. ),
  27. stateFollower,
  28. },
  29. // logs converge
  30. {
  31. newNetwork(
  32. &stateMachine{log: []Entry{{}, {Term: 1}}},
  33. nil,
  34. &stateMachine{log: []Entry{{}, {Term: 2}}},
  35. &stateMachine{log: []Entry{{}, {Term: 1}}},
  36. nil,
  37. ),
  38. stateLeader,
  39. },
  40. }
  41. for i, tt := range tests {
  42. tt.step(Message{To: 0, Type: msgHup})
  43. sm := tt.network.ss[0].(*stateMachine)
  44. if sm.state != tt.state {
  45. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  46. }
  47. if g := sm.term; g != 1 {
  48. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  49. }
  50. }
  51. }
  52. func TestDualingCandidates(t *testing.T) {
  53. a := &stateMachine{log: defaultLog}
  54. c := &stateMachine{log: defaultLog}
  55. tt := newNetwork(a, nil, c)
  56. heal := false
  57. next := stepperFunc(func(m Message) {
  58. if heal {
  59. tt.step(m)
  60. }
  61. })
  62. a.next = next
  63. c.next = next
  64. tt.tee = stepperFunc(func(m Message) {
  65. t.Logf("m = %+v", m)
  66. })
  67. tt.step(Message{To: 0, Type: msgHup})
  68. tt.step(Message{To: 2, Type: msgHup})
  69. t.Log("healing")
  70. heal = true
  71. tt.step(Message{To: 2, Type: msgHup})
  72. tests := []struct {
  73. sm *stateMachine
  74. state stateType
  75. term int
  76. }{
  77. {a, stateFollower, 2},
  78. {c, stateLeader, 2},
  79. }
  80. for i, tt := range tests {
  81. if g := tt.sm.state; g != tt.state {
  82. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  83. }
  84. if g := tt.sm.term; g != tt.term {
  85. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  86. }
  87. }
  88. if g := diffLogs(defaultLog, tt.logs()); g != nil {
  89. for _, diff := range g {
  90. t.Errorf("bag log:\n%s", diff)
  91. }
  92. }
  93. }
  94. func TestCandidateConcede(t *testing.T) {
  95. a := &stateMachine{log: defaultLog}
  96. tt := newNetwork(a, nil, nil)
  97. tt.tee = stepperFunc(func(m Message) {
  98. t.Logf("m = %+v", m)
  99. })
  100. a.next = nopStepper
  101. tt.step(Message{To: 0, Type: msgHup})
  102. tt.step(Message{To: 2, Type: msgHup})
  103. // heal the partition
  104. a.next = tt
  105. data := []byte("force follower")
  106. // send a proposal to 2 to flush out a msgApp to 0
  107. tt.step(Message{To: 2, Type: msgProp, Data: data})
  108. if g := a.state; g != stateFollower {
  109. t.Errorf("state = %s, want %s", g, stateFollower)
  110. }
  111. if g := a.term; g != 1 {
  112. t.Errorf("term = %d, want %d", g, 1)
  113. }
  114. wantLog := []Entry{{}, {Term: 1, Data: data}}
  115. if g := diffLogs(wantLog, tt.logs()); g != nil {
  116. for _, diff := range g {
  117. t.Errorf("bag log:\n%s", diff)
  118. }
  119. }
  120. }
  121. func TestOldMessages(t *testing.T) {
  122. tt := newNetwork(nil, nil, nil)
  123. // make 0 leader @ term 3
  124. tt.step(Message{To: 0, Type: msgHup})
  125. tt.step(Message{To: 0, Type: msgHup})
  126. tt.step(Message{To: 0, Type: msgHup})
  127. // pretend we're an old leader trying to make progress
  128. tt.step(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
  129. if g := diffLogs(defaultLog, tt.logs()); g != nil {
  130. for _, diff := range g {
  131. t.Errorf("bag log:\n%s", diff)
  132. }
  133. }
  134. }
  135. // TestOldMessagesReply - optimization - reply with new term.
  136. func TestProposal(t *testing.T) {
  137. tests := []struct {
  138. *network
  139. success bool
  140. }{
  141. {newNetwork(nil, nil, nil), true},
  142. {newNetwork(nil, nil, nopStepper), true},
  143. {newNetwork(nil, nopStepper, nopStepper), false},
  144. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  145. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  146. }
  147. for i, tt := range tests {
  148. tt.tee = stepperFunc(func(m Message) {
  149. t.Logf("#%d: m = %+v", i, m)
  150. })
  151. step := stepperFunc(func(m Message) {
  152. defer func() {
  153. // only recover is we expect it to panic so
  154. // panics we don't expect go up.
  155. if !tt.success {
  156. e := recover()
  157. if e != nil {
  158. t.Logf("#%d: err: %s", i, e)
  159. }
  160. }
  161. }()
  162. tt.step(m)
  163. })
  164. data := []byte("somedata")
  165. // promote 0 the leader
  166. step(Message{To: 0, Type: msgHup})
  167. step(Message{To: 0, Type: msgProp, Data: data})
  168. var wantLog []Entry
  169. if tt.success {
  170. wantLog = []Entry{{}, {Term: 1, Data: data}}
  171. } else {
  172. wantLog = defaultLog
  173. }
  174. if g := diffLogs(wantLog, tt.logs()); g != nil {
  175. for _, diff := range g {
  176. t.Errorf("#%d: diff:%s", i, diff)
  177. }
  178. }
  179. sm := tt.network.ss[0].(*stateMachine)
  180. if g := sm.term; g != 1 {
  181. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  182. }
  183. }
  184. }
  185. func TestProposalByProxy(t *testing.T) {
  186. data := []byte("somedata")
  187. tests := []*network{
  188. newNetwork(nil, nil, nil),
  189. newNetwork(nil, nil, nopStepper),
  190. }
  191. for i, tt := range tests {
  192. tt.tee = stepperFunc(func(m Message) {
  193. t.Logf("#%d: m = %+v", i, m)
  194. })
  195. // promote 0 the leader
  196. tt.step(Message{To: 0, Type: msgHup})
  197. // propose via follower
  198. tt.step(Message{To: 1, Type: msgProp, Data: []byte("somedata")})
  199. wantLog := []Entry{{}, {Term: 1, Data: data}}
  200. if g := diffLogs(wantLog, tt.logs()); g != nil {
  201. for _, diff := range g {
  202. t.Errorf("#%d: bad entry: %s", i, diff)
  203. }
  204. }
  205. sm := tt.ss[0].(*stateMachine)
  206. if g := sm.term; g != 1 {
  207. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  208. }
  209. }
  210. }
  211. func TestVote(t *testing.T) {
  212. tests := []struct {
  213. i, term int
  214. w int
  215. }{
  216. {0, 0, -1},
  217. {0, 1, -1},
  218. {0, 2, -1},
  219. {0, 3, 2},
  220. {1, 0, -1},
  221. {1, 1, -1},
  222. {1, 2, -1},
  223. {1, 3, 2},
  224. {2, 0, -1},
  225. {2, 1, -1},
  226. {2, 2, 2},
  227. {2, 3, 2},
  228. {3, 0, -1},
  229. {3, 1, -1},
  230. {3, 2, 2},
  231. {3, 3, 2},
  232. }
  233. for i, tt := range tests {
  234. called := false
  235. sm := &stateMachine{log: []Entry{{}, {Term: 2}, {Term: 2}}}
  236. sm.next = stepperFunc(func(m Message) {
  237. called = true
  238. if m.Index != tt.w {
  239. t.Errorf("#%d, m.Index = %d, want %d", i, m.Index, tt.w)
  240. }
  241. })
  242. sm.step(Message{Type: msgVote, Index: tt.i, LogTerm: tt.term})
  243. if !called {
  244. t.Fatal("#%d: not called", i)
  245. }
  246. }
  247. }
  248. func TestLogDiff(t *testing.T) {
  249. a := []Entry{{}, {Term: 1}, {Term: 2}}
  250. b := []Entry{{}, {Term: 1}, {Term: 2}}
  251. c := []Entry{{}, {Term: 2}}
  252. d := []Entry(nil)
  253. w := []diff{
  254. diff{1, []*Entry{{Term: 1}, {Term: 1}, {Term: 2}, nilLogEntry}},
  255. diff{2, []*Entry{{Term: 2}, {Term: 2}, noEntry, nilLogEntry}},
  256. }
  257. if g := diffLogs(a, [][]Entry{b, c, d}); !reflect.DeepEqual(w, g) {
  258. t.Errorf("g = %s", g)
  259. t.Errorf("want %s", w)
  260. }
  261. }
// network is an in-memory test harness that routes messages between the
// steppers taking part in a test.
type network struct {
	// tee, when non-nil, is handed every message passed to step before the
	// message is delivered.
	tee stepper
	// ss holds the participants; step delivers a message to ss[m.To].
	ss []stepper
}
  266. // newNetwork initializes a network from nodes. A nil node will be replaced
  267. // with a new *stateMachine. A *stateMachine will get its k, addr, and next
  268. // fields set.
  269. func newNetwork(nodes ...stepper) *network {
  270. nt := &network{ss: nodes}
  271. for i, n := range nodes {
  272. switch v := n.(type) {
  273. case nil:
  274. nt.ss[i] = newStateMachine(len(nodes), i, nt)
  275. case *stateMachine:
  276. v.k = len(nodes)
  277. v.addr = i
  278. if v.next == nil {
  279. v.next = nt
  280. }
  281. default:
  282. nt.ss[i] = v
  283. }
  284. }
  285. return nt
  286. }
  287. func (nt network) step(m Message) {
  288. if nt.tee != nil {
  289. nt.tee.step(m)
  290. }
  291. nt.ss[m.To].step(m)
  292. }
  293. // logs returns all logs in nt prepended with want. If a node is not a
  294. // *stateMachine, its log will be nil.
  295. func (nt network) logs() [][]Entry {
  296. ls := make([][]Entry, len(nt.ss))
  297. for i, node := range nt.ss {
  298. if sm, ok := node.(*stateMachine); ok {
  299. ls[i] = sm.log
  300. }
  301. }
  302. return ls
  303. }
// diff describes one log index at which the logs compared by diffLogs were
// found to differ.
type diff struct {
	// i is the log index the entries were taken from.
	i int
	// ents holds one element per compared log, the base log first.
	ents []*Entry // pointers so they can be nil for N/A
}

// noEntry is the marker diffLogs stores for a log that is too short to have
// an entry at the index. It is recognized by pointer identity.
var noEntry = &Entry{}

// nilLogEntry is the marker diffLogs stores for a log that is nil. It is
// recognized by pointer identity.
var nilLogEntry = &Entry{}
  310. func (d diff) String() string {
  311. s := fmt.Sprintf("[%d] ", d.i)
  312. for i, e := range d.ents {
  313. switch e {
  314. case nilLogEntry:
  315. s += fmt.Sprintf("o")
  316. case noEntry:
  317. s += fmt.Sprintf("-")
  318. case nil:
  319. s += fmt.Sprintf("<nil>")
  320. default:
  321. s += fmt.Sprintf("<%d:%q>", e.Term, string(e.Data))
  322. }
  323. if i != len(d.ents)-1 {
  324. s += "\t\t"
  325. }
  326. }
  327. return s
  328. }
  329. func diffLogs(base []Entry, logs [][]Entry) []diff {
  330. var (
  331. d []diff
  332. max int
  333. )
  334. logs = append([][]Entry{base}, logs...)
  335. for _, log := range logs {
  336. if l := len(log); l > max {
  337. max = l
  338. }
  339. }
  340. ediff := func(i int) (result []*Entry) {
  341. e := make([]*Entry, len(logs))
  342. found := false
  343. for j, log := range logs {
  344. if log == nil {
  345. e[j] = nilLogEntry
  346. continue
  347. }
  348. if len(log) <= i {
  349. e[j] = noEntry
  350. found = true
  351. continue
  352. }
  353. e[j] = &log[i]
  354. if j > 0 {
  355. switch prev := e[j-1]; {
  356. case prev == nilLogEntry:
  357. case prev == noEntry:
  358. case !reflect.DeepEqual(prev, e[j]):
  359. found = true
  360. }
  361. }
  362. }
  363. if found {
  364. return e
  365. }
  366. return nil
  367. }
  368. for i := 0; i < max; i++ {
  369. if e := ediff(i); e != nil {
  370. d = append(d, diff{i, e})
  371. }
  372. }
  373. return d
  374. }
// stepperFunc adapts a plain function so that it can be used wherever a value
// with a step method is needed.
type stepperFunc func(Message)

// step calls f with m.
func (f stepperFunc) step(m Message) { f(m) }

// nopStepper is a stepper that discards every message it is given.
var nopStepper = stepperFunc(func(Message) {})

// nextStepperFunc is the type of a function receiving a message together with
// a stepper. It is not used by any code visible in this part of the file.
type nextStepperFunc func(Message, stepper)