multinode_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "bytes"
  17. "reflect"
  18. "testing"
  19. "time"
  20. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  21. "github.com/coreos/etcd/raft/raftpb"
  22. )
  23. // TestMultiNodeStep ensures that multiNode.Step sends MsgProp to propc
  24. // chan and other kinds of messages to recvc chan.
  25. func TestMultiNodeStep(t *testing.T) {
  26. for i, msgn := range raftpb.MessageType_name {
  27. mn := &multiNode{
  28. propc: make(chan multiMessage, 1),
  29. recvc: make(chan multiMessage, 1),
  30. }
  31. msgt := raftpb.MessageType(i)
  32. mn.Step(context.TODO(), 1, raftpb.Message{Type: msgt})
  33. // Proposal goes to proc chan. Others go to recvc chan.
  34. if msgt == raftpb.MsgProp {
  35. select {
  36. case <-mn.propc:
  37. default:
  38. t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
  39. }
  40. } else {
  41. if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus {
  42. select {
  43. case <-mn.recvc:
  44. t.Errorf("%d: step should ignore %s", msgt, msgn)
  45. default:
  46. }
  47. } else {
  48. select {
  49. case <-mn.recvc:
  50. default:
  51. t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
  52. }
  53. }
  54. }
  55. }
  56. }
  57. // Cancel and Stop should unblock Step()
  58. func TestMultiNodeStepUnblock(t *testing.T) {
  59. // a node without buffer to block step
  60. mn := &multiNode{
  61. propc: make(chan multiMessage),
  62. done: make(chan struct{}),
  63. }
  64. ctx, cancel := context.WithCancel(context.Background())
  65. stopFunc := func() { close(mn.done) }
  66. tests := []struct {
  67. unblock func()
  68. werr error
  69. }{
  70. {stopFunc, ErrStopped},
  71. {cancel, context.Canceled},
  72. }
  73. for i, tt := range tests {
  74. errc := make(chan error, 1)
  75. go func() {
  76. err := mn.Step(ctx, 1, raftpb.Message{Type: raftpb.MsgProp})
  77. errc <- err
  78. }()
  79. tt.unblock()
  80. select {
  81. case err := <-errc:
  82. if err != tt.werr {
  83. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  84. }
  85. //clean up side-effect
  86. if ctx.Err() != nil {
  87. ctx = context.TODO()
  88. }
  89. select {
  90. case <-mn.done:
  91. mn.done = make(chan struct{})
  92. default:
  93. }
  94. case <-time.After(time.Millisecond * 100):
  95. t.Errorf("#%d: failed to unblock step", i)
  96. }
  97. }
  98. }
  99. // TestMultiNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
  100. func TestMultiNodePropose(t *testing.T) {
  101. mn := newMultiNode(1)
  102. go mn.run()
  103. s := NewMemoryStorage()
  104. mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
  105. mn.Campaign(context.TODO(), 1)
  106. proposed := false
  107. for {
  108. rds := <-mn.Ready()
  109. rd := rds[1]
  110. s.Append(rd.Entries)
  111. // Once we are the leader, propose a command.
  112. if !proposed && rd.SoftState.Lead == mn.id {
  113. mn.Propose(context.TODO(), 1, []byte("somedata"))
  114. proposed = true
  115. }
  116. mn.Advance(rds)
  117. // Exit when we have three entries: one ConfChange, one no-op for the election,
  118. // and our proposed command.
  119. lastIndex, err := s.LastIndex()
  120. if err != nil {
  121. t.Fatal(err)
  122. }
  123. if lastIndex >= 3 {
  124. break
  125. }
  126. }
  127. mn.Stop()
  128. lastIndex, err := s.LastIndex()
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. entries, err := s.Entries(lastIndex, lastIndex+1, noLimit)
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. if len(entries) != 1 {
  137. t.Fatalf("len(entries) = %d, want %d", len(entries), 1)
  138. }
  139. if !bytes.Equal(entries[0].Data, []byte("somedata")) {
  140. t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
  141. }
  142. }
  143. // TestMultiNodeProposeConfig ensures that multiNode.ProposeConfChange
  144. // sends the given configuration proposal to the underlying raft.
  145. func TestMultiNodeProposeConfig(t *testing.T) {
  146. mn := newMultiNode(1)
  147. go mn.run()
  148. s := NewMemoryStorage()
  149. mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
  150. mn.Campaign(context.TODO(), 1)
  151. proposed := false
  152. var lastIndex uint64
  153. var ccdata []byte
  154. for {
  155. rds := <-mn.Ready()
  156. rd := rds[1]
  157. s.Append(rd.Entries)
  158. // change the step function to appendStep until this raft becomes leader
  159. if !proposed && rd.SoftState.Lead == mn.id {
  160. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  161. var err error
  162. ccdata, err = cc.Marshal()
  163. if err != nil {
  164. t.Fatal(err)
  165. }
  166. mn.ProposeConfChange(context.TODO(), 1, cc)
  167. proposed = true
  168. }
  169. mn.Advance(rds)
  170. var err error
  171. lastIndex, err = s.LastIndex()
  172. if err != nil {
  173. t.Fatal(err)
  174. }
  175. if lastIndex >= 3 {
  176. break
  177. }
  178. }
  179. mn.Stop()
  180. entries, err := s.Entries(lastIndex, lastIndex+1, noLimit)
  181. if err != nil {
  182. t.Fatal(err)
  183. }
  184. if len(entries) != 1 {
  185. t.Fatalf("len(entries) = %d, want %d", len(entries), 1)
  186. }
  187. if entries[0].Type != raftpb.EntryConfChange {
  188. t.Fatalf("type = %v, want %v", entries[0].Type, raftpb.EntryConfChange)
  189. }
  190. if !bytes.Equal(entries[0].Data, ccdata) {
  191. t.Errorf("data = %v, want %v", entries[0].Data, ccdata)
  192. }
  193. }
  194. // TestBlockProposal from node_test.go has no equivalent in multiNode
  195. // because we cannot block proposals based on individual group leader status.
  196. // TestNodeTick from node_test.go has no equivalent in multiNode because
  197. // it reaches into the raft object which is not exposed.
  198. // TestMultiNodeStop ensures that multiNode.Stop() blocks until the node has stopped
  199. // processing, and that it is idempotent
  200. func TestMultiNodeStop(t *testing.T) {
  201. mn := newMultiNode(1)
  202. donec := make(chan struct{})
  203. go func() {
  204. mn.run()
  205. close(donec)
  206. }()
  207. mn.Tick()
  208. mn.Stop()
  209. select {
  210. case <-donec:
  211. case <-time.After(time.Second):
  212. t.Fatalf("timed out waiting for node to stop!")
  213. }
  214. // Further ticks should have no effect, the node is stopped.
  215. // There is no way to verify this in multinode but at least we can test
  216. // it doesn't block or panic.
  217. mn.Tick()
  218. // Subsequent Stops should have no effect.
  219. mn.Stop()
  220. }
  221. // TestMultiNodeStart ensures that a node can be started correctly. The node should
  222. // start with correct configuration change entries, and can accept and commit
  223. // proposals.
  224. func TestMultiNodeStart(t *testing.T) {
  225. ctx, cancel := context.WithCancel(context.Background())
  226. defer cancel()
  227. cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
  228. ccdata, err := cc.Marshal()
  229. if err != nil {
  230. t.Fatalf("unexpected marshal error: %v", err)
  231. }
  232. wants := []Ready{
  233. {
  234. SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
  235. HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1},
  236. Entries: []raftpb.Entry{
  237. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  238. {Term: 2, Index: 2},
  239. },
  240. CommittedEntries: []raftpb.Entry{
  241. {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
  242. {Term: 2, Index: 2},
  243. },
  244. },
  245. {
  246. HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
  247. Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  248. CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
  249. },
  250. }
  251. mn := StartMultiNode(1)
  252. storage := NewMemoryStorage()
  253. mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
  254. mn.Campaign(ctx, 1)
  255. gs := <-mn.Ready()
  256. g := gs[1]
  257. if !reflect.DeepEqual(g, wants[0]) {
  258. t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
  259. } else {
  260. storage.Append(g.Entries)
  261. mn.Advance(gs)
  262. }
  263. mn.Propose(ctx, 1, []byte("foo"))
  264. if gs2 := <-mn.Ready(); !reflect.DeepEqual(gs2[1], wants[1]) {
  265. t.Errorf("#%d: g = %+v,\n w %+v", 2, gs2[1], wants[1])
  266. } else {
  267. storage.Append(gs2[1].Entries)
  268. mn.Advance(gs2)
  269. }
  270. select {
  271. case rd := <-mn.Ready():
  272. t.Errorf("unexpected Ready: %+v", rd)
  273. case <-time.After(time.Millisecond):
  274. }
  275. }
  276. func TestMultiNodeRestart(t *testing.T) {
  277. entries := []raftpb.Entry{
  278. {Term: 1, Index: 1},
  279. {Term: 1, Index: 2, Data: []byte("foo")},
  280. }
  281. st := raftpb.HardState{Term: 1, Commit: 1}
  282. want := Ready{
  283. HardState: emptyState,
  284. // commit up to index commit index in st
  285. CommittedEntries: entries[:st.Commit],
  286. }
  287. storage := NewMemoryStorage()
  288. storage.SetHardState(st)
  289. storage.Append(entries)
  290. mn := StartMultiNode(1)
  291. mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), nil)
  292. gs := <-mn.Ready()
  293. if !reflect.DeepEqual(gs[1], want) {
  294. t.Errorf("g = %+v,\n w %+v", gs[1], want)
  295. }
  296. mn.Advance(gs)
  297. select {
  298. case rd := <-mn.Ready():
  299. t.Errorf("unexpected Ready: %+v", rd)
  300. case <-time.After(time.Millisecond):
  301. }
  302. mn.Stop()
  303. }
  304. func TestMultiNodeRestartFromSnapshot(t *testing.T) {
  305. snap := raftpb.Snapshot{
  306. Metadata: raftpb.SnapshotMetadata{
  307. ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
  308. Index: 2,
  309. Term: 1,
  310. },
  311. }
  312. entries := []raftpb.Entry{
  313. {Term: 1, Index: 3, Data: []byte("foo")},
  314. }
  315. st := raftpb.HardState{Term: 1, Commit: 3}
  316. want := Ready{
  317. HardState: emptyState,
  318. // commit up to index commit index in st
  319. CommittedEntries: entries,
  320. }
  321. s := NewMemoryStorage()
  322. s.SetHardState(st)
  323. s.ApplySnapshot(snap)
  324. s.Append(entries)
  325. mn := StartMultiNode(1)
  326. mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), nil)
  327. if gs := <-mn.Ready(); !reflect.DeepEqual(gs[1], want) {
  328. t.Errorf("g = %+v,\n w %+v", gs[1], want)
  329. } else {
  330. mn.Advance(gs)
  331. }
  332. select {
  333. case rd := <-mn.Ready():
  334. t.Errorf("unexpected Ready: %+v", rd)
  335. case <-time.After(time.Millisecond):
  336. }
  337. }
  338. func TestMultiNodeAdvance(t *testing.T) {
  339. ctx, cancel := context.WithCancel(context.Background())
  340. defer cancel()
  341. storage := NewMemoryStorage()
  342. mn := StartMultiNode(1)
  343. mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
  344. mn.Campaign(ctx, 1)
  345. rd1 := <-mn.Ready()
  346. mn.Propose(ctx, 1, []byte("foo"))
  347. select {
  348. case rd2 := <-mn.Ready():
  349. t.Fatalf("unexpected Ready before Advance: %+v", rd2)
  350. case <-time.After(time.Millisecond):
  351. }
  352. storage.Append(rd1[1].Entries)
  353. mn.Advance(rd1)
  354. select {
  355. case <-mn.Ready():
  356. case <-time.After(time.Millisecond):
  357. t.Errorf("expect Ready after Advance, but there is no Ready available")
  358. }
  359. }
  360. func TestMultiNodeStatus(t *testing.T) {
  361. storage := NewMemoryStorage()
  362. mn := StartMultiNode(1)
  363. err := mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
  364. if err != nil {
  365. t.Fatal(err)
  366. }
  367. status := mn.Status(1)
  368. if status == nil {
  369. t.Errorf("expected status struct, got nil")
  370. }
  371. status = mn.Status(2)
  372. if status != nil {
  373. t.Errorf("expected nil status, got %+v", status)
  374. }
  375. }