server_test.go 36 KB


  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "reflect"
  22. "strconv"
  23. "testing"
  24. "time"
  25. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/pkg/idutil"
  28. "github.com/coreos/etcd/pkg/pbutil"
  29. "github.com/coreos/etcd/pkg/testutil"
  30. "github.com/coreos/etcd/pkg/types"
  31. "github.com/coreos/etcd/pkg/wait"
  32. "github.com/coreos/etcd/raft"
  33. "github.com/coreos/etcd/raft/raftpb"
  34. "github.com/coreos/etcd/rafthttp"
  35. dstorage "github.com/coreos/etcd/storage"
  36. "github.com/coreos/etcd/store"
  37. )
  38. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  39. // and are served through local data.
  40. func TestDoLocalAction(t *testing.T) {
  41. tests := []struct {
  42. req pb.Request
  43. wresp Response
  44. werr error
  45. wactions []testutil.Action
  46. }{
  47. {
  48. pb.Request{Method: "GET", ID: 1, Wait: true},
  49. Response{Watcher: store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}},
  50. },
  51. {
  52. pb.Request{Method: "GET", ID: 1},
  53. Response{Event: &store.Event{}}, nil,
  54. []testutil.Action{
  55. {
  56. Name: "Get",
  57. Params: []interface{}{"", false, false},
  58. },
  59. },
  60. },
  61. {
  62. pb.Request{Method: "HEAD", ID: 1},
  63. Response{Event: &store.Event{}}, nil,
  64. []testutil.Action{
  65. {
  66. Name: "Get",
  67. Params: []interface{}{"", false, false},
  68. },
  69. },
  70. },
  71. {
  72. pb.Request{Method: "BADMETHOD", ID: 1},
  73. Response{}, ErrUnknownMethod, []testutil.Action{},
  74. },
  75. }
  76. for i, tt := range tests {
  77. st := store.NewRecorder()
  78. srv := &EtcdServer{
  79. store: st,
  80. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  81. }
  82. resp, err := srv.Do(context.TODO(), tt.req)
  83. if err != tt.werr {
  84. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  85. }
  86. if !reflect.DeepEqual(resp, tt.wresp) {
  87. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  88. }
  89. gaction := st.Action()
  90. if !reflect.DeepEqual(gaction, tt.wactions) {
  91. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  92. }
  93. }
  94. }
  95. // TestDoBadLocalAction tests server requests which do not need to go through consensus,
  96. // and return errors when they fetch from local data.
  97. func TestDoBadLocalAction(t *testing.T) {
  98. storeErr := fmt.Errorf("bah")
  99. tests := []struct {
  100. req pb.Request
  101. wactions []testutil.Action
  102. }{
  103. {
  104. pb.Request{Method: "GET", ID: 1, Wait: true},
  105. []testutil.Action{{Name: "Watch"}},
  106. },
  107. {
  108. pb.Request{Method: "GET", ID: 1},
  109. []testutil.Action{
  110. {
  111. Name: "Get",
  112. Params: []interface{}{"", false, false},
  113. },
  114. },
  115. },
  116. {
  117. pb.Request{Method: "HEAD", ID: 1},
  118. []testutil.Action{
  119. {
  120. Name: "Get",
  121. Params: []interface{}{"", false, false},
  122. },
  123. },
  124. },
  125. }
  126. for i, tt := range tests {
  127. st := store.NewErrRecorder(storeErr)
  128. srv := &EtcdServer{
  129. store: st,
  130. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  131. }
  132. resp, err := srv.Do(context.Background(), tt.req)
  133. if err != storeErr {
  134. t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr)
  135. }
  136. if !reflect.DeepEqual(resp, Response{}) {
  137. t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{})
  138. }
  139. gaction := st.Action()
  140. if !reflect.DeepEqual(gaction, tt.wactions) {
  141. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  142. }
  143. }
  144. }
  145. func TestApplyRequest(t *testing.T) {
  146. tests := []struct {
  147. req pb.Request
  148. wresp Response
  149. wactions []testutil.Action
  150. }{
  151. // POST ==> Create
  152. {
  153. pb.Request{Method: "POST", ID: 1},
  154. Response{Event: &store.Event{}},
  155. []testutil.Action{
  156. {
  157. Name: "Create",
  158. Params: []interface{}{"", false, "", true, time.Time{}},
  159. },
  160. },
  161. },
  162. // POST ==> Create, with expiration
  163. {
  164. pb.Request{Method: "POST", ID: 1, Expiration: 1337},
  165. Response{Event: &store.Event{}},
  166. []testutil.Action{
  167. {
  168. Name: "Create",
  169. Params: []interface{}{"", false, "", true, time.Unix(0, 1337)},
  170. },
  171. },
  172. },
  173. // POST ==> Create, with dir
  174. {
  175. pb.Request{Method: "POST", ID: 1, Dir: true},
  176. Response{Event: &store.Event{}},
  177. []testutil.Action{
  178. {
  179. Name: "Create",
  180. Params: []interface{}{"", true, "", true, time.Time{}},
  181. },
  182. },
  183. },
  184. // PUT ==> Set
  185. {
  186. pb.Request{Method: "PUT", ID: 1},
  187. Response{Event: &store.Event{}},
  188. []testutil.Action{
  189. {
  190. Name: "Set",
  191. Params: []interface{}{"", false, "", time.Time{}},
  192. },
  193. },
  194. },
  195. // PUT ==> Set, with dir
  196. {
  197. pb.Request{Method: "PUT", ID: 1, Dir: true},
  198. Response{Event: &store.Event{}},
  199. []testutil.Action{
  200. {
  201. Name: "Set",
  202. Params: []interface{}{"", true, "", time.Time{}},
  203. },
  204. },
  205. },
  206. // PUT with PrevExist=true ==> Update
  207. {
  208. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true)},
  209. Response{Event: &store.Event{}},
  210. []testutil.Action{
  211. {
  212. Name: "Update",
  213. Params: []interface{}{"", "", time.Time{}},
  214. },
  215. },
  216. },
  217. // PUT with PrevExist=false ==> Create
  218. {
  219. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false)},
  220. Response{Event: &store.Event{}},
  221. []testutil.Action{
  222. {
  223. Name: "Create",
  224. Params: []interface{}{"", false, "", false, time.Time{}},
  225. },
  226. },
  227. },
  228. // PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
  229. {
  230. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
  231. Response{Event: &store.Event{}},
  232. []testutil.Action{
  233. {
  234. Name: "CompareAndSwap",
  235. Params: []interface{}{"", "", uint64(1), "", time.Time{}},
  236. },
  237. },
  238. },
  239. // PUT with PrevExist=false *and* PrevIndex set ==> Create
  240. {
  241. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
  242. Response{Event: &store.Event{}},
  243. []testutil.Action{
  244. {
  245. Name: "Create",
  246. Params: []interface{}{"", false, "", false, time.Time{}},
  247. },
  248. },
  249. },
  250. // PUT with PrevIndex set ==> CompareAndSwap
  251. {
  252. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1},
  253. Response{Event: &store.Event{}},
  254. []testutil.Action{
  255. {
  256. Name: "CompareAndSwap",
  257. Params: []interface{}{"", "", uint64(1), "", time.Time{}},
  258. },
  259. },
  260. },
  261. // PUT with PrevValue set ==> CompareAndSwap
  262. {
  263. pb.Request{Method: "PUT", ID: 1, PrevValue: "bar"},
  264. Response{Event: &store.Event{}},
  265. []testutil.Action{
  266. {
  267. Name: "CompareAndSwap",
  268. Params: []interface{}{"", "bar", uint64(0), "", time.Time{}},
  269. },
  270. },
  271. },
  272. // PUT with PrevIndex and PrevValue set ==> CompareAndSwap
  273. {
  274. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1, PrevValue: "bar"},
  275. Response{Event: &store.Event{}},
  276. []testutil.Action{
  277. {
  278. Name: "CompareAndSwap",
  279. Params: []interface{}{"", "bar", uint64(1), "", time.Time{}},
  280. },
  281. },
  282. },
  283. // DELETE ==> Delete
  284. {
  285. pb.Request{Method: "DELETE", ID: 1},
  286. Response{Event: &store.Event{}},
  287. []testutil.Action{
  288. {
  289. Name: "Delete",
  290. Params: []interface{}{"", false, false},
  291. },
  292. },
  293. },
  294. // DELETE with PrevIndex set ==> CompareAndDelete
  295. {
  296. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 1},
  297. Response{Event: &store.Event{}},
  298. []testutil.Action{
  299. {
  300. Name: "CompareAndDelete",
  301. Params: []interface{}{"", "", uint64(1)},
  302. },
  303. },
  304. },
  305. // DELETE with PrevValue set ==> CompareAndDelete
  306. {
  307. pb.Request{Method: "DELETE", ID: 1, PrevValue: "bar"},
  308. Response{Event: &store.Event{}},
  309. []testutil.Action{
  310. {
  311. Name: "CompareAndDelete",
  312. Params: []interface{}{"", "bar", uint64(0)},
  313. },
  314. },
  315. },
  316. // DELETE with PrevIndex *and* PrevValue set ==> CompareAndDelete
  317. {
  318. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 5, PrevValue: "bar"},
  319. Response{Event: &store.Event{}},
  320. []testutil.Action{
  321. {
  322. Name: "CompareAndDelete",
  323. Params: []interface{}{"", "bar", uint64(5)},
  324. },
  325. },
  326. },
  327. // QGET ==> Get
  328. {
  329. pb.Request{Method: "QGET", ID: 1},
  330. Response{Event: &store.Event{}},
  331. []testutil.Action{
  332. {
  333. Name: "Get",
  334. Params: []interface{}{"", false, false},
  335. },
  336. },
  337. },
  338. // SYNC ==> DeleteExpiredKeys
  339. {
  340. pb.Request{Method: "SYNC", ID: 1},
  341. Response{},
  342. []testutil.Action{
  343. {
  344. Name: "DeleteExpiredKeys",
  345. Params: []interface{}{time.Unix(0, 0)},
  346. },
  347. },
  348. },
  349. {
  350. pb.Request{Method: "SYNC", ID: 1, Time: 12345},
  351. Response{},
  352. []testutil.Action{
  353. {
  354. Name: "DeleteExpiredKeys",
  355. Params: []interface{}{time.Unix(0, 12345)},
  356. },
  357. },
  358. },
  359. // Unknown method - error
  360. {
  361. pb.Request{Method: "BADMETHOD", ID: 1},
  362. Response{err: ErrUnknownMethod},
  363. []testutil.Action{},
  364. },
  365. }
  366. for i, tt := range tests {
  367. st := store.NewRecorder()
  368. srv := &EtcdServer{store: st}
  369. resp := srv.applyRequest(tt.req)
  370. if !reflect.DeepEqual(resp, tt.wresp) {
  371. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  372. }
  373. gaction := st.Action()
  374. if !reflect.DeepEqual(gaction, tt.wactions) {
  375. t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions)
  376. }
  377. }
  378. }
  379. func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
  380. cl := newTestCluster([]*Member{{ID: 1}})
  381. srv := &EtcdServer{
  382. store: store.NewRecorder(),
  383. cluster: cl,
  384. }
  385. req := pb.Request{
  386. Method: "PUT",
  387. ID: 1,
  388. Path: path.Join(storeMembersPrefix, strconv.FormatUint(1, 16), attributesSuffix),
  389. Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
  390. }
  391. srv.applyRequest(req)
  392. w := Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
  393. if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
  394. t.Errorf("attributes = %v, want %v", g, w)
  395. }
  396. }
  397. func TestApplyConfChangeError(t *testing.T) {
  398. cl := newCluster("")
  399. cl.SetStore(store.New())
  400. for i := 1; i <= 4; i++ {
  401. cl.AddMember(&Member{ID: types.ID(i)})
  402. }
  403. cl.RemoveMember(4)
  404. tests := []struct {
  405. cc raftpb.ConfChange
  406. werr error
  407. }{
  408. {
  409. raftpb.ConfChange{
  410. Type: raftpb.ConfChangeAddNode,
  411. NodeID: 4,
  412. },
  413. ErrIDRemoved,
  414. },
  415. {
  416. raftpb.ConfChange{
  417. Type: raftpb.ConfChangeUpdateNode,
  418. NodeID: 4,
  419. },
  420. ErrIDRemoved,
  421. },
  422. {
  423. raftpb.ConfChange{
  424. Type: raftpb.ConfChangeAddNode,
  425. NodeID: 1,
  426. },
  427. ErrIDExists,
  428. },
  429. {
  430. raftpb.ConfChange{
  431. Type: raftpb.ConfChangeRemoveNode,
  432. NodeID: 5,
  433. },
  434. ErrIDNotFound,
  435. },
  436. }
  437. for i, tt := range tests {
  438. n := newNodeRecorder()
  439. srv := &EtcdServer{
  440. r: raftNode{Node: n},
  441. cluster: cl,
  442. cfg: &ServerConfig{},
  443. }
  444. _, err := srv.applyConfChange(tt.cc, nil)
  445. if err != tt.werr {
  446. t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
  447. }
  448. cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None}
  449. w := []testutil.Action{
  450. {
  451. Name: "ApplyConfChange",
  452. Params: []interface{}{cc},
  453. },
  454. }
  455. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  456. t.Errorf("#%d: action = %+v, want %+v", i, g, w)
  457. }
  458. }
  459. }
  460. func TestApplyConfChangeShouldStop(t *testing.T) {
  461. cl := newCluster("")
  462. cl.SetStore(store.New())
  463. for i := 1; i <= 3; i++ {
  464. cl.AddMember(&Member{ID: types.ID(i)})
  465. }
  466. srv := &EtcdServer{
  467. id: 1,
  468. r: raftNode{
  469. Node: newNodeNop(),
  470. transport: rafthttp.NewNopTransporter(),
  471. },
  472. cluster: cl,
  473. }
  474. cc := raftpb.ConfChange{
  475. Type: raftpb.ConfChangeRemoveNode,
  476. NodeID: 2,
  477. }
  478. // remove non-local member
  479. shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
  480. if err != nil {
  481. t.Fatalf("unexpected error %v", err)
  482. }
  483. if shouldStop != false {
  484. t.Errorf("shouldStop = %t, want %t", shouldStop, false)
  485. }
  486. // remove local member
  487. cc.NodeID = 1
  488. shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
  489. if err != nil {
  490. t.Fatalf("unexpected error %v", err)
  491. }
  492. if shouldStop != true {
  493. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  494. }
  495. }
  496. // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
  497. // if the local member is removed along with other conf updates.
  498. func TestApplyMultiConfChangeShouldStop(t *testing.T) {
  499. cl := newCluster("")
  500. cl.SetStore(store.New())
  501. for i := 1; i <= 5; i++ {
  502. cl.AddMember(&Member{ID: types.ID(i)})
  503. }
  504. srv := &EtcdServer{
  505. id: 2,
  506. r: raftNode{
  507. Node: newNodeNop(),
  508. transport: rafthttp.NewNopTransporter(),
  509. },
  510. cluster: cl,
  511. w: wait.New(),
  512. }
  513. ents := []raftpb.Entry{}
  514. for i := 1; i <= 4; i++ {
  515. ent := raftpb.Entry{
  516. Term: 1,
  517. Index: uint64(i),
  518. Type: raftpb.EntryConfChange,
  519. Data: pbutil.MustMarshal(
  520. &raftpb.ConfChange{
  521. Type: raftpb.ConfChangeRemoveNode,
  522. NodeID: uint64(i)}),
  523. }
  524. ents = append(ents, ent)
  525. }
  526. _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
  527. if shouldStop == false {
  528. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  529. }
  530. }
  531. func TestDoProposal(t *testing.T) {
  532. tests := []pb.Request{
  533. {Method: "POST", ID: 1},
  534. {Method: "PUT", ID: 1},
  535. {Method: "DELETE", ID: 1},
  536. {Method: "GET", ID: 1, Quorum: true},
  537. }
  538. for i, tt := range tests {
  539. st := store.NewRecorder()
  540. srv := &EtcdServer{
  541. cfg: &ServerConfig{TickMs: 1},
  542. r: raftNode{
  543. Node: newNodeCommitter(),
  544. storage: &storageRecorder{},
  545. raftStorage: raft.NewMemoryStorage(),
  546. transport: rafthttp.NewNopTransporter(),
  547. },
  548. store: st,
  549. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  550. }
  551. srv.start()
  552. resp, err := srv.Do(context.Background(), tt)
  553. srv.Stop()
  554. action := st.Action()
  555. if len(action) != 1 {
  556. t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
  557. }
  558. if err != nil {
  559. t.Fatalf("#%d: err = %v, want nil", i, err)
  560. }
  561. wresp := Response{Event: &store.Event{}}
  562. if !reflect.DeepEqual(resp, wresp) {
  563. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  564. }
  565. }
  566. }
  567. func TestDoProposalCancelled(t *testing.T) {
  568. wait := wait.NewRecorder()
  569. srv := &EtcdServer{
  570. cfg: &ServerConfig{TickMs: 1},
  571. r: raftNode{Node: newNodeNop()},
  572. w: wait,
  573. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  574. }
  575. ctx, cancel := context.WithCancel(context.Background())
  576. cancel()
  577. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  578. if err != ErrCanceled {
  579. t.Fatalf("err = %v, want %v", err, ErrCanceled)
  580. }
  581. w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
  582. if !reflect.DeepEqual(wait.Action(), w) {
  583. t.Errorf("wait.action = %+v, want %+v", wait.Action(), w)
  584. }
  585. }
  586. func TestDoProposalTimeout(t *testing.T) {
  587. srv := &EtcdServer{
  588. cfg: &ServerConfig{TickMs: 1},
  589. r: raftNode{Node: newNodeNop()},
  590. w: wait.NewNop(),
  591. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  592. }
  593. ctx, _ := context.WithTimeout(context.Background(), 0)
  594. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  595. if err != ErrTimeout {
  596. t.Fatalf("err = %v, want %v", err, ErrTimeout)
  597. }
  598. }
  599. func TestDoProposalStopped(t *testing.T) {
  600. srv := &EtcdServer{
  601. cfg: &ServerConfig{TickMs: 1},
  602. r: raftNode{Node: newNodeNop()},
  603. w: wait.NewNop(),
  604. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  605. }
  606. srv.done = make(chan struct{})
  607. close(srv.done)
  608. _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
  609. if err != ErrStopped {
  610. t.Errorf("err = %v, want %v", err, ErrStopped)
  611. }
  612. }
  613. // TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
  614. func TestSync(t *testing.T) {
  615. n := newNodeRecorder()
  616. srv := &EtcdServer{
  617. r: raftNode{Node: n},
  618. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  619. }
  620. // check that sync is non-blocking
  621. done := make(chan struct{})
  622. go func() {
  623. srv.sync(10 * time.Second)
  624. done <- struct{}{}
  625. }()
  626. select {
  627. case <-done:
  628. case <-time.After(time.Second):
  629. t.Fatal("sync should be non-blocking but did not return after 1s!")
  630. }
  631. action, _ := n.Wait(1)
  632. if len(action) != 1 {
  633. t.Fatalf("len(action) = %d, want 1", len(action))
  634. }
  635. if action[0].Name != "Propose" {
  636. t.Fatalf("action = %s, want Propose", action[0].Name)
  637. }
  638. data := action[0].Params[0].([]byte)
  639. var r pb.Request
  640. if err := r.Unmarshal(data); err != nil {
  641. t.Fatalf("unmarshal request error: %v", err)
  642. }
  643. if r.Method != "SYNC" {
  644. t.Errorf("method = %s, want SYNC", r.Method)
  645. }
  646. }
  647. // TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
  648. // after timeout
  649. func TestSyncTimeout(t *testing.T) {
  650. n := newProposalBlockerRecorder()
  651. srv := &EtcdServer{
  652. r: raftNode{Node: n},
  653. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  654. }
  655. // check that sync is non-blocking
  656. done := make(chan struct{})
  657. go func() {
  658. srv.sync(0)
  659. done <- struct{}{}
  660. }()
  661. select {
  662. case <-done:
  663. case <-time.After(time.Second):
  664. t.Fatal("sync should be non-blocking but did not return after 1s!")
  665. }
  666. w := []testutil.Action{{Name: "Propose blocked"}}
  667. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  668. t.Errorf("action = %v, want %v", g, w)
  669. }
  670. }
  671. // TODO: TestNoSyncWhenNoLeader
  672. // TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
  673. func TestSyncTrigger(t *testing.T) {
  674. n := newReadyNode()
  675. st := make(chan time.Time, 1)
  676. srv := &EtcdServer{
  677. cfg: &ServerConfig{TickMs: 1},
  678. r: raftNode{
  679. Node: n,
  680. raftStorage: raft.NewMemoryStorage(),
  681. transport: rafthttp.NewNopTransporter(),
  682. storage: &storageRecorder{},
  683. },
  684. store: store.NewNop(),
  685. SyncTicker: st,
  686. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  687. }
  688. // trigger the server to become a leader and accept sync requests
  689. go func() {
  690. srv.start()
  691. n.readyc <- raft.Ready{
  692. SoftState: &raft.SoftState{
  693. RaftState: raft.StateLeader,
  694. },
  695. }
  696. // trigger a sync request
  697. st <- time.Time{}
  698. }()
  699. action, _ := n.Wait(1)
  700. go srv.Stop()
  701. if len(action) != 1 {
  702. t.Fatalf("len(action) = %d, want 1", len(action))
  703. }
  704. if action[0].Name != "Propose" {
  705. t.Fatalf("action = %s, want Propose", action[0].Name)
  706. }
  707. data := action[0].Params[0].([]byte)
  708. var req pb.Request
  709. if err := req.Unmarshal(data); err != nil {
  710. t.Fatalf("error unmarshalling data: %v", err)
  711. }
  712. if req.Method != "SYNC" {
  713. t.Fatalf("unexpected proposed request: %#v", req.Method)
  714. }
  715. // wait on stop message
  716. <-n.Chan()
  717. }
  718. // snapshot should snapshot the store and cut the persistent
  719. func TestSnapshot(t *testing.T) {
  720. s := raft.NewMemoryStorage()
  721. s.Append([]raftpb.Entry{{Index: 1}})
  722. st := store.NewRecorder()
  723. p := &storageRecorder{}
  724. srv := &EtcdServer{
  725. cfg: &ServerConfig{},
  726. r: raftNode{
  727. Node: newNodeNop(),
  728. raftStorage: s,
  729. storage: p,
  730. },
  731. store: st,
  732. }
  733. srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
  734. gaction, _ := st.Wait(2)
  735. if len(gaction) != 2 {
  736. t.Fatalf("len(action) = %d, want 1", len(gaction))
  737. }
  738. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
  739. t.Errorf("action = %s, want Clone", gaction[0])
  740. }
  741. if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
  742. t.Errorf("action = %s, want SaveNoCopy", gaction[1])
  743. }
  744. gaction = p.Action()
  745. if len(gaction) != 1 {
  746. t.Fatalf("len(action) = %d, want 1", len(gaction))
  747. }
  748. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
  749. t.Errorf("action = %s, want SaveSnap", gaction[0])
  750. }
  751. }
  752. // Applied > SnapCount should trigger a SaveSnap event
  753. func TestTriggerSnap(t *testing.T) {
  754. snapc := 10
  755. st := store.NewRecorder()
  756. p := &storageRecorder{}
  757. srv := &EtcdServer{
  758. cfg: &ServerConfig{TickMs: 1},
  759. snapCount: uint64(snapc),
  760. r: raftNode{
  761. Node: newNodeCommitter(),
  762. raftStorage: raft.NewMemoryStorage(),
  763. storage: p,
  764. transport: rafthttp.NewNopTransporter(),
  765. },
  766. store: st,
  767. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  768. }
  769. srv.start()
  770. for i := 0; i < snapc+1; i++ {
  771. srv.Do(context.Background(), pb.Request{Method: "PUT"})
  772. }
  773. wcnt := 2 + snapc
  774. gaction, _ := p.Wait(wcnt)
  775. srv.Stop()
  776. // each operation is recorded as a Save
  777. // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
  778. if len(gaction) != wcnt {
  779. t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
  780. }
  781. if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
  782. t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
  783. }
  784. }
  785. // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
  786. // proposals.
  787. func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
  788. const (
  789. // snapshots that may queue up at once without dropping
  790. maxInFlightMsgSnap = 16
  791. )
  792. n := newNopReadyNode()
  793. cl := newCluster("abc")
  794. cl.SetStore(store.New())
  795. testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
  796. if err != nil {
  797. t.Fatalf("Couldn't open tempdir (%v)", err)
  798. }
  799. defer os.RemoveAll(testdir)
  800. if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
  801. t.Fatalf("Couldn't make snap dir (%v)", err)
  802. }
  803. rs := raft.NewMemoryStorage()
  804. tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
  805. s := &EtcdServer{
  806. cfg: &ServerConfig{
  807. V3demo: true,
  808. DataDir: testdir,
  809. },
  810. r: raftNode{
  811. Node: n,
  812. transport: tr,
  813. storage: &storageRecorder{dbPath: testdir},
  814. raftStorage: rs,
  815. },
  816. store: cl.store,
  817. cluster: cl,
  818. msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
  819. }
  820. s.kv = dstorage.New(
  821. path.Join(testdir, "testdb.db"),
  822. &s.consistIndex)
  823. s.start()
  824. defer s.Stop()
  825. // submit applied entries and snap entries
  826. idx := uint64(0)
  827. outdated := 0
  828. accepted := 0
  829. for k := 1; k <= 101; k++ {
  830. idx++
  831. ch := s.w.Register(uint64(idx))
  832. req := &pb.Request{Method: "QGET", ID: uint64(idx)}
  833. ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)}
  834. ready := raft.Ready{Entries: []raftpb.Entry{ent}}
  835. n.readyc <- ready
  836. ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
  837. n.readyc <- ready
  838. // "idx" applied
  839. <-ch
  840. // one snapshot for every two messages
  841. if k%2 != 0 {
  842. continue
  843. }
  844. n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
  845. // get the snapshot sent by the transport
  846. snapMsg := <-snapDoneC
  847. // If the snapshot trails applied records, recovery will panic
  848. // since there's no allocated snapshot at the place of the
  849. // snapshot record. This only happens when the applier and the
  850. // snapshot sender get out of sync.
  851. if snapMsg.Snapshot.Metadata.Index == idx {
  852. idx++
  853. snapMsg.Snapshot.Metadata.Index = idx
  854. ready = raft.Ready{Snapshot: snapMsg.Snapshot}
  855. n.readyc <- ready
  856. accepted++
  857. } else {
  858. outdated++
  859. }
  860. // don't wait for the snapshot to complete, move to next message
  861. }
  862. if accepted != 50 {
  863. t.Errorf("accepted=%v, want 50", accepted)
  864. }
  865. if outdated != 0 {
  866. t.Errorf("outdated=%v, want 0", outdated)
  867. }
  868. }
  869. // TestRecvSnapshot tests when it receives a snapshot from raft leader,
  870. // it should trigger storage.SaveSnap and also store.Recover.
  871. func TestRecvSnapshot(t *testing.T) {
  872. n := newNopReadyNode()
  873. st := store.NewRecorder()
  874. p := &storageRecorder{}
  875. cl := newCluster("abc")
  876. cl.SetStore(store.New())
  877. s := &EtcdServer{
  878. cfg: &ServerConfig{},
  879. r: raftNode{
  880. Node: n,
  881. transport: rafthttp.NewNopTransporter(),
  882. storage: p,
  883. raftStorage: raft.NewMemoryStorage(),
  884. },
  885. store: st,
  886. cluster: cl,
  887. }
  888. s.start()
  889. n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}
  890. // wait for actions happened on the storage
  891. for len(p.Action()) == 0 {
  892. time.Sleep(10 * time.Millisecond)
  893. }
  894. s.Stop()
  895. wactions := []testutil.Action{{Name: "Recovery"}}
  896. if g := st.Action(); !reflect.DeepEqual(g, wactions) {
  897. t.Errorf("store action = %v, want %v", g, wactions)
  898. }
  899. wactions = []testutil.Action{{Name: "SaveSnap"}, {Name: "Save"}}
  900. if g := p.Action(); !reflect.DeepEqual(g, wactions) {
  901. t.Errorf("storage action = %v, want %v", g, wactions)
  902. }
  903. }
  904. // TestApplySnapshotAndCommittedEntries tests that server applies snapshot
  905. // first and then committed entries.
  906. func TestApplySnapshotAndCommittedEntries(t *testing.T) {
  907. n := newNopReadyNode()
  908. st := store.NewRecorder()
  909. cl := newCluster("abc")
  910. cl.SetStore(store.New())
  911. storage := raft.NewMemoryStorage()
  912. s := &EtcdServer{
  913. cfg: &ServerConfig{},
  914. r: raftNode{
  915. Node: n,
  916. storage: &storageRecorder{},
  917. raftStorage: storage,
  918. transport: rafthttp.NewNopTransporter(),
  919. },
  920. store: st,
  921. cluster: cl,
  922. }
  923. s.start()
  924. req := &pb.Request{Method: "QGET"}
  925. n.readyc <- raft.Ready{
  926. Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}},
  927. CommittedEntries: []raftpb.Entry{
  928. {Index: 2, Data: pbutil.MustMarshal(req)},
  929. },
  930. }
  931. // make goroutines move forward to receive snapshot
  932. actions, _ := st.Wait(2)
  933. s.Stop()
  934. if len(actions) != 2 {
  935. t.Fatalf("len(action) = %d, want 2", len(actions))
  936. }
  937. if actions[0].Name != "Recovery" {
  938. t.Errorf("actions[0] = %s, want %s", actions[0].Name, "Recovery")
  939. }
  940. if actions[1].Name != "Get" {
  941. t.Errorf("actions[1] = %s, want %s", actions[1].Name, "Get")
  942. }
  943. }
  944. // TestAddMember tests AddMember can propose and perform node addition.
  945. func TestAddMember(t *testing.T) {
  946. n := newNodeConfChangeCommitterRecorder()
  947. n.readyc <- raft.Ready{
  948. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  949. }
  950. cl := newTestCluster(nil)
  951. st := store.New()
  952. cl.SetStore(st)
  953. s := &EtcdServer{
  954. r: raftNode{
  955. Node: n,
  956. raftStorage: raft.NewMemoryStorage(),
  957. storage: &storageRecorder{},
  958. transport: rafthttp.NewNopTransporter(),
  959. },
  960. cfg: &ServerConfig{},
  961. store: st,
  962. cluster: cl,
  963. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  964. }
  965. s.start()
  966. m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
  967. err := s.AddMember(context.TODO(), m)
  968. gaction := n.Action()
  969. s.Stop()
  970. if err != nil {
  971. t.Fatalf("AddMember error: %v", err)
  972. }
  973. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}}
  974. if !reflect.DeepEqual(gaction, wactions) {
  975. t.Errorf("action = %v, want %v", gaction, wactions)
  976. }
  977. if cl.Member(1234) == nil {
  978. t.Errorf("member with id 1234 is not added")
  979. }
  980. }
  981. // TestRemoveMember tests RemoveMember can propose and perform node removal.
  982. func TestRemoveMember(t *testing.T) {
  983. n := newNodeConfChangeCommitterRecorder()
  984. n.readyc <- raft.Ready{
  985. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  986. }
  987. cl := newTestCluster(nil)
  988. st := store.New()
  989. cl.SetStore(store.New())
  990. cl.AddMember(&Member{ID: 1234})
  991. s := &EtcdServer{
  992. r: raftNode{
  993. Node: n,
  994. raftStorage: raft.NewMemoryStorage(),
  995. storage: &storageRecorder{},
  996. transport: rafthttp.NewNopTransporter(),
  997. },
  998. cfg: &ServerConfig{},
  999. store: st,
  1000. cluster: cl,
  1001. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1002. }
  1003. s.start()
  1004. err := s.RemoveMember(context.TODO(), 1234)
  1005. gaction := n.Action()
  1006. s.Stop()
  1007. if err != nil {
  1008. t.Fatalf("RemoveMember error: %v", err)
  1009. }
  1010. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}}
  1011. if !reflect.DeepEqual(gaction, wactions) {
  1012. t.Errorf("action = %v, want %v", gaction, wactions)
  1013. }
  1014. if cl.Member(1234) != nil {
  1015. t.Errorf("member with id 1234 is not removed")
  1016. }
  1017. }
  1018. // TestUpdateMember tests RemoveMember can propose and perform node update.
  1019. func TestUpdateMember(t *testing.T) {
  1020. n := newNodeConfChangeCommitterRecorder()
  1021. n.readyc <- raft.Ready{
  1022. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1023. }
  1024. cl := newTestCluster(nil)
  1025. st := store.New()
  1026. cl.SetStore(st)
  1027. cl.AddMember(&Member{ID: 1234})
  1028. s := &EtcdServer{
  1029. r: raftNode{
  1030. Node: n,
  1031. raftStorage: raft.NewMemoryStorage(),
  1032. storage: &storageRecorder{},
  1033. transport: rafthttp.NewNopTransporter(),
  1034. },
  1035. store: st,
  1036. cluster: cl,
  1037. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1038. }
  1039. s.start()
  1040. wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
  1041. err := s.UpdateMember(context.TODO(), wm)
  1042. gaction := n.Action()
  1043. s.Stop()
  1044. if err != nil {
  1045. t.Fatalf("UpdateMember error: %v", err)
  1046. }
  1047. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}}
  1048. if !reflect.DeepEqual(gaction, wactions) {
  1049. t.Errorf("action = %v, want %v", gaction, wactions)
  1050. }
  1051. if !reflect.DeepEqual(cl.Member(1234), &wm) {
  1052. t.Errorf("member = %v, want %v", cl.Member(1234), &wm)
  1053. }
  1054. }
  1055. // TODO: test server could stop itself when being removed
  1056. func TestPublish(t *testing.T) {
  1057. n := newNodeRecorder()
  1058. ch := make(chan interface{}, 1)
  1059. // simulate that request has gone through consensus
  1060. ch <- Response{}
  1061. w := wait.NewWithResponse(ch)
  1062. srv := &EtcdServer{
  1063. cfg: &ServerConfig{TickMs: 1},
  1064. id: 1,
  1065. r: raftNode{Node: n},
  1066. attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
  1067. cluster: &cluster{},
  1068. w: w,
  1069. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1070. }
  1071. srv.publish(time.Hour)
  1072. action := n.Action()
  1073. if len(action) != 1 {
  1074. t.Fatalf("len(action) = %d, want 1", len(action))
  1075. }
  1076. if action[0].Name != "Propose" {
  1077. t.Fatalf("action = %s, want Propose", action[0].Name)
  1078. }
  1079. data := action[0].Params[0].([]byte)
  1080. var r pb.Request
  1081. if err := r.Unmarshal(data); err != nil {
  1082. t.Fatalf("unmarshal request error: %v", err)
  1083. }
  1084. if r.Method != "PUT" {
  1085. t.Errorf("method = %s, want PUT", r.Method)
  1086. }
  1087. wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
  1088. if wpath := path.Join(memberStoreKey(wm.ID), attributesSuffix); r.Path != wpath {
  1089. t.Errorf("path = %s, want %s", r.Path, wpath)
  1090. }
  1091. var gattr Attributes
  1092. if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
  1093. t.Fatalf("unmarshal val error: %v", err)
  1094. }
  1095. if !reflect.DeepEqual(gattr, wm.Attributes) {
  1096. t.Errorf("member = %v, want %v", gattr, wm.Attributes)
  1097. }
  1098. }
  1099. // TestPublishStopped tests that publish will be stopped if server is stopped.
  1100. func TestPublishStopped(t *testing.T) {
  1101. srv := &EtcdServer{
  1102. cfg: &ServerConfig{TickMs: 1},
  1103. r: raftNode{
  1104. Node: newNodeNop(),
  1105. transport: rafthttp.NewNopTransporter(),
  1106. },
  1107. cluster: &cluster{},
  1108. w: wait.NewNop(),
  1109. done: make(chan struct{}),
  1110. stop: make(chan struct{}),
  1111. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1112. }
  1113. close(srv.done)
  1114. srv.publish(time.Hour)
  1115. }
  1116. // TestPublishRetry tests that publish will keep retry until success.
  1117. func TestPublishRetry(t *testing.T) {
  1118. n := newNodeRecorder()
  1119. srv := &EtcdServer{
  1120. cfg: &ServerConfig{TickMs: 1},
  1121. r: raftNode{Node: n},
  1122. w: wait.NewNop(),
  1123. done: make(chan struct{}),
  1124. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1125. }
  1126. // TODO: use fakeClockwork
  1127. time.AfterFunc(10*time.Millisecond, func() { close(srv.done) })
  1128. srv.publish(10 * time.Nanosecond)
  1129. action := n.Action()
  1130. // multiple Proposes
  1131. if cnt := len(action); cnt < 2 {
  1132. t.Errorf("len(action) = %d, want >= 2", cnt)
  1133. }
  1134. }
  1135. func TestUpdateVersion(t *testing.T) {
  1136. n := newNodeRecorder()
  1137. ch := make(chan interface{}, 1)
  1138. // simulate that request has gone through consensus
  1139. ch <- Response{}
  1140. w := wait.NewWithResponse(ch)
  1141. srv := &EtcdServer{
  1142. id: 1,
  1143. cfg: &ServerConfig{TickMs: 1},
  1144. r: raftNode{Node: n},
  1145. attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
  1146. cluster: &cluster{},
  1147. w: w,
  1148. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1149. }
  1150. srv.updateClusterVersion("2.0.0")
  1151. action := n.Action()
  1152. if len(action) != 1 {
  1153. t.Fatalf("len(action) = %d, want 1", len(action))
  1154. }
  1155. if action[0].Name != "Propose" {
  1156. t.Fatalf("action = %s, want Propose", action[0].Name)
  1157. }
  1158. data := action[0].Params[0].([]byte)
  1159. var r pb.Request
  1160. if err := r.Unmarshal(data); err != nil {
  1161. t.Fatalf("unmarshal request error: %v", err)
  1162. }
  1163. if r.Method != "PUT" {
  1164. t.Errorf("method = %s, want PUT", r.Method)
  1165. }
  1166. if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
  1167. t.Errorf("path = %s, want %s", r.Path, wpath)
  1168. }
  1169. if r.Val != "2.0.0" {
  1170. t.Errorf("val = %s, want %s", r.Val, "2.0.0")
  1171. }
  1172. }
  1173. func TestStopNotify(t *testing.T) {
  1174. s := &EtcdServer{
  1175. stop: make(chan struct{}),
  1176. done: make(chan struct{}),
  1177. }
  1178. go func() {
  1179. <-s.stop
  1180. close(s.done)
  1181. }()
  1182. notifier := s.StopNotify()
  1183. select {
  1184. case <-notifier:
  1185. t.Fatalf("received unexpected stop notification")
  1186. default:
  1187. }
  1188. s.Stop()
  1189. select {
  1190. case <-notifier:
  1191. default:
  1192. t.Fatalf("cannot receive stop notification")
  1193. }
  1194. }
  1195. func TestGetOtherPeerURLs(t *testing.T) {
  1196. tests := []struct {
  1197. membs []*Member
  1198. self string
  1199. wurls []string
  1200. }{
  1201. {
  1202. []*Member{
  1203. newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
  1204. },
  1205. "a",
  1206. []string{},
  1207. },
  1208. {
  1209. []*Member{
  1210. newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
  1211. newTestMember(2, []string{"http://10.0.0.2"}, "b", nil),
  1212. newTestMember(3, []string{"http://10.0.0.3"}, "c", nil),
  1213. },
  1214. "a",
  1215. []string{"http://10.0.0.2", "http://10.0.0.3"},
  1216. },
  1217. {
  1218. []*Member{
  1219. newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
  1220. newTestMember(3, []string{"http://10.0.0.3"}, "c", nil),
  1221. newTestMember(2, []string{"http://10.0.0.2"}, "b", nil),
  1222. },
  1223. "a",
  1224. []string{"http://10.0.0.2", "http://10.0.0.3"},
  1225. },
  1226. }
  1227. for i, tt := range tests {
  1228. cl := newClusterFromMembers("", types.ID(0), tt.membs)
  1229. urls := getRemotePeerURLs(cl, tt.self)
  1230. if !reflect.DeepEqual(urls, tt.wurls) {
  1231. t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
  1232. }
  1233. }
  1234. }
  1235. type nodeRecorder struct{ testutil.Recorder }
  1236. func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
  1237. func newNodeNop() raft.Node { return newNodeRecorder() }
  1238. func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
  1239. func (n *nodeRecorder) Campaign(ctx context.Context) error {
  1240. n.Record(testutil.Action{Name: "Campaign"})
  1241. return nil
  1242. }
  1243. func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
  1244. n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
  1245. return nil
  1246. }
  1247. func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1248. n.Record(testutil.Action{Name: "ProposeConfChange"})
  1249. return nil
  1250. }
  1251. func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
  1252. n.Record(testutil.Action{Name: "Step"})
  1253. return nil
  1254. }
  1255. func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
  1256. func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
  1257. func (n *nodeRecorder) Advance() {}
  1258. func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1259. n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
  1260. return &raftpb.ConfState{}
  1261. }
  1262. func (n *nodeRecorder) Stop() {
  1263. n.Record(testutil.Action{Name: "Stop"})
  1264. }
  1265. func (n *nodeRecorder) ReportUnreachable(id uint64) {}
  1266. func (n *nodeRecorder) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
  1267. func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
  1268. n.Record(testutil.Action{Name: "Compact"})
  1269. }
  1270. type nodeProposalBlockerRecorder struct {
  1271. nodeRecorder
  1272. }
  1273. func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
  1274. return &nodeProposalBlockerRecorder{*newNodeRecorder()}
  1275. }
  1276. func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
  1277. <-ctx.Done()
  1278. n.Record(testutil.Action{Name: "Propose blocked"})
  1279. return nil
  1280. }
  1281. // readyNode is a nodeRecorder with a user-writeable ready channel
  1282. type readyNode struct {
  1283. nodeRecorder
  1284. readyc chan raft.Ready
  1285. }
  1286. func newReadyNode() *readyNode {
  1287. return &readyNode{
  1288. nodeRecorder{testutil.NewRecorderStream()},
  1289. make(chan raft.Ready, 1)}
  1290. }
  1291. func newNopReadyNode() *readyNode {
  1292. return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
  1293. }
  1294. func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
  1295. type nodeConfChangeCommitterRecorder struct {
  1296. readyNode
  1297. index uint64
  1298. }
  1299. func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
  1300. return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0}
  1301. }
  1302. func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1303. data, err := conf.Marshal()
  1304. if err != nil {
  1305. return err
  1306. }
  1307. n.index++
  1308. n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()})
  1309. n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
  1310. return nil
  1311. }
  1312. func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
  1313. return n.readyc
  1314. }
  1315. func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1316. n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()})
  1317. return &raftpb.ConfState{}
  1318. }
  1319. // nodeCommitter commits proposed data immediately.
  1320. type nodeCommitter struct {
  1321. readyNode
  1322. index uint64
  1323. }
  1324. func newNodeCommitter() raft.Node {
  1325. return &nodeCommitter{*newNopReadyNode(), 0}
  1326. }
  1327. func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
  1328. n.index++
  1329. ents := []raftpb.Entry{{Index: n.index, Data: data}}
  1330. n.readyc <- raft.Ready{
  1331. Entries: ents,
  1332. CommittedEntries: ents,
  1333. }
  1334. return nil
  1335. }