// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
	"bytes"
	"reflect"
	"testing"
	"time"

	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
	"github.com/coreos/etcd/raft/raftpb"
)

// TestMultiNodeStep ensures that multiNode.Step sends MsgProp to propc
// chan and other kinds of messages to recvc chan.
func TestMultiNodeStep(t *testing.T) {
	for i, msgn := range raftpb.MessageType_name {
		mn := &multiNode{
			propc: make(chan multiMessage, 1),
			recvc: make(chan multiMessage, 1),
		}
		msgt := raftpb.MessageType(i)
		mn.Step(context.TODO(), 1, raftpb.Message{Type: msgt})
		// Proposals go to the propc chan. Others go to the recvc chan.
		if msgt == raftpb.MsgProp {
			select {
			case <-mn.propc:
			default:
				t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
			}
		} else {
			if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus || msgt == raftpb.MsgCheckQuorum {
				select {
				case <-mn.recvc:
					t.Errorf("%d: step should ignore %s", msgt, msgn)
				default:
				}
			} else {
				select {
				case <-mn.recvc:
				default:
					t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
				}
			}
		}
	}
}

// TestMultiNodeStepUnblock ensures that Cancel and Stop unblock Step().
func TestMultiNodeStepUnblock(t *testing.T) {
	// a node without buffer to block step
	mn := &multiNode{
		propc: make(chan multiMessage),
		done:  make(chan struct{}),
	}

	ctx, cancel := context.WithCancel(context.Background())
	stopFunc := func() { close(mn.done) }

	tests := []struct {
		unblock func()
		werr    error
	}{
		{stopFunc, ErrStopped},
		{cancel, context.Canceled},
	}

	for i, tt := range tests {
		errc := make(chan error, 1)
		go func() {
			err := mn.Step(ctx, 1, raftpb.Message{Type: raftpb.MsgProp})
			errc <- err
		}()
		tt.unblock()
		select {
		case err := <-errc:
			if err != tt.werr {
				t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
			}
			// clean up side effects
			if ctx.Err() != nil {
				ctx = context.TODO()
			}
			select {
			case <-mn.done:
				mn.done = make(chan struct{})
			default:
			}
		case <-time.After(time.Millisecond * 100):
			t.Errorf("#%d: failed to unblock step", i)
		}
	}
}

// TestMultiNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
func TestMultiNodePropose(t *testing.T) {
	mn := newMultiNode(1)
	go mn.run()
	s := NewMemoryStorage()
	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
	mn.Campaign(context.TODO(), 1)
	proposed := false
	for {
		rds := <-mn.Ready()
		rd := rds[1]
		s.Append(rd.Entries)
		// Once we are the leader, propose a command.
		if !proposed && rd.SoftState.Lead == mn.id {
			mn.Propose(context.TODO(), 1, []byte("somedata"))
			proposed = true
		}
		mn.Advance(rds)

		// Exit when we have three entries: one ConfChange, one no-op for the election,
		// and our proposed command.
		lastIndex, err := s.LastIndex()
		if err != nil {
			t.Fatal(err)
		}
		if lastIndex >= 3 {
			break
		}
	}
	mn.Stop()

	lastIndex, err := s.LastIndex()
	if err != nil {
		t.Fatal(err)
	}
	entries, err := s.Entries(lastIndex, lastIndex+1, noLimit)
	if err != nil {
		t.Fatal(err)
	}
	if len(entries) != 1 {
		t.Fatalf("len(entries) = %d, want %d", len(entries), 1)
	}
	if !bytes.Equal(entries[0].Data, []byte("somedata")) {
		t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
	}
}

// TestMultiNodeProposeConfig ensures that multiNode.ProposeConfChange
// sends the given configuration proposal to the underlying raft.
func TestMultiNodeProposeConfig(t *testing.T) {
	mn := newMultiNode(1)
	go mn.run()
	s := NewMemoryStorage()
	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
	mn.Campaign(context.TODO(), 1)
	proposed := false
	var lastIndex uint64
	var ccdata []byte
	for {
		rds := <-mn.Ready()
		rd := rds[1]
		s.Append(rd.Entries)
		// propose a ConfChange once this raft becomes leader
		if !proposed && rd.SoftState.Lead == mn.id {
			cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
			var err error
			ccdata, err = cc.Marshal()
			if err != nil {
				t.Fatal(err)
			}
			mn.ProposeConfChange(context.TODO(), 1, cc)
			proposed = true
		}
		mn.Advance(rds)

		var err error
		lastIndex, err = s.LastIndex()
		if err != nil {
			t.Fatal(err)
		}
		if lastIndex >= 3 {
			break
		}
	}
	mn.Stop()

	entries, err := s.Entries(lastIndex, lastIndex+1, noLimit)
	if err != nil {
		t.Fatal(err)
	}
	if len(entries) != 1 {
		t.Fatalf("len(entries) = %d, want %d", len(entries), 1)
	}
	if entries[0].Type != raftpb.EntryConfChange {
		t.Fatalf("type = %v, want %v", entries[0].Type, raftpb.EntryConfChange)
	}
	if !bytes.Equal(entries[0].Data, ccdata) {
		t.Errorf("data = %v, want %v", entries[0].Data, ccdata)
	}
}

// TestProposeUnknownGroup ensures that we gracefully handle proposals
// for groups we don't know about (which can happen on a former leader
// that has been removed from the group).
//
// It is analogous to TestBlockProposal from node_test.go, but in
// MultiNode we cannot block proposals based on individual group
// leader status.
func TestProposeUnknownGroup(t *testing.T) {
	mn := newMultiNode(1)
	go mn.run()
	defer mn.Stop()

	// A nil error from Propose() doesn't mean much. In this case the
	// proposal will be dropped on the floor because we don't know
	// anything about group 42. This is a very crude test that mainly
	// guarantees that we don't panic in this case.
	if err := mn.Propose(context.TODO(), 42, []byte("somedata")); err != nil {
		t.Errorf("err = %v, want nil", err)
	}
}

// TestProposeAfterRemoveLeader ensures that we gracefully handle
// proposals that are attempted after a leader has been removed from
// the active configuration, but before that leader has called
// MultiNode.RemoveGroup.
func TestProposeAfterRemoveLeader(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	mn := newMultiNode(1)
	go mn.run()
	defer mn.Stop()

	storage := NewMemoryStorage()
	if err := mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage),
		[]Peer{{ID: 1}}); err != nil {
		t.Fatal(err)
	}
	if err := mn.Campaign(ctx, 1); err != nil {
		t.Fatal(err)
	}

	if err := mn.ProposeConfChange(ctx, 1, raftpb.ConfChange{
		Type:   raftpb.ConfChangeRemoveNode,
		NodeID: 1,
	}); err != nil {
		t.Fatal(err)
	}
	gs := <-mn.Ready()
	g := gs[1]
	if err := storage.Append(g.Entries); err != nil {
		t.Fatal(err)
	}
	for _, e := range g.CommittedEntries {
		if e.Type == raftpb.EntryConfChange {
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(e.Data); err != nil {
				t.Fatal(err)
			}
			mn.ApplyConfChange(1, cc)
		}
	}
	mn.Advance(gs)

	if err := mn.Propose(ctx, 1, []byte("somedata")); err != nil {
		t.Errorf("err = %v, want nil", err)
	}
}

// TestNodeTick from node_test.go has no equivalent in multiNode because
// it reaches into the raft object, which is not exposed.

// TestMultiNodeStop ensures that multiNode.Stop() blocks until the node has stopped
// processing, and that it is idempotent.
func TestMultiNodeStop(t *testing.T) {
	mn := newMultiNode(1)
	donec := make(chan struct{})
	go func() {
		mn.run()
		close(donec)
	}()
	mn.Tick()
	mn.Stop()

	select {
	case <-donec:
	case <-time.After(time.Second):
		t.Fatalf("timed out waiting for node to stop!")
	}

	// Further ticks should have no effect; the node is stopped.
	// There is no way to verify this in multiNode, but at least we can test
	// that it doesn't block or panic.
	mn.Tick()
	// Subsequent Stops should have no effect.
	mn.Stop()
}

// TestMultiNodeStart ensures that a node can be started correctly. The node should
// start with correct configuration change entries, and can accept and commit
// proposals.
func TestMultiNodeStart(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
	ccdata, err := cc.Marshal()
	if err != nil {
		t.Fatalf("unexpected marshal error: %v", err)
	}
	wants := []Ready{
		{
			SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
			HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1},
			Entries: []raftpb.Entry{
				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
				{Term: 2, Index: 2},
			},
			CommittedEntries: []raftpb.Entry{
				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
				{Term: 2, Index: 2},
			},
		},
		{
			HardState:        raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
			Entries:          []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
			CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
		},
	}
	mn := StartMultiNode(1)
	storage := NewMemoryStorage()
	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
	mn.Campaign(ctx, 1)
	gs := <-mn.Ready()
	g := gs[1]
	if !reflect.DeepEqual(g, wants[0]) {
		t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
	} else {
		storage.Append(g.Entries)
		mn.Advance(gs)
	}

	mn.Propose(ctx, 1, []byte("foo"))
	if gs2 := <-mn.Ready(); !reflect.DeepEqual(gs2[1], wants[1]) {
		t.Errorf("#%d: g = %+v,\n w %+v", 2, gs2[1], wants[1])
	} else {
		storage.Append(gs2[1].Entries)
		mn.Advance(gs2)
	}

	select {
	case rd := <-mn.Ready():
		t.Errorf("unexpected Ready: %+v", rd)
	case <-time.After(time.Millisecond):
	}
}

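// TestMultiNodeRestart ensures that a group recreated from previously persisted
// HardState and log entries re-delivers the entries up to the stored commit
// index as CommittedEntries in its first Ready, and emits no further Ready
// until something new happens.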
func TestMultiNodeRestart(t *testing.T) {
	entries := []raftpb.Entry{
		{Term: 1, Index: 1},
		{Term: 1, Index: 2, Data: []byte("foo")},
	}
	st := raftpb.HardState{Term: 1, Commit: 1}

	want := Ready{
		HardState: emptyState,
		// commit up to the commit index in st
		CommittedEntries: entries[:st.Commit],
	}

	storage := NewMemoryStorage()
	storage.SetHardState(st)
	storage.Append(entries)
	mn := StartMultiNode(1)
	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), nil)
	gs := <-mn.Ready()
	if !reflect.DeepEqual(gs[1], want) {
		t.Errorf("g = %+v,\n w %+v", gs[1], want)
	}
	mn.Advance(gs)

	select {
	case rd := <-mn.Ready():
		t.Errorf("unexpected Ready: %+v", rd)
	case <-time.After(time.Millisecond):
	}
	mn.Stop()
}

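// TestMultiNodeRestartFromSnapshot ensures that a group recreated from a
// snapshot plus later log entries delivers only the entries beyond the
// snapshot as CommittedEntries in its first Ready.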
func TestMultiNodeRestartFromSnapshot(t *testing.T) {
	snap := raftpb.Snapshot{
		Metadata: raftpb.SnapshotMetadata{
			ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}},
			Index:     2,
			Term:      1,
		},
	}
	entries := []raftpb.Entry{
		{Term: 1, Index: 3, Data: []byte("foo")},
	}
	st := raftpb.HardState{Term: 1, Commit: 3}

	want := Ready{
		HardState: emptyState,
		// commit up to the commit index in st
		CommittedEntries: entries,
	}

	s := NewMemoryStorage()
	s.SetHardState(st)
	s.ApplySnapshot(snap)
	s.Append(entries)
	mn := StartMultiNode(1)
	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), nil)
	if gs := <-mn.Ready(); !reflect.DeepEqual(gs[1], want) {
		t.Errorf("g = %+v,\n w %+v", gs[1], want)
	} else {
		mn.Advance(gs)
	}

	select {
	case rd := <-mn.Ready():
		t.Errorf("unexpected Ready: %+v", rd)
	case <-time.After(time.Millisecond):
	}
}

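// TestMultiNodeAdvance ensures that no new Ready is delivered for a group
// until the previous Ready has been acknowledged with Advance.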
func TestMultiNodeAdvance(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	storage := NewMemoryStorage()
	mn := StartMultiNode(1)
	mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
	mn.Campaign(ctx, 1)
	rd1 := <-mn.Ready()
	mn.Propose(ctx, 1, []byte("foo"))
	select {
	case rd2 := <-mn.Ready():
		t.Fatalf("unexpected Ready before Advance: %+v", rd2)
	case <-time.After(time.Millisecond):
	}
	storage.Append(rd1[1].Entries)
	mn.Advance(rd1)
	select {
	case <-mn.Ready():
	case <-time.After(100 * time.Millisecond):
		t.Errorf("expect Ready after Advance, but there is no Ready available")
	}
}

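// TestMultiNodeStatus ensures that Status returns a status struct for a group
// that has been created and nil for an unknown group.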
func TestMultiNodeStatus(t *testing.T) {
	storage := NewMemoryStorage()
	mn := StartMultiNode(1)
	err := mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
	if err != nil {
		t.Fatal(err)
	}
	status := mn.Status(1)
	if status == nil {
		t.Errorf("expected status struct, got nil")
	}

	status = mn.Status(2)
	if status != nil {
		t.Errorf("expected nil status, got %+v", status)
	}
}

// TestMultiNodePerGroupID tests that MultiNode may have a different
// node ID for each group, if and only if the Config.ID field is
// filled in when calling CreateGroup.
func TestMultiNodePerGroupID(t *testing.T) {
	storage := NewMemoryStorage()
	mn := StartMultiNode(0)
	// Maps group ID to node ID.
	groups := map[uint64]uint64{
		1: 10,
		2: 20,
	}

	// Create two groups.
	for g, nodeID := range groups {
		err := mn.CreateGroup(g, newTestConfig(nodeID, nil, 10, 1, storage),
			[]Peer{{ID: nodeID}, {ID: nodeID + 1}, {ID: nodeID + 2}})
		if err != nil {
			t.Fatal(err)
		}
	}

	// Campaign on both groups.
	for g := range groups {
		err := mn.Campaign(context.Background(), g)
		if err != nil {
			t.Fatal(err)
		}
	}

	// All outgoing messages (two MsgVotes for each group) should have
	// the correct From IDs.
	var rd map[uint64]Ready
	select {
	case rd = <-mn.Ready():
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timed out waiting for ready")
	}
	for g, nodeID := range groups {
		if len(rd[g].Messages) != 2 {
			t.Errorf("expected 2 messages in group %d; got %d", g, len(rd[g].Messages))
		}
		for _, m := range rd[g].Messages {
			if m.From != nodeID {
				t.Errorf("expected %s message in group %d to have From: %d; got %d",
					m.Type, g, nodeID, m.From)
			}
		}
	}
	mn.Advance(rd)

	// Become a follower in both groups.
	for g, nodeID := range groups {
		err := mn.Step(context.Background(), g, raftpb.Message{
			Type: raftpb.MsgHeartbeat,
			To:   nodeID,
			From: nodeID + 1,
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	// Propose a command on each group (Propose is tested separately
	// because proposals in follower mode go through a different code path).
	for g := range groups {
		err := mn.Propose(context.Background(), g, []byte("foo"))
		if err != nil {
			t.Fatal(err)
		}
	}

	// Validate that all outgoing messages (heartbeat response and
	// proposal) have the correct From IDs.
	select {
	case rd = <-mn.Ready():
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timed out waiting for ready")
	}
	for g, nodeID := range groups {
		if len(rd[g].Messages) != 2 {
			t.Errorf("expected 2 messages in group %d; got %d", g, len(rd[g].Messages))
		}
		for _, m := range rd[g].Messages {
			if m.From != nodeID {
				t.Errorf("expected %s message in group %d to have From: %d; got %d",
					m.Type, g, nodeID, m.From)
			}
		}
	}
	mn.Advance(rd)
}