raft_test.go 37 KB


  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "bytes"
  16. "fmt"
  17. "math"
  18. "math/rand"
  19. "reflect"
  20. "sort"
  21. "testing"
  22. pb "github.com/coreos/etcd/raft/raftpb"
  23. )
  24. // nextEnts returns the appliable entries and updates the applied index
  25. func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
  26. // Transfer all unstable entries to "stable" storage.
  27. s.Append(r.raftLog.unstableEntries())
  28. r.raftLog.stableTo(r.raftLog.lastIndex())
  29. ents = r.raftLog.nextEnts()
  30. r.raftLog.appliedTo(r.raftLog.committed)
  31. return ents
  32. }
  33. type Interface interface {
  34. Step(m pb.Message) error
  35. readMessages() []pb.Message
  36. }
  37. func (r *raft) readMessages() []pb.Message {
  38. msgs := r.msgs
  39. r.msgs = make([]pb.Message, 0)
  40. return msgs
  41. }
  42. func TestProgressMaybeDecr(t *testing.T) {
  43. tests := []struct {
  44. m uint64
  45. n uint64
  46. to uint64
  47. w bool
  48. wn uint64
  49. }{
  50. {
  51. // match != 0 is always false
  52. 1, 0, 0, false, 0,
  53. },
  54. {
  55. // match != 0 is always false
  56. 5, 10, 9, false, 10,
  57. },
  58. {
  59. // next-1 != to is always false
  60. 0, 0, 0, false, 0,
  61. },
  62. {
  63. // next-1 != to is always false
  64. 0, 10, 5, false, 10,
  65. },
  66. {
  67. // next>1 = decremented by 1
  68. 0, 10, 9, true, 9,
  69. },
  70. {
  71. // next>1 = decremented by 1
  72. 0, 2, 1, true, 1,
  73. },
  74. {
  75. // next<=1 = reset to 1
  76. 0, 1, 0, true, 1,
  77. },
  78. }
  79. for i, tt := range tests {
  80. p := &progress{
  81. match: tt.m,
  82. next: tt.n,
  83. }
  84. if g := p.maybeDecrTo(tt.to); g != tt.w {
  85. t.Errorf("#%d: maybeDecrTo=%t, want %t", i, g, tt.w)
  86. }
  87. if gm := p.match; gm != tt.m {
  88. t.Errorf("#%d: match=%d, want %d", i, gm, tt.m)
  89. }
  90. if gn := p.next; gn != tt.wn {
  91. t.Errorf("#%d: next=%d, want %d", i, gn, tt.wn)
  92. }
  93. }
  94. }
  95. func TestLeaderElection(t *testing.T) {
  96. tests := []struct {
  97. *network
  98. state StateType
  99. }{
  100. {newNetwork(nil, nil, nil), StateLeader},
  101. {newNetwork(nil, nil, nopStepper), StateLeader},
  102. {newNetwork(nil, nopStepper, nopStepper), StateCandidate},
  103. {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate},
  104. {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader},
  105. // three logs further along than 0
  106. {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower},
  107. // logs converge
  108. {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader},
  109. }
  110. for i, tt := range tests {
  111. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  112. sm := tt.network.peers[1].(*raft)
  113. if sm.state != tt.state {
  114. t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
  115. }
  116. if g := sm.Term; g != 1 {
  117. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  118. }
  119. }
  120. }
  121. func TestLogReplication(t *testing.T) {
  122. tests := []struct {
  123. *network
  124. msgs []pb.Message
  125. wcommitted uint64
  126. }{
  127. {
  128. newNetwork(nil, nil, nil),
  129. []pb.Message{
  130. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  131. },
  132. 2,
  133. },
  134. {
  135. newNetwork(nil, nil, nil),
  136. []pb.Message{
  137. {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  138. {From: 1, To: 2, Type: pb.MsgHup},
  139. {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
  140. },
  141. 4,
  142. },
  143. }
  144. for i, tt := range tests {
  145. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  146. for _, m := range tt.msgs {
  147. tt.send(m)
  148. }
  149. for j, x := range tt.network.peers {
  150. sm := x.(*raft)
  151. if sm.raftLog.committed != tt.wcommitted {
  152. t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
  153. }
  154. ents := []pb.Entry{}
  155. for _, e := range nextEnts(sm, tt.network.storage[j]) {
  156. if e.Data != nil {
  157. ents = append(ents, e)
  158. }
  159. }
  160. props := []pb.Message{}
  161. for _, m := range tt.msgs {
  162. if m.Type == pb.MsgProp {
  163. props = append(props, m)
  164. }
  165. }
  166. for k, m := range props {
  167. if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
  168. t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
  169. }
  170. }
  171. }
  172. }
  173. }
  174. func TestSingleNodeCommit(t *testing.T) {
  175. tt := newNetwork(nil)
  176. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  177. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  178. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  179. sm := tt.peers[1].(*raft)
  180. if sm.raftLog.committed != 3 {
  181. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
  182. }
  183. }
  184. // TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed
  185. // when leader changes, no new proposal comes in and ChangeTerm proposal is
  186. // filtered.
  187. func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
  188. tt := newNetwork(nil, nil, nil, nil, nil)
  189. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  190. // 0 cannot reach 2,3,4
  191. tt.cut(1, 3)
  192. tt.cut(1, 4)
  193. tt.cut(1, 5)
  194. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  195. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  196. sm := tt.peers[1].(*raft)
  197. if sm.raftLog.committed != 1 {
  198. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  199. }
  200. // network recovery
  201. tt.recover()
  202. // avoid committing ChangeTerm proposal
  203. tt.ignore(pb.MsgApp)
  204. // elect 1 as the new leader with term 2
  205. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  206. // no log entries from previous term should be committed
  207. sm = tt.peers[2].(*raft)
  208. if sm.raftLog.committed != 1 {
  209. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  210. }
  211. tt.recover()
  212. // still be able to append a entry
  213. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  214. if sm.raftLog.committed != 5 {
  215. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
  216. }
  217. }
  218. // TestCommitWithoutNewTermEntry tests the entries could be committed
  219. // when leader changes, no new proposal comes in.
  220. func TestCommitWithoutNewTermEntry(t *testing.T) {
  221. tt := newNetwork(nil, nil, nil, nil, nil)
  222. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  223. // 0 cannot reach 2,3,4
  224. tt.cut(1, 3)
  225. tt.cut(1, 4)
  226. tt.cut(1, 5)
  227. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  228. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  229. sm := tt.peers[1].(*raft)
  230. if sm.raftLog.committed != 1 {
  231. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
  232. }
  233. // network recovery
  234. tt.recover()
  235. // elect 1 as the new leader with term 2
  236. // after append a ChangeTerm entry from the current term, all entries
  237. // should be committed
  238. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  239. if sm.raftLog.committed != 4 {
  240. t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
  241. }
  242. }
  243. func TestDuelingCandidates(t *testing.T) {
  244. a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  245. b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  246. c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  247. nt := newNetwork(a, b, c)
  248. nt.cut(1, 3)
  249. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  250. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  251. nt.recover()
  252. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  253. wlog := &raftLog{
  254. storage: &MemoryStorage{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}},
  255. committed: 1,
  256. unstable: 2,
  257. }
  258. tests := []struct {
  259. sm *raft
  260. state StateType
  261. term uint64
  262. raftLog *raftLog
  263. }{
  264. {a, StateFollower, 2, wlog},
  265. {b, StateFollower, 2, wlog},
  266. {c, StateFollower, 2, newLog(NewMemoryStorage())},
  267. }
  268. for i, tt := range tests {
  269. if g := tt.sm.state; g != tt.state {
  270. t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
  271. }
  272. if g := tt.sm.Term; g != tt.term {
  273. t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
  274. }
  275. base := ltoa(tt.raftLog)
  276. if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
  277. l := ltoa(sm.raftLog)
  278. if g := diffu(base, l); g != "" {
  279. t.Errorf("#%d: diff:\n%s", i, g)
  280. }
  281. } else {
  282. t.Logf("#%d: empty log", i)
  283. }
  284. }
  285. }
  286. func TestCandidateConcede(t *testing.T) {
  287. tt := newNetwork(nil, nil, nil)
  288. tt.isolate(1)
  289. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  290. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  291. // heal the partition
  292. tt.recover()
  293. data := []byte("force follower")
  294. // send a proposal to 2 to flush out a MsgApp to 0
  295. tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  296. a := tt.peers[1].(*raft)
  297. if g := a.state; g != StateFollower {
  298. t.Errorf("state = %s, want %s", g, StateFollower)
  299. }
  300. if g := a.Term; g != 1 {
  301. t.Errorf("term = %d, want %d", g, 1)
  302. }
  303. wantLog := ltoa(&raftLog{
  304. storage: &MemoryStorage{
  305. ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
  306. },
  307. unstable: 3,
  308. committed: 2,
  309. })
  310. for i, p := range tt.peers {
  311. if sm, ok := p.(*raft); ok {
  312. l := ltoa(sm.raftLog)
  313. if g := diffu(wantLog, l); g != "" {
  314. t.Errorf("#%d: diff:\n%s", i, g)
  315. }
  316. } else {
  317. t.Logf("#%d: empty log", i)
  318. }
  319. }
  320. }
  321. func TestSingleNodeCandidate(t *testing.T) {
  322. tt := newNetwork(nil)
  323. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  324. sm := tt.peers[1].(*raft)
  325. if sm.state != StateLeader {
  326. t.Errorf("state = %d, want %d", sm.state, StateLeader)
  327. }
  328. }
  329. func TestOldMessages(t *testing.T) {
  330. tt := newNetwork(nil, nil, nil)
  331. // make 0 leader @ term 3
  332. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  333. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  334. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  335. // pretend we're an old leader trying to make progress
  336. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
  337. l := &raftLog{
  338. storage: &MemoryStorage{
  339. ents: []pb.Entry{
  340. {}, {Data: nil, Term: 1, Index: 1},
  341. {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
  342. },
  343. },
  344. unstable: 4,
  345. committed: 3,
  346. }
  347. base := ltoa(l)
  348. for i, p := range tt.peers {
  349. if sm, ok := p.(*raft); ok {
  350. l := ltoa(sm.raftLog)
  351. if g := diffu(base, l); g != "" {
  352. t.Errorf("#%d: diff:\n%s", i, g)
  353. }
  354. } else {
  355. t.Logf("#%d: empty log", i)
  356. }
  357. }
  358. }
  359. // TestOldMessagesReply - optimization - reply with new term.
  360. func TestProposal(t *testing.T) {
  361. tests := []struct {
  362. *network
  363. success bool
  364. }{
  365. {newNetwork(nil, nil, nil), true},
  366. {newNetwork(nil, nil, nopStepper), true},
  367. {newNetwork(nil, nopStepper, nopStepper), false},
  368. {newNetwork(nil, nopStepper, nopStepper, nil), false},
  369. {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  370. }
  371. for i, tt := range tests {
  372. send := func(m pb.Message) {
  373. defer func() {
  374. // only recover is we expect it to panic so
  375. // panics we don't expect go up.
  376. if !tt.success {
  377. e := recover()
  378. if e != nil {
  379. t.Logf("#%d: err: %s", i, e)
  380. }
  381. }
  382. }()
  383. tt.send(m)
  384. }
  385. data := []byte("somedata")
  386. // promote 0 the leader
  387. send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  388. send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  389. wantLog := newLog(NewMemoryStorage())
  390. if tt.success {
  391. wantLog = &raftLog{
  392. storage: &MemoryStorage{
  393. ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
  394. },
  395. unstable: 3,
  396. committed: 2}
  397. }
  398. base := ltoa(wantLog)
  399. for i, p := range tt.peers {
  400. if sm, ok := p.(*raft); ok {
  401. l := ltoa(sm.raftLog)
  402. if g := diffu(base, l); g != "" {
  403. t.Errorf("#%d: diff:\n%s", i, g)
  404. }
  405. } else {
  406. t.Logf("#%d: empty log", i)
  407. }
  408. }
  409. sm := tt.network.peers[1].(*raft)
  410. if g := sm.Term; g != 1 {
  411. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  412. }
  413. }
  414. }
  415. func TestProposalByProxy(t *testing.T) {
  416. data := []byte("somedata")
  417. tests := []*network{
  418. newNetwork(nil, nil, nil),
  419. newNetwork(nil, nil, nopStepper),
  420. }
  421. for i, tt := range tests {
  422. // promote 0 the leader
  423. tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  424. // propose via follower
  425. tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  426. wantLog := &raftLog{
  427. storage: &MemoryStorage{
  428. ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
  429. },
  430. unstable: 3,
  431. committed: 2}
  432. base := ltoa(wantLog)
  433. for i, p := range tt.peers {
  434. if sm, ok := p.(*raft); ok {
  435. l := ltoa(sm.raftLog)
  436. if g := diffu(base, l); g != "" {
  437. t.Errorf("#%d: diff:\n%s", i, g)
  438. }
  439. } else {
  440. t.Logf("#%d: empty log", i)
  441. }
  442. }
  443. sm := tt.peers[1].(*raft)
  444. if g := sm.Term; g != 1 {
  445. t.Errorf("#%d: term = %d, want %d", i, g, 1)
  446. }
  447. }
  448. }
  449. func TestCompact(t *testing.T) {
  450. tests := []struct {
  451. compacti uint64
  452. nodes []uint64
  453. snapd []byte
  454. wpanic bool
  455. }{
  456. {1, []uint64{1, 2, 3}, []byte("some data"), false},
  457. {2, []uint64{1, 2, 3}, []byte("some data"), false},
  458. {4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range
  459. }
  460. for i, tt := range tests {
  461. func() {
  462. defer func() {
  463. if r := recover(); r != nil {
  464. if tt.wpanic != true {
  465. t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic)
  466. }
  467. }
  468. }()
  469. sm := &raft{
  470. state: StateLeader,
  471. raftLog: &raftLog{
  472. committed: 2,
  473. applied: 2,
  474. storage: &MemoryStorage{
  475. ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
  476. },
  477. },
  478. }
  479. sm.compact(tt.compacti, tt.nodes, tt.snapd)
  480. sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
  481. if sm.raftLog.firstIndex() != tt.compacti {
  482. t.Errorf("%d: log.firstIndex = %d, want %d", i, sm.raftLog.firstIndex(), tt.compacti)
  483. }
  484. if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
  485. t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
  486. }
  487. if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
  488. t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
  489. }
  490. }()
  491. }
  492. }
  493. func TestCommit(t *testing.T) {
  494. tests := []struct {
  495. matches []uint64
  496. logs []pb.Entry
  497. smTerm uint64
  498. w uint64
  499. }{
  500. // single
  501. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
  502. {[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
  503. {[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  504. {[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
  505. // odd
  506. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  507. {[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  508. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  509. {[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  510. // even
  511. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  512. {[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  513. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
  514. {[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  515. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
  516. {[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
  517. }
  518. for i, tt := range tests {
  519. prs := make(map[uint64]*progress)
  520. for j := 0; j < len(tt.matches); j++ {
  521. prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
  522. }
  523. sm := &raft{
  524. raftLog: &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: uint64(len(tt.logs))},
  525. prs: prs,
  526. HardState: pb.HardState{Term: tt.smTerm},
  527. }
  528. sm.maybeCommit()
  529. if g := sm.raftLog.committed; g != tt.w {
  530. t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  531. }
  532. }
  533. }
  534. func TestIsElectionTimeout(t *testing.T) {
  535. tests := []struct {
  536. elapse int
  537. wprobability float64
  538. round bool
  539. }{
  540. {5, 0, false},
  541. {13, 0.3, true},
  542. {15, 0.5, true},
  543. {18, 0.8, true},
  544. {20, 1, false},
  545. }
  546. for i, tt := range tests {
  547. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  548. sm.elapsed = tt.elapse
  549. c := 0
  550. for j := 0; j < 10000; j++ {
  551. if sm.isElectionTimeout() {
  552. c++
  553. }
  554. }
  555. got := float64(c) / 10000.0
  556. if tt.round {
  557. got = math.Floor(got*10+0.5) / 10.0
  558. }
  559. if got != tt.wprobability {
  560. t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability)
  561. }
  562. }
  563. }
  564. // ensure that the Step function ignores the message from old term and does not pass it to the
  565. // acutal stepX function.
  566. func TestStepIgnoreOldTermMsg(t *testing.T) {
  567. called := false
  568. fakeStep := func(r *raft, m pb.Message) {
  569. called = true
  570. }
  571. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  572. sm.step = fakeStep
  573. sm.Term = 2
  574. sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
  575. if called == true {
  576. t.Errorf("stepFunc called = %v , want %v", called, false)
  577. }
  578. }
  579. // TestHandleMsgApp ensures:
  580. // 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
  581. // 2. If an existing entry conflicts with a new one (same index but different terms),
  582. // delete the existing entry and all that follow it; append any new entries not already in the log.
  583. // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
  584. func TestHandleMsgApp(t *testing.T) {
  585. tests := []struct {
  586. m pb.Message
  587. wIndex uint64
  588. wCommit uint64
  589. wReject bool
  590. }{
  591. // Ensure 1
  592. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch
  593. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist
  594. // Ensure 2
  595. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
  596. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false},
  597. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false},
  598. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false},
  599. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false},
  600. // Ensure 3
  601. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1
  602. {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2
  603. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2
  604. {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last()
  605. }
  606. for i, tt := range tests {
  607. sm := &raft{
  608. state: StateFollower,
  609. HardState: pb.HardState{Term: 2},
  610. raftLog: &raftLog{
  611. committed: 0,
  612. storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
  613. unstable: 3,
  614. },
  615. }
  616. sm.handleAppendEntries(tt.m)
  617. if sm.raftLog.lastIndex() != tt.wIndex {
  618. t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
  619. }
  620. if sm.raftLog.committed != tt.wCommit {
  621. t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  622. }
  623. m := sm.readMessages()
  624. if len(m) != 1 {
  625. t.Fatalf("#%d: msg = nil, want 1", i)
  626. }
  627. if m[0].Reject != tt.wReject {
  628. t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
  629. }
  630. }
  631. }
  632. func TestRecvMsgVote(t *testing.T) {
  633. tests := []struct {
  634. state StateType
  635. i, term uint64
  636. voteFor uint64
  637. wreject bool
  638. }{
  639. {StateFollower, 0, 0, None, true},
  640. {StateFollower, 0, 1, None, true},
  641. {StateFollower, 0, 2, None, true},
  642. {StateFollower, 0, 3, None, false},
  643. {StateFollower, 1, 0, None, true},
  644. {StateFollower, 1, 1, None, true},
  645. {StateFollower, 1, 2, None, true},
  646. {StateFollower, 1, 3, None, false},
  647. {StateFollower, 2, 0, None, true},
  648. {StateFollower, 2, 1, None, true},
  649. {StateFollower, 2, 2, None, false},
  650. {StateFollower, 2, 3, None, false},
  651. {StateFollower, 3, 0, None, true},
  652. {StateFollower, 3, 1, None, true},
  653. {StateFollower, 3, 2, None, false},
  654. {StateFollower, 3, 3, None, false},
  655. {StateFollower, 3, 2, 2, false},
  656. {StateFollower, 3, 2, 1, true},
  657. {StateLeader, 3, 3, 1, true},
  658. {StateCandidate, 3, 3, 1, true},
  659. }
  660. for i, tt := range tests {
  661. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  662. sm.state = tt.state
  663. switch tt.state {
  664. case StateFollower:
  665. sm.step = stepFollower
  666. case StateCandidate:
  667. sm.step = stepCandidate
  668. case StateLeader:
  669. sm.step = stepLeader
  670. }
  671. sm.HardState = pb.HardState{Vote: tt.voteFor}
  672. sm.raftLog = &raftLog{
  673. storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}},
  674. unstable: 3,
  675. }
  676. sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
  677. msgs := sm.readMessages()
  678. if g := len(msgs); g != 1 {
  679. t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
  680. continue
  681. }
  682. if g := msgs[0].Reject; g != tt.wreject {
  683. t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
  684. }
  685. }
  686. }
  687. func TestStateTransition(t *testing.T) {
  688. tests := []struct {
  689. from StateType
  690. to StateType
  691. wallow bool
  692. wterm uint64
  693. wlead uint64
  694. }{
  695. {StateFollower, StateFollower, true, 1, None},
  696. {StateFollower, StateCandidate, true, 1, None},
  697. {StateFollower, StateLeader, false, 0, None},
  698. {StateCandidate, StateFollower, true, 0, None},
  699. {StateCandidate, StateCandidate, true, 1, None},
  700. {StateCandidate, StateLeader, true, 0, 1},
  701. {StateLeader, StateFollower, true, 1, None},
  702. {StateLeader, StateCandidate, false, 1, None},
  703. {StateLeader, StateLeader, true, 0, 1},
  704. }
  705. for i, tt := range tests {
  706. func() {
  707. defer func() {
  708. if r := recover(); r != nil {
  709. if tt.wallow == true {
  710. t.Errorf("%d: allow = %v, want %v", i, false, true)
  711. }
  712. }
  713. }()
  714. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  715. sm.state = tt.from
  716. switch tt.to {
  717. case StateFollower:
  718. sm.becomeFollower(tt.wterm, tt.wlead)
  719. case StateCandidate:
  720. sm.becomeCandidate()
  721. case StateLeader:
  722. sm.becomeLeader()
  723. }
  724. if sm.Term != tt.wterm {
  725. t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
  726. }
  727. if sm.lead != tt.wlead {
  728. t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  729. }
  730. }()
  731. }
  732. }
  733. func TestAllServerStepdown(t *testing.T) {
  734. tests := []struct {
  735. state StateType
  736. wstate StateType
  737. wterm uint64
  738. windex uint64
  739. }{
  740. {StateFollower, StateFollower, 3, 1},
  741. {StateCandidate, StateFollower, 3, 1},
  742. {StateLeader, StateFollower, 3, 2},
  743. }
  744. tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
  745. tterm := uint64(3)
  746. for i, tt := range tests {
  747. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  748. switch tt.state {
  749. case StateFollower:
  750. sm.becomeFollower(1, None)
  751. case StateCandidate:
  752. sm.becomeCandidate()
  753. case StateLeader:
  754. sm.becomeCandidate()
  755. sm.becomeLeader()
  756. }
  757. for j, msgType := range tmsgTypes {
  758. sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
  759. if sm.state != tt.wstate {
  760. t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
  761. }
  762. if sm.Term != tt.wterm {
  763. t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
  764. }
  765. if uint64(len(sm.raftLog.allEntries())) != tt.windex {
  766. t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex)
  767. }
  768. wlead := uint64(2)
  769. if msgType == pb.MsgVote {
  770. wlead = None
  771. }
  772. if sm.lead != wlead {
  773. t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
  774. }
  775. }
  776. }
  777. }
  778. func TestLeaderAppResp(t *testing.T) {
  779. // initial progress: match = 0; netx = 3
  780. tests := []struct {
  781. index uint64
  782. reject bool
  783. // progress
  784. wmatch uint64
  785. wnext uint64
  786. // message
  787. wmsgNum int
  788. windex uint64
  789. wcommitted uint64
  790. }{
  791. {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies
  792. {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg
  793. {2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
  794. {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
  795. }
  796. for i, tt := range tests {
  797. // sm term is 1 after it becomes the leader.
  798. // thus the last log term must be 1 to be committed.
  799. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  800. sm.raftLog = &raftLog{
  801. storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}},
  802. unstable: 3,
  803. }
  804. sm.becomeCandidate()
  805. sm.becomeLeader()
  806. sm.readMessages()
  807. sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject})
  808. p := sm.prs[2]
  809. if p.match != tt.wmatch {
  810. t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch)
  811. }
  812. if p.next != tt.wnext {
  813. t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
  814. }
  815. msgs := sm.readMessages()
  816. if len(msgs) != tt.wmsgNum {
  817. t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  818. }
  819. for j, msg := range msgs {
  820. if msg.Index != tt.windex {
  821. t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  822. }
  823. if msg.Commit != tt.wcommitted {
  824. t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  825. }
  826. }
  827. }
  828. }
  829. // When the leader receives a heartbeat tick, it should
  830. // send a MsgApp with m.Index = 0, m.LogTerm=0 and empty entries.
  831. func TestBcastBeat(t *testing.T) {
  832. offset := uint64(1000)
  833. // make a state machine with log.offset = 1000
  834. s := pb.Snapshot{
  835. Index: offset,
  836. Term: 1,
  837. Nodes: []uint64{1, 2, 3},
  838. }
  839. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  840. sm.Term = 1
  841. sm.restore(s)
  842. sm.becomeCandidate()
  843. sm.becomeLeader()
  844. for i := 0; i < 10; i++ {
  845. sm.appendEntry(pb.Entry{})
  846. }
  847. sm.Step(pb.Message{Type: pb.MsgBeat})
  848. msgs := sm.readMessages()
  849. if len(msgs) != 2 {
  850. t.Fatalf("len(msgs) = %v, want 2", len(msgs))
  851. }
  852. tomap := map[uint64]bool{2: true, 3: true}
  853. for i, m := range msgs {
  854. if m.Type != pb.MsgApp {
  855. t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
  856. }
  857. if m.Index != 0 {
  858. t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
  859. }
  860. if m.LogTerm != 0 {
  861. t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
  862. }
  863. if !tomap[m.To] {
  864. t.Fatalf("#%d: unexpected to %d", i, m.To)
  865. } else {
  866. delete(tomap, m.To)
  867. }
  868. if len(m.Entries) != 0 {
  869. t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
  870. }
  871. }
  872. }
  873. // tests the output of the statemachine when receiving MsgBeat
  874. func TestRecvMsgBeat(t *testing.T) {
  875. tests := []struct {
  876. state StateType
  877. wMsg int
  878. }{
  879. {StateLeader, 2},
  880. // candidate and follower should ignore MsgBeat
  881. {StateCandidate, 0},
  882. {StateFollower, 0},
  883. }
  884. for i, tt := range tests {
  885. sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
  886. sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}}
  887. sm.Term = 1
  888. sm.state = tt.state
  889. switch tt.state {
  890. case StateFollower:
  891. sm.step = stepFollower
  892. case StateCandidate:
  893. sm.step = stepCandidate
  894. case StateLeader:
  895. sm.step = stepLeader
  896. }
  897. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  898. msgs := sm.readMessages()
  899. if len(msgs) != tt.wMsg {
  900. t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  901. }
  902. for _, m := range msgs {
  903. if m.Type != pb.MsgApp {
  904. t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp)
  905. }
  906. }
  907. }
  908. }
  909. func TestRestore(t *testing.T) {
  910. s := pb.Snapshot{
  911. Index: 11, // magic number
  912. Term: 11, // magic number
  913. Nodes: []uint64{1, 2, 3},
  914. }
  915. sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  916. if ok := sm.restore(s); !ok {
  917. t.Fatal("restore fail, want succeed")
  918. }
  919. if sm.raftLog.lastIndex() != s.Index {
  920. t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Index)
  921. }
  922. if sm.raftLog.term(s.Index) != s.Term {
  923. t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
  924. }
  925. sg := sm.nodes()
  926. if !reflect.DeepEqual(sg, s.Nodes) {
  927. t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
  928. }
  929. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  930. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  931. }
  932. if ok := sm.restore(s); ok {
  933. t.Fatal("restore succeed, want fail")
  934. }
  935. }
  936. func TestProvideSnap(t *testing.T) {
  937. s := pb.Snapshot{
  938. Index: 11, // magic number
  939. Term: 11, // magic number
  940. Nodes: []uint64{1, 2},
  941. }
  942. sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  943. // restore the statemachin from a snapshot
  944. // so it has a compacted log and a snapshot
  945. sm.restore(s)
  946. sm.becomeCandidate()
  947. sm.becomeLeader()
  948. // force set the next of node 1, so that
  949. // node 1 needs a snapshot
  950. sm.prs[2].next = sm.raftLog.firstIndex()
  951. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
  952. msgs := sm.readMessages()
  953. if len(msgs) != 1 {
  954. t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  955. }
  956. m := msgs[0]
  957. if m.Type != pb.MsgSnap {
  958. t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
  959. }
  960. }
  961. func TestRestoreFromSnapMsg(t *testing.T) {
  962. s := pb.Snapshot{
  963. Index: 11, // magic number
  964. Term: 11, // magic number
  965. Nodes: []uint64{1, 2},
  966. }
  967. m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
  968. sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  969. sm.Step(m)
  970. if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
  971. t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
  972. }
  973. }
  974. func TestSlowNodeRestore(t *testing.T) {
  975. nt := newNetwork(nil, nil, nil)
  976. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  977. nt.isolate(3)
  978. for j := 0; j <= 100; j++ {
  979. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  980. }
  981. lead := nt.peers[1].(*raft)
  982. nextEnts(lead, nt.storage[1])
  983. lead.compact(lead.raftLog.applied, lead.nodes(), nil)
  984. nt.recover()
  985. // trigger a snapshot
  986. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  987. follower := nt.peers[3].(*raft)
  988. if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
  989. t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
  990. }
  991. // trigger a commit
  992. nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  993. if follower.raftLog.committed != lead.raftLog.committed {
  994. t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
  995. }
  996. }
  997. // TestStepConfig tests that when raft step msgProp in EntryConfChange type,
  998. // it appends the entry to log and sets pendingConf to be true.
  999. func TestStepConfig(t *testing.T) {
  1000. // a raft that cannot make progress
  1001. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1002. r.becomeCandidate()
  1003. r.becomeLeader()
  1004. index := r.raftLog.lastIndex()
  1005. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1006. if g := r.raftLog.lastIndex(); g != index+1 {
  1007. t.Errorf("index = %d, want %d", g, index+1)
  1008. }
  1009. if r.pendingConf != true {
  1010. t.Errorf("pendingConf = %v, want true", r.pendingConf)
  1011. }
  1012. }
  1013. // TestStepIgnoreConfig tests that if raft step the second msgProp in
  1014. // EntryConfChange type when the first one is uncommitted, the node will deny
  1015. // the proposal and keep its original state.
  1016. func TestStepIgnoreConfig(t *testing.T) {
  1017. // a raft that cannot make progress
  1018. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1019. r.becomeCandidate()
  1020. r.becomeLeader()
  1021. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1022. index := r.raftLog.lastIndex()
  1023. pendingConf := r.pendingConf
  1024. r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  1025. if g := r.raftLog.lastIndex(); g != index {
  1026. t.Errorf("index = %d, want %d", g, index)
  1027. }
  1028. if r.pendingConf != pendingConf {
  1029. t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
  1030. }
  1031. }
  1032. // TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
  1033. // based on uncommitted entries.
  1034. func TestRecoverPendingConfig(t *testing.T) {
  1035. tests := []struct {
  1036. entType pb.EntryType
  1037. wpending bool
  1038. }{
  1039. {pb.EntryNormal, false},
  1040. {pb.EntryConfChange, true},
  1041. }
  1042. for i, tt := range tests {
  1043. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1044. r.appendEntry(pb.Entry{Type: tt.entType})
  1045. r.becomeCandidate()
  1046. r.becomeLeader()
  1047. if r.pendingConf != tt.wpending {
  1048. t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
  1049. }
  1050. }
  1051. }
  1052. // TestRecoverDoublePendingConfig tests that new leader will panic if
  1053. // there exist two uncommitted config entries.
  1054. func TestRecoverDoublePendingConfig(t *testing.T) {
  1055. func() {
  1056. defer func() {
  1057. if err := recover(); err == nil {
  1058. t.Errorf("expect panic, but nothing happens")
  1059. }
  1060. }()
  1061. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1062. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1063. r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
  1064. r.becomeCandidate()
  1065. r.becomeLeader()
  1066. }()
  1067. }
  1068. // TestAddNode tests that addNode could update pendingConf and nodes correctly.
  1069. func TestAddNode(t *testing.T) {
  1070. r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  1071. r.pendingConf = true
  1072. r.addNode(2)
  1073. if r.pendingConf != false {
  1074. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1075. }
  1076. nodes := r.nodes()
  1077. wnodes := []uint64{1, 2}
  1078. if !reflect.DeepEqual(nodes, wnodes) {
  1079. t.Errorf("nodes = %v, want %v", nodes, wnodes)
  1080. }
  1081. }
  1082. // TestRemoveNode tests that removeNode could update pendingConf, nodes and
  1083. // and removed list correctly.
  1084. func TestRemoveNode(t *testing.T) {
  1085. r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
  1086. r.pendingConf = true
  1087. r.removeNode(2)
  1088. if r.pendingConf != false {
  1089. t.Errorf("pendingConf = %v, want false", r.pendingConf)
  1090. }
  1091. w := []uint64{1}
  1092. if g := r.nodes(); !reflect.DeepEqual(g, w) {
  1093. t.Errorf("nodes = %v, want %v", g, w)
  1094. }
  1095. }
  1096. func TestPromotable(t *testing.T) {
  1097. id := uint64(1)
  1098. tests := []struct {
  1099. peers []uint64
  1100. wp bool
  1101. }{
  1102. {[]uint64{1}, true},
  1103. {[]uint64{1, 2, 3}, true},
  1104. {[]uint64{}, false},
  1105. {[]uint64{2, 3}, false},
  1106. }
  1107. for i, tt := range tests {
  1108. r := &raft{id: id, prs: make(map[uint64]*progress)}
  1109. for _, id := range tt.peers {
  1110. r.prs[id] = &progress{}
  1111. }
  1112. if g := r.promotable(); g != tt.wp {
  1113. t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
  1114. }
  1115. }
  1116. }
  1117. func TestRaftNodes(t *testing.T) {
  1118. tests := []struct {
  1119. ids []uint64
  1120. wids []uint64
  1121. }{
  1122. {
  1123. []uint64{1, 2, 3},
  1124. []uint64{1, 2, 3},
  1125. },
  1126. {
  1127. []uint64{3, 2, 1},
  1128. []uint64{1, 2, 3},
  1129. },
  1130. }
  1131. for i, tt := range tests {
  1132. r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage())
  1133. if !reflect.DeepEqual(r.nodes(), tt.wids) {
  1134. t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
  1135. }
  1136. }
  1137. }
  1138. func ents(terms ...uint64) *raft {
  1139. ents := []pb.Entry{{}}
  1140. for _, term := range terms {
  1141. ents = append(ents, pb.Entry{Term: term})
  1142. }
  1143. sm := &raft{
  1144. raftLog: &raftLog{
  1145. storage: &MemoryStorage{ents: ents},
  1146. unstable: uint64(len(ents)),
  1147. },
  1148. }
  1149. sm.reset(0)
  1150. return sm
  1151. }
  1152. type network struct {
  1153. peers map[uint64]Interface
  1154. storage map[uint64]*MemoryStorage
  1155. dropm map[connem]float64
  1156. ignorem map[pb.MessageType]bool
  1157. }
  1158. // newNetwork initializes a network from peers.
  1159. // A nil node will be replaced with a new *stateMachine.
  1160. // A *stateMachine will get its k, id.
  1161. // When using stateMachine, the address list is always [1, n].
  1162. func newNetwork(peers ...Interface) *network {
  1163. size := len(peers)
  1164. peerAddrs := idsBySize(size)
  1165. npeers := make(map[uint64]Interface, size)
  1166. nstorage := make(map[uint64]*MemoryStorage, size)
  1167. for i, p := range peers {
  1168. id := peerAddrs[i]
  1169. switch v := p.(type) {
  1170. case nil:
  1171. nstorage[id] = NewMemoryStorage()
  1172. sm := newRaft(id, peerAddrs, 10, 1, nstorage[id])
  1173. npeers[id] = sm
  1174. case *raft:
  1175. v.id = id
  1176. v.prs = make(map[uint64]*progress)
  1177. for i := 0; i < size; i++ {
  1178. v.prs[peerAddrs[i]] = &progress{}
  1179. }
  1180. v.reset(0)
  1181. npeers[id] = v
  1182. case *blackHole:
  1183. npeers[id] = v
  1184. default:
  1185. panic(fmt.Sprintf("unexpected state machine type: %T", p))
  1186. }
  1187. }
  1188. return &network{
  1189. peers: npeers,
  1190. storage: nstorage,
  1191. dropm: make(map[connem]float64),
  1192. ignorem: make(map[pb.MessageType]bool),
  1193. }
  1194. }
  1195. func (nw *network) send(msgs ...pb.Message) {
  1196. for len(msgs) > 0 {
  1197. m := msgs[0]
  1198. p := nw.peers[m.To]
  1199. p.Step(m)
  1200. msgs = append(msgs[1:], nw.filter(p.readMessages())...)
  1201. }
  1202. }
  1203. func (nw *network) drop(from, to uint64, perc float64) {
  1204. nw.dropm[connem{from, to}] = perc
  1205. }
  1206. func (nw *network) cut(one, other uint64) {
  1207. nw.drop(one, other, 1)
  1208. nw.drop(other, one, 1)
  1209. }
  1210. func (nw *network) isolate(id uint64) {
  1211. for i := 0; i < len(nw.peers); i++ {
  1212. nid := uint64(i) + 1
  1213. if nid != id {
  1214. nw.drop(id, nid, 1.0)
  1215. nw.drop(nid, id, 1.0)
  1216. }
  1217. }
  1218. }
  1219. func (nw *network) ignore(t pb.MessageType) {
  1220. nw.ignorem[t] = true
  1221. }
  1222. func (nw *network) recover() {
  1223. nw.dropm = make(map[connem]float64)
  1224. nw.ignorem = make(map[pb.MessageType]bool)
  1225. }
  1226. func (nw *network) filter(msgs []pb.Message) []pb.Message {
  1227. mm := []pb.Message{}
  1228. for _, m := range msgs {
  1229. if nw.ignorem[m.Type] {
  1230. continue
  1231. }
  1232. switch m.Type {
  1233. case pb.MsgHup:
  1234. // hups never go over the network, so don't drop them but panic
  1235. panic("unexpected msgHup")
  1236. default:
  1237. perc := nw.dropm[connem{m.From, m.To}]
  1238. if n := rand.Float64(); n < perc {
  1239. continue
  1240. }
  1241. }
  1242. mm = append(mm, m)
  1243. }
  1244. return mm
  1245. }
  1246. type connem struct {
  1247. from, to uint64
  1248. }
  1249. type blackHole struct{}
  1250. func (blackHole) Step(pb.Message) error { return nil }
  1251. func (blackHole) readMessages() []pb.Message { return nil }
  1252. var nopStepper = &blackHole{}
  1253. func idsBySize(size int) []uint64 {
  1254. ids := make([]uint64, size)
  1255. for i := 0; i < size; i++ {
  1256. ids[i] = 1 + uint64(i)
  1257. }
  1258. return ids
  1259. }