node_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "reflect"
  16. "testing"
  17. "time"
  18. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  19. "github.com/coreos/etcd/pkg/testutil"
  20. "github.com/coreos/etcd/raft/raftpb"
  21. )
  22. // TestNodeStep ensures that node.Step sends msgProp to propc chan
  23. // and other kinds of messages to recvc chan.
  24. func TestNodeStep(t *testing.T) {
  25. for i, msgn := range raftpb.MessageType_name {
  26. n := &node{
  27. propc: make(chan raftpb.Message, 1),
  28. recvc: make(chan raftpb.Message, 1),
  29. }
  30. msgt := raftpb.MessageType(i)
  31. n.Step(context.TODO(), raftpb.Message{Type: msgt})
  32. // Proposal goes to proc chan. Others go to recvc chan.
  33. if msgt == raftpb.MsgProp {
  34. select {
  35. case <-n.propc:
  36. default:
  37. t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
  38. }
  39. } else {
  40. if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup {
  41. select {
  42. case <-n.recvc:
  43. t.Errorf("%d: step should ignore %s", msgt, msgn)
  44. default:
  45. }
  46. } else {
  47. select {
  48. case <-n.recvc:
  49. default:
  50. t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
  51. }
  52. }
  53. }
  54. }
  55. }
  56. // Cancel and Stop should unblock Step()
  57. func TestNodeStepUnblock(t *testing.T) {
  58. // a node without buffer to block step
  59. n := &node{
  60. propc: make(chan raftpb.Message),
  61. done: make(chan struct{}),
  62. }
  63. ctx, cancel := context.WithCancel(context.Background())
  64. stopFunc := func() { close(n.done) }
  65. tests := []struct {
  66. unblock func()
  67. werr error
  68. }{
  69. {stopFunc, ErrStopped},
  70. {cancel, context.Canceled},
  71. }
  72. for i, tt := range tests {
  73. errc := make(chan error, 1)
  74. go func() {
  75. err := n.Step(ctx, raftpb.Message{Type: raftpb.MsgProp})
  76. errc <- err
  77. }()
  78. tt.unblock()
  79. select {
  80. case err := <-errc:
  81. if err != tt.werr {
  82. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  83. }
  84. //clean up side-effect
  85. if ctx.Err() != nil {
  86. ctx = context.TODO()
  87. }
  88. select {
  89. case <-n.done:
  90. n.done = make(chan struct{})
  91. default:
  92. }
  93. case <-time.After(time.Millisecond * 100):
  94. t.Errorf("#%d: failed to unblock step", i)
  95. }
  96. }
  97. }
  98. // TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
  99. func TestNodePropose(t *testing.T) {
  100. msgs := []raftpb.Message{}
  101. appendStep := func(r *raft, m raftpb.Message) {
  102. msgs = append(msgs, m)
  103. }
  104. n := newNode()
  105. s := NewMemoryStorage()
  106. r := newRaft(1, []uint64{1}, 10, 1, s)
  107. go n.run(r)
  108. n.Campaign(context.TODO())
  109. for {
  110. rd := <-n.Ready()
  111. s.Append(rd.Entries)
  112. // change the step function to appendStep until this raft becomes leader
  113. if rd.SoftState.Lead == r.id {
  114. r.step = appendStep
  115. n.Advance()
  116. break
  117. }
  118. n.Advance()
  119. }
  120. n.Propose(context.TODO(), []byte("somedata"))
  121. n.Stop()
  122. if len(msgs) != 1 {
  123. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  124. }
  125. if msgs[0].Type != raftpb.MsgProp {
  126. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
  127. }
  128. if !reflect.DeepEqual(msgs[0].Entries[0].Data, []byte("somedata")) {
  129. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, []byte("somedata"))
  130. }
  131. }
  132. // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal
  133. // to the underlying raft.
  134. func TestNodeProposeConfig(t *testing.T) {
  135. msgs := []raftpb.Message{}
  136. appendStep := func(r *raft, m raftpb.Message) {
  137. msgs = append(msgs, m)
  138. }
  139. n := newNode()
  140. s := NewMemoryStorage()
  141. r := newRaft(1, []uint64{1}, 10, 1, s)
  142. go n.run(r)
  143. n.Campaign(context.TODO())
  144. for {
  145. rd := <-n.Ready()
  146. s.Append(rd.Entries)
  147. // change the step function to appendStep until this raft becomes leader
  148. if rd.SoftState.Lead == r.id {
  149. r.step = appendStep
  150. n.Advance()
  151. break
  152. }
  153. n.Advance()
  154. }
  155. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  156. ccdata, err := cc.Marshal()
  157. if err != nil {
  158. t.Fatal(err)
  159. }
  160. n.ProposeConfChange(context.TODO(), cc)
  161. n.Stop()
  162. if len(msgs) != 1 {
  163. t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
  164. }
  165. if msgs[0].Type != raftpb.MsgProp {
  166. t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
  167. }
  168. if !reflect.DeepEqual(msgs[0].Entries[0].Data, ccdata) {
  169. t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, ccdata)
  170. }
  171. }
  172. // TestBlockProposal ensures that node will block proposal when it does not
  173. // know who is the current leader; node will accept proposal when it knows
  174. // who is the current leader.
  175. func TestBlockProposal(t *testing.T) {
  176. n := newNode()
  177. r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
  178. go n.run(r)
  179. defer n.Stop()
  180. errc := make(chan error, 1)
  181. go func() {
  182. errc <- n.Propose(context.TODO(), []byte("somedata"))
  183. }()
  184. testutil.ForceGosched()
  185. select {
  186. case err := <-errc:
  187. t.Errorf("err = %v, want blocking", err)
  188. default:
  189. }
  190. n.Campaign(context.TODO())
  191. testutil.ForceGosched()
  192. select {
  193. case err := <-errc:
  194. if err != nil {
  195. t.Errorf("err = %v, want %v", err, nil)
  196. }
  197. default:
  198. t.Errorf("blocking proposal, want unblocking")
  199. }
  200. }
  201. // TestNodeTick ensures that node.Tick() will increase the
  202. // elapsed of the underlying raft state machine.
  203. func TestNodeTick(t *testing.T) {
  204. n := newNode()
  205. s := NewMemoryStorage()
  206. r := newRaft(1, []uint64{1}, 10, 1, s)
  207. go n.run(r)
  208. elapsed := r.elapsed
  209. n.Tick()
  210. n.Stop()
  211. if r.elapsed != elapsed+1 {
  212. t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1)
  213. }
  214. }
  215. // TestNodeStop ensures that node.Stop() blocks until the node has stopped
  216. // processing, and that it is idempotent
  217. func TestNodeStop(t *testing.T) {
  218. n := newNode()
  219. s := NewMemoryStorage()
  220. r := newRaft(1, []uint64{1}, 10, 1, s)
  221. donec := make(chan struct{})
  222. go func() {
  223. n.run(r)
  224. close(donec)
  225. }()
  226. elapsed := r.elapsed
  227. n.Tick()
  228. n.Stop()
  229. select {
  230. case <-donec:
  231. case <-time.After(time.Second):
  232. t.Fatalf("timed out waiting for node to stop!")
  233. }
  234. if r.elapsed != elapsed+1 {
  235. t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1)
  236. }
  237. // Further ticks should have no effect, the node is stopped.
  238. n.Tick()
  239. if r.elapsed != elapsed+1 {
  240. t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1)
  241. }
  242. // Subsequent Stops should have no effect.
  243. n.Stop()
  244. }
  245. func TestReadyContainUpdates(t *testing.T) {
  246. tests := []struct {
  247. rd Ready
  248. wcontain bool
  249. }{
  250. {Ready{}, false},
  251. {Ready{SoftState: &SoftState{Lead: 1}}, true},
  252. {Ready{HardState: raftpb.HardState{Vote: 1}}, true},
  253. {Ready{Entries: make([]raftpb.Entry, 1, 1)}, true},
  254. {Ready{CommittedEntries: make([]raftpb.Entry, 1, 1)}, true},
  255. {Ready{Messages: make([]raftpb.Message, 1, 1)}, true},
  256. {Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}, true},
  257. }
  258. for i, tt := range tests {
  259. if g := tt.rd.containsUpdates(); g != tt.wcontain {
  260. t.Errorf("#%d: containUpdates = %v, want %v", i, g, tt.wcontain)
  261. }
  262. }
  263. }
  264. // TestNodeStart ensures that a node can be started correctly. The node should
  265. // start with correct configuration change entries, and can accept and commit
  266. // proposals.
  267. func TestNodeStart(t *testing.T) {
  268. ctx, cancel := context.WithCancel(context.Background())
  269. defer cancel()
  270. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  271. ccdata, err := cc.Marshal()
  272. if err != nil {
  273. t.Fatalf("unexpected marshal error: %v", err)
  274. }
  275. wants := []Ready{
  276. {
  277. SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
  278. HardState: raftpb.HardState{Term: 2, Commit: 2},
  279. Entries: []raftpb.Entry{
  280. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  281. {Term: 2, Index: 2},
  282. },
  283. CommittedEntries: []raftpb.Entry{
  284. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  285. {Term: 2, Index: 2},
  286. },
  287. },
  288. {
  289. HardState: raftpb.HardState{Term: 2, Commit: 3},
  290. Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  291. CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  292. },
  293. }
  294. storage := NewMemoryStorage()
  295. n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage)
  296. n.Campaign(ctx)
  297. g := <-n.Ready()
  298. if !reflect.DeepEqual(g, wants[0]) {
  299. t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
  300. } else {
  301. storage.Append(g.Entries)
  302. n.Advance()
  303. }
  304. n.Propose(ctx, []byte("foo"))
  305. if g := <-n.Ready(); !reflect.DeepEqual(g, wants[1]) {
  306. t.Errorf("#%d: g = %+v,\n w %+v", 2, g, wants[1])
  307. } else {
  308. storage.Append(g.Entries)
  309. n.Advance()
  310. }
  311. select {
  312. case rd := <-n.Ready():
  313. t.Errorf("unexpected Ready: %+v", rd)
  314. case <-time.After(time.Millisecond):
  315. }
  316. }
  317. func TestNodeRestart(t *testing.T) {
  318. entries := []raftpb.Entry{
  319. {Term: 1, Index: 1},
  320. {Term: 1, Index: 2, Data: []byte("foo")},
  321. }
  322. st := raftpb.HardState{Term: 1, Commit: 1}
  323. want := Ready{
  324. HardState: emptyState,
  325. // commit up to index commit index in st
  326. CommittedEntries: entries[:st.Commit],
  327. }
  328. storage := NewMemoryStorage()
  329. storage.SetHardState(st)
  330. storage.Append(entries)
  331. n := RestartNode(1, 10, 1, storage)
  332. if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
  333. t.Errorf("g = %+v,\n w %+v", g, want)
  334. }
  335. n.Advance()
  336. select {
  337. case rd := <-n.Ready():
  338. t.Errorf("unexpected Ready: %+v", rd)
  339. case <-time.After(time.Millisecond):
  340. }
  341. }
  342. func TestNodeRestartFromSnapshot(t *testing.T) {
  343. snap := raftpb.Snapshot{
  344. Metadata: raftpb.SnapshotMetadata{
  345. ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
  346. Index: 2,
  347. Term: 1,
  348. },
  349. }
  350. entries := []raftpb.Entry{
  351. {Term: 1, Index: 3, Data: []byte("foo")},
  352. }
  353. st := raftpb.HardState{Term: 1, Commit: 3}
  354. want := Ready{
  355. HardState: emptyState,
  356. // commit up to index commit index in st
  357. CommittedEntries: entries,
  358. }
  359. s := NewMemoryStorage()
  360. s.SetHardState(st)
  361. s.ApplySnapshot(snap)
  362. s.Append(entries)
  363. n := RestartNode(1, 10, 1, s)
  364. if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
  365. t.Errorf("g = %+v,\n w %+v", g, want)
  366. } else {
  367. n.Advance()
  368. }
  369. select {
  370. case rd := <-n.Ready():
  371. t.Errorf("unexpected Ready: %+v", rd)
  372. case <-time.After(time.Millisecond):
  373. }
  374. }
  375. func TestNodeAdvance(t *testing.T) {
  376. ctx, cancel := context.WithCancel(context.Background())
  377. defer cancel()
  378. storage := NewMemoryStorage()
  379. n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage)
  380. n.Campaign(ctx)
  381. <-n.Ready()
  382. n.Propose(ctx, []byte("foo"))
  383. var rd Ready
  384. select {
  385. case rd = <-n.Ready():
  386. t.Fatalf("unexpected Ready before Advance: %+v", rd)
  387. case <-time.After(time.Millisecond):
  388. }
  389. storage.Append(rd.Entries)
  390. n.Advance()
  391. select {
  392. case <-n.Ready():
  393. case <-time.After(time.Millisecond):
  394. t.Errorf("expect Ready after Advance, but there is no Ready available")
  395. }
  396. }
  397. func TestSoftStateEqual(t *testing.T) {
  398. tests := []struct {
  399. st *SoftState
  400. we bool
  401. }{
  402. {&SoftState{}, true},
  403. {&SoftState{Lead: 1}, false},
  404. {&SoftState{RaftState: StateLeader}, false},
  405. }
  406. for i, tt := range tests {
  407. if g := tt.st.equal(&SoftState{}); g != tt.we {
  408. t.Errorf("#%d, equal = %v, want %v", i, g, tt.we)
  409. }
  410. }
  411. }
  412. func TestIsHardStateEqual(t *testing.T) {
  413. tests := []struct {
  414. st raftpb.HardState
  415. we bool
  416. }{
  417. {emptyState, true},
  418. {raftpb.HardState{Vote: 1}, false},
  419. {raftpb.HardState{Commit: 1}, false},
  420. {raftpb.HardState{Term: 1}, false},
  421. }
  422. for i, tt := range tests {
  423. if isHardStateEqual(tt.st, emptyState) != tt.we {
  424. t.Errorf("#%d, equal = %v, want %v", i, isHardStateEqual(tt.st, emptyState), tt.we)
  425. }
  426. }
  427. }