server_test.go 51 KB


  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "io/ioutil"
  20. "net/http"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "reflect"
  25. "sync"
  26. "testing"
  27. "time"
  28. "go.etcd.io/etcd/etcdserver/api/membership"
  29. "go.etcd.io/etcd/etcdserver/api/rafthttp"
  30. "go.etcd.io/etcd/etcdserver/api/snap"
  31. "go.etcd.io/etcd/etcdserver/api/v2store"
  32. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  33. "go.etcd.io/etcd/lease"
  34. "go.etcd.io/etcd/mvcc"
  35. "go.etcd.io/etcd/mvcc/backend"
  36. "go.etcd.io/etcd/pkg/fileutil"
  37. "go.etcd.io/etcd/pkg/idutil"
  38. "go.etcd.io/etcd/pkg/mock/mockstorage"
  39. "go.etcd.io/etcd/pkg/mock/mockstore"
  40. "go.etcd.io/etcd/pkg/mock/mockwait"
  41. "go.etcd.io/etcd/pkg/pbutil"
  42. "go.etcd.io/etcd/pkg/testutil"
  43. "go.etcd.io/etcd/pkg/types"
  44. "go.etcd.io/etcd/pkg/wait"
  45. "go.etcd.io/etcd/raft"
  46. "go.etcd.io/etcd/raft/raftpb"
  47. "go.uber.org/zap"
  48. )
  49. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  50. // and are served through local data.
  51. func TestDoLocalAction(t *testing.T) {
  52. tests := []struct {
  53. req pb.Request
  54. wresp Response
  55. werr error
  56. wactions []testutil.Action
  57. }{
  58. {
  59. pb.Request{Method: "GET", ID: 1, Wait: true},
  60. Response{Watcher: v2store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}},
  61. },
  62. {
  63. pb.Request{Method: "GET", ID: 1},
  64. Response{Event: &v2store.Event{}}, nil,
  65. []testutil.Action{
  66. {
  67. Name: "Get",
  68. Params: []interface{}{"", false, false},
  69. },
  70. },
  71. },
  72. {
  73. pb.Request{Method: "HEAD", ID: 1},
  74. Response{Event: &v2store.Event{}}, nil,
  75. []testutil.Action{
  76. {
  77. Name: "Get",
  78. Params: []interface{}{"", false, false},
  79. },
  80. },
  81. },
  82. {
  83. pb.Request{Method: "BADMETHOD", ID: 1},
  84. Response{}, ErrUnknownMethod, []testutil.Action{},
  85. },
  86. }
  87. for i, tt := range tests {
  88. st := mockstore.NewRecorder()
  89. srv := &EtcdServer{
  90. lgMu: new(sync.RWMutex),
  91. lg: zap.NewExample(),
  92. v2store: st,
  93. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  94. }
  95. resp, err := srv.Do(context.TODO(), tt.req)
  96. if err != tt.werr {
  97. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  98. }
  99. if !reflect.DeepEqual(resp, tt.wresp) {
  100. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  101. }
  102. gaction := st.Action()
  103. if !reflect.DeepEqual(gaction, tt.wactions) {
  104. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  105. }
  106. }
  107. }
  108. // TestDoBadLocalAction tests server requests which do not need to go through consensus,
  109. // and return errors when they fetch from local data.
  110. func TestDoBadLocalAction(t *testing.T) {
  111. storeErr := fmt.Errorf("bah")
  112. tests := []struct {
  113. req pb.Request
  114. wactions []testutil.Action
  115. }{
  116. {
  117. pb.Request{Method: "GET", ID: 1, Wait: true},
  118. []testutil.Action{{Name: "Watch"}},
  119. },
  120. {
  121. pb.Request{Method: "GET", ID: 1},
  122. []testutil.Action{
  123. {
  124. Name: "Get",
  125. Params: []interface{}{"", false, false},
  126. },
  127. },
  128. },
  129. {
  130. pb.Request{Method: "HEAD", ID: 1},
  131. []testutil.Action{
  132. {
  133. Name: "Get",
  134. Params: []interface{}{"", false, false},
  135. },
  136. },
  137. },
  138. }
  139. for i, tt := range tests {
  140. st := mockstore.NewErrRecorder(storeErr)
  141. srv := &EtcdServer{
  142. lgMu: new(sync.RWMutex),
  143. lg: zap.NewExample(),
  144. v2store: st,
  145. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  146. }
  147. resp, err := srv.Do(context.Background(), tt.req)
  148. if err != storeErr {
  149. t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr)
  150. }
  151. if !reflect.DeepEqual(resp, Response{}) {
  152. t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{})
  153. }
  154. gaction := st.Action()
  155. if !reflect.DeepEqual(gaction, tt.wactions) {
  156. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  157. }
  158. }
  159. }
  160. // TestApplyRepeat tests that server handles repeat raft messages gracefully
  161. func TestApplyRepeat(t *testing.T) {
  162. n := newNodeConfChangeCommitterStream()
  163. n.readyc <- raft.Ready{
  164. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  165. }
  166. cl := newTestCluster(nil)
  167. st := v2store.New()
  168. cl.SetStore(v2store.New())
  169. cl.AddMember(&membership.Member{ID: 1234})
  170. r := newRaftNode(raftNodeConfig{
  171. lg: zap.NewExample(),
  172. Node: n,
  173. raftStorage: raft.NewMemoryStorage(),
  174. storage: mockstorage.NewStorageRecorder(""),
  175. transport: newNopTransporter(),
  176. })
  177. s := &EtcdServer{
  178. lgMu: new(sync.RWMutex),
  179. lg: zap.NewExample(),
  180. r: *r,
  181. v2store: st,
  182. cluster: cl,
  183. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  184. SyncTicker: &time.Ticker{},
  185. }
  186. s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
  187. s.start()
  188. req := &pb.Request{Method: "QGET", ID: uint64(1)}
  189. ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}
  190. n.readyc <- raft.Ready{CommittedEntries: ents}
  191. // dup msg
  192. n.readyc <- raft.Ready{CommittedEntries: ents}
  193. // use a conf change to block until dup msgs are all processed
  194. cc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
  195. ents = []raftpb.Entry{{
  196. Index: 2,
  197. Type: raftpb.EntryConfChange,
  198. Data: pbutil.MustMarshal(cc),
  199. }}
  200. n.readyc <- raft.Ready{CommittedEntries: ents}
  201. // wait for conf change message
  202. act, err := n.Wait(1)
  203. // wait for stop message (async to avoid deadlock)
  204. stopc := make(chan error)
  205. go func() {
  206. _, werr := n.Wait(1)
  207. stopc <- werr
  208. }()
  209. s.Stop()
  210. // only want to confirm etcdserver won't panic; no data to check
  211. if err != nil {
  212. t.Fatal(err)
  213. }
  214. if len(act) == 0 {
  215. t.Fatalf("expected len(act)=0, got %d", len(act))
  216. }
  217. if err = <-stopc; err != nil {
  218. t.Fatalf("error on stop (%v)", err)
  219. }
  220. }
  221. func TestApplyRequest(t *testing.T) {
  222. tests := []struct {
  223. req pb.Request
  224. wresp Response
  225. wactions []testutil.Action
  226. }{
  227. // POST ==> Create
  228. {
  229. pb.Request{Method: "POST", ID: 1},
  230. Response{Event: &v2store.Event{}},
  231. []testutil.Action{
  232. {
  233. Name: "Create",
  234. Params: []interface{}{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  235. },
  236. },
  237. },
  238. // POST ==> Create, with expiration
  239. {
  240. pb.Request{Method: "POST", ID: 1, Expiration: 1337},
  241. Response{Event: &v2store.Event{}},
  242. []testutil.Action{
  243. {
  244. Name: "Create",
  245. Params: []interface{}{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Unix(0, 1337)}},
  246. },
  247. },
  248. },
  249. // POST ==> Create, with dir
  250. {
  251. pb.Request{Method: "POST", ID: 1, Dir: true},
  252. Response{Event: &v2store.Event{}},
  253. []testutil.Action{
  254. {
  255. Name: "Create",
  256. Params: []interface{}{"", true, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  257. },
  258. },
  259. },
  260. // PUT ==> Set
  261. {
  262. pb.Request{Method: "PUT", ID: 1},
  263. Response{Event: &v2store.Event{}},
  264. []testutil.Action{
  265. {
  266. Name: "Set",
  267. Params: []interface{}{"", false, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  268. },
  269. },
  270. },
  271. // PUT ==> Set, with dir
  272. {
  273. pb.Request{Method: "PUT", ID: 1, Dir: true},
  274. Response{Event: &v2store.Event{}},
  275. []testutil.Action{
  276. {
  277. Name: "Set",
  278. Params: []interface{}{"", true, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  279. },
  280. },
  281. },
  282. // PUT with PrevExist=true ==> Update
  283. {
  284. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true)},
  285. Response{Event: &v2store.Event{}},
  286. []testutil.Action{
  287. {
  288. Name: "Update",
  289. Params: []interface{}{"", "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  290. },
  291. },
  292. },
  293. // PUT with PrevExist=false ==> Create
  294. {
  295. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false)},
  296. Response{Event: &v2store.Event{}},
  297. []testutil.Action{
  298. {
  299. Name: "Create",
  300. Params: []interface{}{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  301. },
  302. },
  303. },
  304. // PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
  305. {
  306. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
  307. Response{Event: &v2store.Event{}},
  308. []testutil.Action{
  309. {
  310. Name: "CompareAndSwap",
  311. Params: []interface{}{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  312. },
  313. },
  314. },
  315. // PUT with PrevExist=false *and* PrevIndex set ==> Create
  316. {
  317. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
  318. Response{Event: &v2store.Event{}},
  319. []testutil.Action{
  320. {
  321. Name: "Create",
  322. Params: []interface{}{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  323. },
  324. },
  325. },
  326. // PUT with PrevIndex set ==> CompareAndSwap
  327. {
  328. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1},
  329. Response{Event: &v2store.Event{}},
  330. []testutil.Action{
  331. {
  332. Name: "CompareAndSwap",
  333. Params: []interface{}{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  334. },
  335. },
  336. },
  337. // PUT with PrevValue set ==> CompareAndSwap
  338. {
  339. pb.Request{Method: "PUT", ID: 1, PrevValue: "bar"},
  340. Response{Event: &v2store.Event{}},
  341. []testutil.Action{
  342. {
  343. Name: "CompareAndSwap",
  344. Params: []interface{}{"", "bar", uint64(0), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  345. },
  346. },
  347. },
  348. // PUT with PrevIndex and PrevValue set ==> CompareAndSwap
  349. {
  350. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1, PrevValue: "bar"},
  351. Response{Event: &v2store.Event{}},
  352. []testutil.Action{
  353. {
  354. Name: "CompareAndSwap",
  355. Params: []interface{}{"", "bar", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
  356. },
  357. },
  358. },
  359. // DELETE ==> Delete
  360. {
  361. pb.Request{Method: "DELETE", ID: 1},
  362. Response{Event: &v2store.Event{}},
  363. []testutil.Action{
  364. {
  365. Name: "Delete",
  366. Params: []interface{}{"", false, false},
  367. },
  368. },
  369. },
  370. // DELETE with PrevIndex set ==> CompareAndDelete
  371. {
  372. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 1},
  373. Response{Event: &v2store.Event{}},
  374. []testutil.Action{
  375. {
  376. Name: "CompareAndDelete",
  377. Params: []interface{}{"", "", uint64(1)},
  378. },
  379. },
  380. },
  381. // DELETE with PrevValue set ==> CompareAndDelete
  382. {
  383. pb.Request{Method: "DELETE", ID: 1, PrevValue: "bar"},
  384. Response{Event: &v2store.Event{}},
  385. []testutil.Action{
  386. {
  387. Name: "CompareAndDelete",
  388. Params: []interface{}{"", "bar", uint64(0)},
  389. },
  390. },
  391. },
  392. // DELETE with PrevIndex *and* PrevValue set ==> CompareAndDelete
  393. {
  394. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 5, PrevValue: "bar"},
  395. Response{Event: &v2store.Event{}},
  396. []testutil.Action{
  397. {
  398. Name: "CompareAndDelete",
  399. Params: []interface{}{"", "bar", uint64(5)},
  400. },
  401. },
  402. },
  403. // QGET ==> Get
  404. {
  405. pb.Request{Method: "QGET", ID: 1},
  406. Response{Event: &v2store.Event{}},
  407. []testutil.Action{
  408. {
  409. Name: "Get",
  410. Params: []interface{}{"", false, false},
  411. },
  412. },
  413. },
  414. // SYNC ==> DeleteExpiredKeys
  415. {
  416. pb.Request{Method: "SYNC", ID: 1},
  417. Response{},
  418. []testutil.Action{
  419. {
  420. Name: "DeleteExpiredKeys",
  421. Params: []interface{}{time.Unix(0, 0)},
  422. },
  423. },
  424. },
  425. {
  426. pb.Request{Method: "SYNC", ID: 1, Time: 12345},
  427. Response{},
  428. []testutil.Action{
  429. {
  430. Name: "DeleteExpiredKeys",
  431. Params: []interface{}{time.Unix(0, 12345)},
  432. },
  433. },
  434. },
  435. // Unknown method - error
  436. {
  437. pb.Request{Method: "BADMETHOD", ID: 1},
  438. Response{Err: ErrUnknownMethod},
  439. []testutil.Action{},
  440. },
  441. }
  442. for i, tt := range tests {
  443. st := mockstore.NewRecorder()
  444. srv := &EtcdServer{
  445. lgMu: new(sync.RWMutex),
  446. lg: zap.NewExample(),
  447. v2store: st,
  448. }
  449. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  450. resp := srv.applyV2Request((*RequestV2)(&tt.req))
  451. if !reflect.DeepEqual(resp, tt.wresp) {
  452. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  453. }
  454. gaction := st.Action()
  455. if !reflect.DeepEqual(gaction, tt.wactions) {
  456. t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions)
  457. }
  458. }
  459. }
  460. func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
  461. cl := newTestCluster([]*membership.Member{{ID: 1}})
  462. srv := &EtcdServer{
  463. lgMu: new(sync.RWMutex),
  464. lg: zap.NewExample(),
  465. v2store: mockstore.NewRecorder(),
  466. cluster: cl,
  467. }
  468. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  469. req := pb.Request{
  470. Method: "PUT",
  471. ID: 1,
  472. Path: membership.MemberAttributesStorePath(1),
  473. Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
  474. }
  475. srv.applyV2Request((*RequestV2)(&req))
  476. w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
  477. if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
  478. t.Errorf("attributes = %v, want %v", g, w)
  479. }
  480. }
  481. func TestApplyConfChangeError(t *testing.T) {
  482. cl := membership.NewCluster(zap.NewExample(), "")
  483. cl.SetStore(v2store.New())
  484. for i := 1; i <= 4; i++ {
  485. cl.AddMember(&membership.Member{ID: types.ID(i)})
  486. }
  487. cl.RemoveMember(4)
  488. attr := membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
  489. ctx, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
  490. if err != nil {
  491. t.Fatal(err)
  492. }
  493. attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 4)}}
  494. ctx4, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
  495. if err != nil {
  496. t.Fatal(err)
  497. }
  498. attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
  499. ctx5, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
  500. if err != nil {
  501. t.Fatal(err)
  502. }
  503. tests := []struct {
  504. cc raftpb.ConfChange
  505. werr error
  506. }{
  507. {
  508. raftpb.ConfChange{
  509. Type: raftpb.ConfChangeAddNode,
  510. NodeID: 4,
  511. Context: ctx4,
  512. },
  513. membership.ErrIDRemoved,
  514. },
  515. {
  516. raftpb.ConfChange{
  517. Type: raftpb.ConfChangeUpdateNode,
  518. NodeID: 4,
  519. Context: ctx4,
  520. },
  521. membership.ErrIDRemoved,
  522. },
  523. {
  524. raftpb.ConfChange{
  525. Type: raftpb.ConfChangeAddNode,
  526. NodeID: 1,
  527. Context: ctx,
  528. },
  529. membership.ErrIDExists,
  530. },
  531. {
  532. raftpb.ConfChange{
  533. Type: raftpb.ConfChangeRemoveNode,
  534. NodeID: 5,
  535. Context: ctx5,
  536. },
  537. membership.ErrIDNotFound,
  538. },
  539. }
  540. for i, tt := range tests {
  541. n := newNodeRecorder()
  542. srv := &EtcdServer{
  543. lgMu: new(sync.RWMutex),
  544. lg: zap.NewExample(),
  545. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  546. cluster: cl,
  547. }
  548. _, err := srv.applyConfChange(tt.cc, nil)
  549. if err != tt.werr {
  550. t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
  551. }
  552. cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None, Context: tt.cc.Context}
  553. w := []testutil.Action{
  554. {
  555. Name: "ApplyConfChange",
  556. Params: []interface{}{cc},
  557. },
  558. }
  559. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  560. t.Errorf("#%d: action = %+v, want %+v", i, g, w)
  561. }
  562. }
  563. }
  564. func TestApplyConfChangeShouldStop(t *testing.T) {
  565. cl := membership.NewCluster(zap.NewExample(), "")
  566. cl.SetStore(v2store.New())
  567. for i := 1; i <= 3; i++ {
  568. cl.AddMember(&membership.Member{ID: types.ID(i)})
  569. }
  570. r := newRaftNode(raftNodeConfig{
  571. lg: zap.NewExample(),
  572. Node: newNodeNop(),
  573. transport: newNopTransporter(),
  574. })
  575. srv := &EtcdServer{
  576. lgMu: new(sync.RWMutex),
  577. lg: zap.NewExample(),
  578. id: 1,
  579. r: *r,
  580. cluster: cl,
  581. }
  582. cc := raftpb.ConfChange{
  583. Type: raftpb.ConfChangeRemoveNode,
  584. NodeID: 2,
  585. }
  586. // remove non-local member
  587. shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
  588. if err != nil {
  589. t.Fatalf("unexpected error %v", err)
  590. }
  591. if shouldStop {
  592. t.Errorf("shouldStop = %t, want %t", shouldStop, false)
  593. }
  594. // remove local member
  595. cc.NodeID = 1
  596. shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
  597. if err != nil {
  598. t.Fatalf("unexpected error %v", err)
  599. }
  600. if !shouldStop {
  601. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  602. }
  603. }
  604. // TestApplyConfigChangeUpdatesConsistIndex ensures a config change also updates the consistIndex
  605. // where consistIndex equals to applied index.
  606. func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
  607. cl := membership.NewCluster(zap.NewExample(), "")
  608. cl.SetStore(v2store.New())
  609. cl.AddMember(&membership.Member{ID: types.ID(1)})
  610. r := newRaftNode(raftNodeConfig{
  611. lg: zap.NewExample(),
  612. Node: newNodeNop(),
  613. transport: newNopTransporter(),
  614. })
  615. srv := &EtcdServer{
  616. lgMu: new(sync.RWMutex),
  617. lg: zap.NewExample(),
  618. id: 1,
  619. r: *r,
  620. cluster: cl,
  621. w: wait.New(),
  622. }
  623. // create EntryConfChange entry
  624. now := time.Now()
  625. urls, err := types.NewURLs([]string{"http://whatever:123"})
  626. if err != nil {
  627. t.Fatal(err)
  628. }
  629. m := membership.NewMember("", urls, "", &now)
  630. m.ID = types.ID(2)
  631. b, err := json.Marshal(m)
  632. if err != nil {
  633. t.Fatal(err)
  634. }
  635. cc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: b}
  636. ents := []raftpb.Entry{{
  637. Index: 2,
  638. Type: raftpb.EntryConfChange,
  639. Data: pbutil.MustMarshal(cc),
  640. }}
  641. _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
  642. consistIndex := srv.consistIndex.ConsistentIndex()
  643. if consistIndex != appliedi {
  644. t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
  645. }
  646. }
  647. // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
  648. // if the local member is removed along with other conf updates.
  649. func TestApplyMultiConfChangeShouldStop(t *testing.T) {
  650. cl := membership.NewCluster(zap.NewExample(), "")
  651. cl.SetStore(v2store.New())
  652. for i := 1; i <= 5; i++ {
  653. cl.AddMember(&membership.Member{ID: types.ID(i)})
  654. }
  655. r := newRaftNode(raftNodeConfig{
  656. lg: zap.NewExample(),
  657. Node: newNodeNop(),
  658. transport: newNopTransporter(),
  659. })
  660. srv := &EtcdServer{
  661. lgMu: new(sync.RWMutex),
  662. lg: zap.NewExample(),
  663. id: 2,
  664. r: *r,
  665. cluster: cl,
  666. w: wait.New(),
  667. }
  668. ents := []raftpb.Entry{}
  669. for i := 1; i <= 4; i++ {
  670. ent := raftpb.Entry{
  671. Term: 1,
  672. Index: uint64(i),
  673. Type: raftpb.EntryConfChange,
  674. Data: pbutil.MustMarshal(
  675. &raftpb.ConfChange{
  676. Type: raftpb.ConfChangeRemoveNode,
  677. NodeID: uint64(i)}),
  678. }
  679. ents = append(ents, ent)
  680. }
  681. _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
  682. if !shouldStop {
  683. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  684. }
  685. }
  686. func TestDoProposal(t *testing.T) {
  687. tests := []pb.Request{
  688. {Method: "POST", ID: 1},
  689. {Method: "PUT", ID: 1},
  690. {Method: "DELETE", ID: 1},
  691. {Method: "GET", ID: 1, Quorum: true},
  692. }
  693. for i, tt := range tests {
  694. st := mockstore.NewRecorder()
  695. r := newRaftNode(raftNodeConfig{
  696. lg: zap.NewExample(),
  697. Node: newNodeCommitter(),
  698. storage: mockstorage.NewStorageRecorder(""),
  699. raftStorage: raft.NewMemoryStorage(),
  700. transport: newNopTransporter(),
  701. })
  702. srv := &EtcdServer{
  703. lgMu: new(sync.RWMutex),
  704. lg: zap.NewExample(),
  705. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  706. r: *r,
  707. v2store: st,
  708. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  709. SyncTicker: &time.Ticker{},
  710. }
  711. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  712. srv.start()
  713. resp, err := srv.Do(context.Background(), tt)
  714. srv.Stop()
  715. action := st.Action()
  716. if len(action) != 1 {
  717. t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
  718. }
  719. if err != nil {
  720. t.Fatalf("#%d: err = %v, want nil", i, err)
  721. }
  722. // resp.Index is set in Do() based on the raft state; may either be 0 or 1
  723. wresp := Response{Event: &v2store.Event{}, Index: resp.Index}
  724. if !reflect.DeepEqual(resp, wresp) {
  725. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  726. }
  727. }
  728. }
  729. func TestDoProposalCancelled(t *testing.T) {
  730. wt := mockwait.NewRecorder()
  731. srv := &EtcdServer{
  732. lgMu: new(sync.RWMutex),
  733. lg: zap.NewExample(),
  734. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  735. r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
  736. w: wt,
  737. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  738. }
  739. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  740. ctx, cancel := context.WithCancel(context.Background())
  741. cancel()
  742. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  743. if err != ErrCanceled {
  744. t.Fatalf("err = %v, want %v", err, ErrCanceled)
  745. }
  746. w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
  747. if !reflect.DeepEqual(wt.Action(), w) {
  748. t.Errorf("wt.action = %+v, want %+v", wt.Action(), w)
  749. }
  750. }
  751. func TestDoProposalTimeout(t *testing.T) {
  752. srv := &EtcdServer{
  753. lgMu: new(sync.RWMutex),
  754. lg: zap.NewExample(),
  755. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  756. r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
  757. w: mockwait.NewNop(),
  758. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  759. }
  760. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  761. ctx, cancel := context.WithTimeout(context.Background(), 0)
  762. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  763. cancel()
  764. if err != ErrTimeout {
  765. t.Fatalf("err = %v, want %v", err, ErrTimeout)
  766. }
  767. }
  768. func TestDoProposalStopped(t *testing.T) {
  769. srv := &EtcdServer{
  770. lgMu: new(sync.RWMutex),
  771. lg: zap.NewExample(),
  772. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  773. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}),
  774. w: mockwait.NewNop(),
  775. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  776. }
  777. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  778. srv.stopping = make(chan struct{})
  779. close(srv.stopping)
  780. _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
  781. if err != ErrStopped {
  782. t.Errorf("err = %v, want %v", err, ErrStopped)
  783. }
  784. }
  785. // TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
  786. func TestSync(t *testing.T) {
  787. n := newNodeRecorder()
  788. ctx, cancel := context.WithCancel(context.TODO())
  789. srv := &EtcdServer{
  790. lgMu: new(sync.RWMutex),
  791. lg: zap.NewExample(),
  792. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  793. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  794. ctx: ctx,
  795. cancel: cancel,
  796. }
  797. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  798. // check that sync is non-blocking
  799. done := make(chan struct{})
  800. go func() {
  801. srv.sync(10 * time.Second)
  802. done <- struct{}{}
  803. }()
  804. select {
  805. case <-done:
  806. case <-time.After(time.Second):
  807. t.Fatal("sync should be non-blocking but did not return after 1s!")
  808. }
  809. action, _ := n.Wait(1)
  810. if len(action) != 1 {
  811. t.Fatalf("len(action) = %d, want 1", len(action))
  812. }
  813. if action[0].Name != "Propose" {
  814. t.Fatalf("action = %s, want Propose", action[0].Name)
  815. }
  816. data := action[0].Params[0].([]byte)
  817. var r pb.Request
  818. if err := r.Unmarshal(data); err != nil {
  819. t.Fatalf("unmarshal request error: %v", err)
  820. }
  821. if r.Method != "SYNC" {
  822. t.Errorf("method = %s, want SYNC", r.Method)
  823. }
  824. }
  825. // TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
  826. // after timeout
  827. func TestSyncTimeout(t *testing.T) {
  828. n := newProposalBlockerRecorder()
  829. ctx, cancel := context.WithCancel(context.TODO())
  830. srv := &EtcdServer{
  831. lgMu: new(sync.RWMutex),
  832. lg: zap.NewExample(),
  833. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  834. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  835. ctx: ctx,
  836. cancel: cancel,
  837. }
  838. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  839. // check that sync is non-blocking
  840. done := make(chan struct{})
  841. go func() {
  842. srv.sync(0)
  843. done <- struct{}{}
  844. }()
  845. select {
  846. case <-done:
  847. case <-time.After(time.Second):
  848. t.Fatal("sync should be non-blocking but did not return after 1s!")
  849. }
  850. w := []testutil.Action{{Name: "Propose blocked"}}
  851. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  852. t.Errorf("action = %v, want %v", g, w)
  853. }
  854. }
  855. // TODO: TestNoSyncWhenNoLeader
  856. // TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
  857. func TestSyncTrigger(t *testing.T) {
  858. n := newReadyNode()
  859. st := make(chan time.Time, 1)
  860. tk := &time.Ticker{C: st}
  861. r := newRaftNode(raftNodeConfig{
  862. lg: zap.NewExample(),
  863. Node: n,
  864. raftStorage: raft.NewMemoryStorage(),
  865. transport: newNopTransporter(),
  866. storage: mockstorage.NewStorageRecorder(""),
  867. })
  868. srv := &EtcdServer{
  869. lgMu: new(sync.RWMutex),
  870. lg: zap.NewExample(),
  871. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  872. r: *r,
  873. v2store: mockstore.NewNop(),
  874. SyncTicker: tk,
  875. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  876. }
  877. // trigger the server to become a leader and accept sync requests
  878. go func() {
  879. srv.start()
  880. n.readyc <- raft.Ready{
  881. SoftState: &raft.SoftState{
  882. RaftState: raft.StateLeader,
  883. },
  884. }
  885. // trigger a sync request
  886. st <- time.Time{}
  887. }()
  888. action, _ := n.Wait(1)
  889. go srv.Stop()
  890. if len(action) != 1 {
  891. t.Fatalf("len(action) = %d, want 1", len(action))
  892. }
  893. if action[0].Name != "Propose" {
  894. t.Fatalf("action = %s, want Propose", action[0].Name)
  895. }
  896. data := action[0].Params[0].([]byte)
  897. var req pb.Request
  898. if err := req.Unmarshal(data); err != nil {
  899. t.Fatalf("error unmarshalling data: %v", err)
  900. }
  901. if req.Method != "SYNC" {
  902. t.Fatalf("unexpected proposed request: %#v", req.Method)
  903. }
  904. // wait on stop message
  905. <-n.Chan()
  906. }
  907. // snapshot should snapshot the store and cut the persistent
  908. func TestSnapshot(t *testing.T) {
  909. be, tmpPath := backend.NewDefaultTmpBackend()
  910. defer func() {
  911. os.RemoveAll(tmpPath)
  912. }()
  913. s := raft.NewMemoryStorage()
  914. s.Append([]raftpb.Entry{{Index: 1}})
  915. st := mockstore.NewRecorderStream()
  916. p := mockstorage.NewStorageRecorderStream("")
  917. r := newRaftNode(raftNodeConfig{
  918. lg: zap.NewExample(),
  919. Node: newNodeNop(),
  920. raftStorage: s,
  921. storage: p,
  922. })
  923. srv := &EtcdServer{
  924. lgMu: new(sync.RWMutex),
  925. lg: zap.NewExample(),
  926. r: *r,
  927. v2store: st,
  928. }
  929. srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
  930. srv.be = be
  931. ch := make(chan struct{}, 2)
  932. go func() {
  933. gaction, _ := p.Wait(1)
  934. defer func() { ch <- struct{}{} }()
  935. if len(gaction) != 1 {
  936. t.Errorf("len(action) = %d, want 1", len(gaction))
  937. }
  938. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
  939. t.Errorf("action = %s, want SaveSnap", gaction[0])
  940. }
  941. }()
  942. go func() {
  943. gaction, _ := st.Wait(2)
  944. defer func() { ch <- struct{}{} }()
  945. if len(gaction) != 2 {
  946. t.Errorf("len(action) = %d, want 2", len(gaction))
  947. }
  948. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
  949. t.Errorf("action = %s, want Clone", gaction[0])
  950. }
  951. if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
  952. t.Errorf("action = %s, want SaveNoCopy", gaction[1])
  953. }
  954. }()
  955. srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
  956. <-ch
  957. <-ch
  958. }
  959. // TestSnapshotOrdering ensures raft persists snapshot onto disk before
  960. // snapshot db is applied.
  961. func TestSnapshotOrdering(t *testing.T) {
  962. n := newNopReadyNode()
  963. st := v2store.New()
  964. cl := membership.NewCluster(zap.NewExample(), "abc")
  965. cl.SetStore(st)
  966. testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
  967. if err != nil {
  968. t.Fatalf("couldn't open tempdir (%v)", err)
  969. }
  970. defer os.RemoveAll(testdir)
  971. snapdir := filepath.Join(testdir, "member", "snap")
  972. if err := os.MkdirAll(snapdir, 0755); err != nil {
  973. t.Fatalf("couldn't make snap dir (%v)", err)
  974. }
  975. rs := raft.NewMemoryStorage()
  976. p := mockstorage.NewStorageRecorderStream(testdir)
  977. tr, snapDoneC := newSnapTransporter(snapdir)
  978. r := newRaftNode(raftNodeConfig{
  979. lg: zap.NewExample(),
  980. isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
  981. Node: n,
  982. transport: tr,
  983. storage: p,
  984. raftStorage: rs,
  985. })
  986. s := &EtcdServer{
  987. lgMu: new(sync.RWMutex),
  988. lg: zap.NewExample(),
  989. Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  990. r: *r,
  991. v2store: st,
  992. snapshotter: snap.New(zap.NewExample(), snapdir),
  993. cluster: cl,
  994. SyncTicker: &time.Ticker{},
  995. }
  996. s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
  997. be, tmpPath := backend.NewDefaultTmpBackend()
  998. defer os.RemoveAll(tmpPath)
  999. s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
  1000. s.be = be
  1001. s.start()
  1002. defer s.Stop()
  1003. n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
  1004. go func() {
  1005. // get the snapshot sent by the transport
  1006. snapMsg := <-snapDoneC
  1007. // Snapshot first triggers raftnode to persists the snapshot onto disk
  1008. // before renaming db snapshot file to db
  1009. snapMsg.Snapshot.Metadata.Index = 1
  1010. n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
  1011. }()
  1012. if ac := <-p.Chan(); ac.Name != "Save" {
  1013. t.Fatalf("expected Save, got %+v", ac)
  1014. }
  1015. if ac := <-p.Chan(); ac.Name != "Save" {
  1016. t.Fatalf("expected Save, got %+v", ac)
  1017. }
  1018. // confirm snapshot file still present before calling SaveSnap
  1019. snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
  1020. if !fileutil.Exist(snapPath) {
  1021. t.Fatalf("expected file %q, got missing", snapPath)
  1022. }
  1023. // unblock SaveSnapshot, etcdserver now permitted to move snapshot file
  1024. if ac := <-p.Chan(); ac.Name != "SaveSnap" {
  1025. t.Fatalf("expected SaveSnap, got %+v", ac)
  1026. }
  1027. }
  1028. // Applied > SnapshotCount should trigger a SaveSnap event
  1029. func TestTriggerSnap(t *testing.T) {
  1030. be, tmpPath := backend.NewDefaultTmpBackend()
  1031. defer func() {
  1032. os.RemoveAll(tmpPath)
  1033. }()
  1034. snapc := 10
  1035. st := mockstore.NewRecorder()
  1036. p := mockstorage.NewStorageRecorderStream("")
  1037. r := newRaftNode(raftNodeConfig{
  1038. lg: zap.NewExample(),
  1039. Node: newNodeCommitter(),
  1040. raftStorage: raft.NewMemoryStorage(),
  1041. storage: p,
  1042. transport: newNopTransporter(),
  1043. })
  1044. srv := &EtcdServer{
  1045. lgMu: new(sync.RWMutex),
  1046. lg: zap.NewExample(),
  1047. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  1048. r: *r,
  1049. v2store: st,
  1050. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1051. SyncTicker: &time.Ticker{},
  1052. }
  1053. srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
  1054. srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
  1055. srv.be = be
  1056. srv.start()
  1057. donec := make(chan struct{})
  1058. go func() {
  1059. wcnt := 2 + snapc
  1060. gaction, _ := p.Wait(wcnt)
  1061. // each operation is recorded as a Save
  1062. // (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap
  1063. if len(gaction) != wcnt {
  1064. t.Errorf("len(action) = %d, want %d", len(gaction), wcnt)
  1065. }
  1066. if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
  1067. t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
  1068. }
  1069. close(donec)
  1070. }()
  1071. for i := 0; i < snapc+1; i++ {
  1072. srv.Do(context.Background(), pb.Request{Method: "PUT"})
  1073. }
  1074. <-donec
  1075. srv.Stop()
  1076. }
  1077. // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
  1078. // proposals.
  1079. func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
  1080. n := newNopReadyNode()
  1081. st := v2store.New()
  1082. cl := membership.NewCluster(zap.NewExample(), "abc")
  1083. cl.SetStore(st)
  1084. testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
  1085. if err != nil {
  1086. t.Fatalf("Couldn't open tempdir (%v)", err)
  1087. }
  1088. defer os.RemoveAll(testdir)
  1089. if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
  1090. t.Fatalf("Couldn't make snap dir (%v)", err)
  1091. }
  1092. rs := raft.NewMemoryStorage()
  1093. tr, snapDoneC := newSnapTransporter(testdir)
  1094. r := newRaftNode(raftNodeConfig{
  1095. lg: zap.NewExample(),
  1096. isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
  1097. Node: n,
  1098. transport: tr,
  1099. storage: mockstorage.NewStorageRecorder(testdir),
  1100. raftStorage: rs,
  1101. })
  1102. s := &EtcdServer{
  1103. lgMu: new(sync.RWMutex),
  1104. lg: zap.NewExample(),
  1105. Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  1106. r: *r,
  1107. v2store: st,
  1108. snapshotter: snap.New(zap.NewExample(), testdir),
  1109. cluster: cl,
  1110. SyncTicker: &time.Ticker{},
  1111. }
  1112. s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
  1113. be, tmpPath := backend.NewDefaultTmpBackend()
  1114. defer func() {
  1115. os.RemoveAll(tmpPath)
  1116. }()
  1117. s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
  1118. s.be = be
  1119. s.start()
  1120. defer s.Stop()
  1121. // submit applied entries and snap entries
  1122. idx := uint64(0)
  1123. outdated := 0
  1124. accepted := 0
  1125. for k := 1; k <= 101; k++ {
  1126. idx++
  1127. ch := s.w.Register(idx)
  1128. req := &pb.Request{Method: "QGET", ID: idx}
  1129. ent := raftpb.Entry{Index: idx, Data: pbutil.MustMarshal(req)}
  1130. ready := raft.Ready{Entries: []raftpb.Entry{ent}}
  1131. n.readyc <- ready
  1132. ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
  1133. n.readyc <- ready
  1134. // "idx" applied
  1135. <-ch
  1136. // one snapshot for every two messages
  1137. if k%2 != 0 {
  1138. continue
  1139. }
  1140. n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
  1141. // get the snapshot sent by the transport
  1142. snapMsg := <-snapDoneC
  1143. // If the snapshot trails applied records, recovery will panic
  1144. // since there's no allocated snapshot at the place of the
  1145. // snapshot record. This only happens when the applier and the
  1146. // snapshot sender get out of sync.
  1147. if snapMsg.Snapshot.Metadata.Index == idx {
  1148. idx++
  1149. snapMsg.Snapshot.Metadata.Index = idx
  1150. ready = raft.Ready{Snapshot: snapMsg.Snapshot}
  1151. n.readyc <- ready
  1152. accepted++
  1153. } else {
  1154. outdated++
  1155. }
  1156. // don't wait for the snapshot to complete, move to next message
  1157. }
  1158. if accepted != 50 {
  1159. t.Errorf("accepted=%v, want 50", accepted)
  1160. }
  1161. if outdated != 0 {
  1162. t.Errorf("outdated=%v, want 0", outdated)
  1163. }
  1164. }
  1165. // TestAddMember tests AddMember can propose and perform node addition.
  1166. func TestAddMember(t *testing.T) {
  1167. n := newNodeConfChangeCommitterRecorder()
  1168. n.readyc <- raft.Ready{
  1169. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1170. }
  1171. cl := newTestCluster(nil)
  1172. st := v2store.New()
  1173. cl.SetStore(st)
  1174. r := newRaftNode(raftNodeConfig{
  1175. lg: zap.NewExample(),
  1176. Node: n,
  1177. raftStorage: raft.NewMemoryStorage(),
  1178. storage: mockstorage.NewStorageRecorder(""),
  1179. transport: newNopTransporter(),
  1180. })
  1181. s := &EtcdServer{
  1182. lgMu: new(sync.RWMutex),
  1183. lg: zap.NewExample(),
  1184. r: *r,
  1185. v2store: st,
  1186. cluster: cl,
  1187. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1188. SyncTicker: &time.Ticker{},
  1189. }
  1190. s.start()
  1191. m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
  1192. _, err := s.AddMember(context.TODO(), m)
  1193. gaction := n.Action()
  1194. s.Stop()
  1195. if err != nil {
  1196. t.Fatalf("AddMember error: %v", err)
  1197. }
  1198. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}}
  1199. if !reflect.DeepEqual(gaction, wactions) {
  1200. t.Errorf("action = %v, want %v", gaction, wactions)
  1201. }
  1202. if cl.Member(1234) == nil {
  1203. t.Errorf("member with id 1234 is not added")
  1204. }
  1205. }
  1206. // TestRemoveMember tests RemoveMember can propose and perform node removal.
  1207. func TestRemoveMember(t *testing.T) {
  1208. n := newNodeConfChangeCommitterRecorder()
  1209. n.readyc <- raft.Ready{
  1210. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1211. }
  1212. cl := newTestCluster(nil)
  1213. st := v2store.New()
  1214. cl.SetStore(v2store.New())
  1215. cl.AddMember(&membership.Member{ID: 1234})
  1216. r := newRaftNode(raftNodeConfig{
  1217. lg: zap.NewExample(),
  1218. Node: n,
  1219. raftStorage: raft.NewMemoryStorage(),
  1220. storage: mockstorage.NewStorageRecorder(""),
  1221. transport: newNopTransporter(),
  1222. })
  1223. s := &EtcdServer{
  1224. lgMu: new(sync.RWMutex),
  1225. lg: zap.NewExample(),
  1226. r: *r,
  1227. v2store: st,
  1228. cluster: cl,
  1229. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1230. SyncTicker: &time.Ticker{},
  1231. }
  1232. s.start()
  1233. _, err := s.RemoveMember(context.TODO(), 1234)
  1234. gaction := n.Action()
  1235. s.Stop()
  1236. if err != nil {
  1237. t.Fatalf("RemoveMember error: %v", err)
  1238. }
  1239. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}}
  1240. if !reflect.DeepEqual(gaction, wactions) {
  1241. t.Errorf("action = %v, want %v", gaction, wactions)
  1242. }
  1243. if cl.Member(1234) != nil {
  1244. t.Errorf("member with id 1234 is not removed")
  1245. }
  1246. }
  1247. // TestUpdateMember tests RemoveMember can propose and perform node update.
  1248. func TestUpdateMember(t *testing.T) {
  1249. n := newNodeConfChangeCommitterRecorder()
  1250. n.readyc <- raft.Ready{
  1251. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1252. }
  1253. cl := newTestCluster(nil)
  1254. st := v2store.New()
  1255. cl.SetStore(st)
  1256. cl.AddMember(&membership.Member{ID: 1234})
  1257. r := newRaftNode(raftNodeConfig{
  1258. lg: zap.NewExample(),
  1259. Node: n,
  1260. raftStorage: raft.NewMemoryStorage(),
  1261. storage: mockstorage.NewStorageRecorder(""),
  1262. transport: newNopTransporter(),
  1263. })
  1264. s := &EtcdServer{
  1265. lgMu: new(sync.RWMutex),
  1266. lg: zap.NewExample(),
  1267. r: *r,
  1268. v2store: st,
  1269. cluster: cl,
  1270. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1271. SyncTicker: &time.Ticker{},
  1272. }
  1273. s.start()
  1274. wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
  1275. _, err := s.UpdateMember(context.TODO(), wm)
  1276. gaction := n.Action()
  1277. s.Stop()
  1278. if err != nil {
  1279. t.Fatalf("UpdateMember error: %v", err)
  1280. }
  1281. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}}
  1282. if !reflect.DeepEqual(gaction, wactions) {
  1283. t.Errorf("action = %v, want %v", gaction, wactions)
  1284. }
  1285. if !reflect.DeepEqual(cl.Member(1234), &wm) {
  1286. t.Errorf("member = %v, want %v", cl.Member(1234), &wm)
  1287. }
  1288. }
  1289. // TODO: test server could stop itself when being removed
  1290. func TestPublish(t *testing.T) {
  1291. n := newNodeRecorder()
  1292. ch := make(chan interface{}, 1)
  1293. // simulate that request has gone through consensus
  1294. ch <- Response{}
  1295. w := wait.NewWithResponse(ch)
  1296. ctx, cancel := context.WithCancel(context.TODO())
  1297. srv := &EtcdServer{
  1298. lgMu: new(sync.RWMutex),
  1299. lg: zap.NewExample(),
  1300. readych: make(chan struct{}),
  1301. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  1302. id: 1,
  1303. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  1304. attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
  1305. cluster: &membership.RaftCluster{},
  1306. w: w,
  1307. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1308. SyncTicker: &time.Ticker{},
  1309. ctx: ctx,
  1310. cancel: cancel,
  1311. }
  1312. srv.publish(time.Hour)
  1313. action := n.Action()
  1314. if len(action) != 1 {
  1315. t.Fatalf("len(action) = %d, want 1", len(action))
  1316. }
  1317. if action[0].Name != "Propose" {
  1318. t.Fatalf("action = %s, want Propose", action[0].Name)
  1319. }
  1320. data := action[0].Params[0].([]byte)
  1321. var r pb.Request
  1322. if err := r.Unmarshal(data); err != nil {
  1323. t.Fatalf("unmarshal request error: %v", err)
  1324. }
  1325. if r.Method != "PUT" {
  1326. t.Errorf("method = %s, want PUT", r.Method)
  1327. }
  1328. wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
  1329. if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath {
  1330. t.Errorf("path = %s, want %s", r.Path, wpath)
  1331. }
  1332. var gattr membership.Attributes
  1333. if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
  1334. t.Fatalf("unmarshal val error: %v", err)
  1335. }
  1336. if !reflect.DeepEqual(gattr, wm.Attributes) {
  1337. t.Errorf("member = %v, want %v", gattr, wm.Attributes)
  1338. }
  1339. }
  1340. // TestPublishStopped tests that publish will be stopped if server is stopped.
  1341. func TestPublishStopped(t *testing.T) {
  1342. ctx, cancel := context.WithCancel(context.TODO())
  1343. r := newRaftNode(raftNodeConfig{
  1344. lg: zap.NewExample(),
  1345. Node: newNodeNop(),
  1346. transport: newNopTransporter(),
  1347. })
  1348. srv := &EtcdServer{
  1349. lgMu: new(sync.RWMutex),
  1350. lg: zap.NewExample(),
  1351. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  1352. r: *r,
  1353. cluster: &membership.RaftCluster{},
  1354. w: mockwait.NewNop(),
  1355. done: make(chan struct{}),
  1356. stopping: make(chan struct{}),
  1357. stop: make(chan struct{}),
  1358. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1359. SyncTicker: &time.Ticker{},
  1360. ctx: ctx,
  1361. cancel: cancel,
  1362. }
  1363. close(srv.stopping)
  1364. srv.publish(time.Hour)
  1365. }
  1366. // TestPublishRetry tests that publish will keep retry until success.
  1367. func TestPublishRetry(t *testing.T) {
  1368. ctx, cancel := context.WithCancel(context.TODO())
  1369. n := newNodeRecorderStream()
  1370. srv := &EtcdServer{
  1371. lgMu: new(sync.RWMutex),
  1372. lg: zap.NewExample(),
  1373. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  1374. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  1375. w: mockwait.NewNop(),
  1376. stopping: make(chan struct{}),
  1377. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1378. SyncTicker: &time.Ticker{},
  1379. ctx: ctx,
  1380. cancel: cancel,
  1381. }
  1382. // expect multiple proposals from retrying
  1383. ch := make(chan struct{})
  1384. go func() {
  1385. defer close(ch)
  1386. if action, err := n.Wait(2); err != nil {
  1387. t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
  1388. }
  1389. close(srv.stopping)
  1390. // drain remaining actions, if any, so publish can terminate
  1391. for {
  1392. select {
  1393. case <-ch:
  1394. return
  1395. default:
  1396. n.Action()
  1397. }
  1398. }
  1399. }()
  1400. srv.publish(10 * time.Nanosecond)
  1401. ch <- struct{}{}
  1402. <-ch
  1403. }
  1404. func TestUpdateVersion(t *testing.T) {
  1405. n := newNodeRecorder()
  1406. ch := make(chan interface{}, 1)
  1407. // simulate that request has gone through consensus
  1408. ch <- Response{}
  1409. w := wait.NewWithResponse(ch)
  1410. ctx, cancel := context.WithCancel(context.TODO())
  1411. srv := &EtcdServer{
  1412. lgMu: new(sync.RWMutex),
  1413. lg: zap.NewExample(),
  1414. id: 1,
  1415. Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
  1416. r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
  1417. attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
  1418. cluster: &membership.RaftCluster{},
  1419. w: w,
  1420. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1421. SyncTicker: &time.Ticker{},
  1422. ctx: ctx,
  1423. cancel: cancel,
  1424. }
  1425. srv.updateClusterVersion("2.0.0")
  1426. action := n.Action()
  1427. if len(action) != 1 {
  1428. t.Fatalf("len(action) = %d, want 1", len(action))
  1429. }
  1430. if action[0].Name != "Propose" {
  1431. t.Fatalf("action = %s, want Propose", action[0].Name)
  1432. }
  1433. data := action[0].Params[0].([]byte)
  1434. var r pb.Request
  1435. if err := r.Unmarshal(data); err != nil {
  1436. t.Fatalf("unmarshal request error: %v", err)
  1437. }
  1438. if r.Method != "PUT" {
  1439. t.Errorf("method = %s, want PUT", r.Method)
  1440. }
  1441. if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
  1442. t.Errorf("path = %s, want %s", r.Path, wpath)
  1443. }
  1444. if r.Val != "2.0.0" {
  1445. t.Errorf("val = %s, want %s", r.Val, "2.0.0")
  1446. }
  1447. }
  1448. func TestStopNotify(t *testing.T) {
  1449. s := &EtcdServer{
  1450. lgMu: new(sync.RWMutex),
  1451. lg: zap.NewExample(),
  1452. stop: make(chan struct{}),
  1453. done: make(chan struct{}),
  1454. }
  1455. go func() {
  1456. <-s.stop
  1457. close(s.done)
  1458. }()
  1459. notifier := s.StopNotify()
  1460. select {
  1461. case <-notifier:
  1462. t.Fatalf("received unexpected stop notification")
  1463. default:
  1464. }
  1465. s.Stop()
  1466. select {
  1467. case <-notifier:
  1468. default:
  1469. t.Fatalf("cannot receive stop notification")
  1470. }
  1471. }
  1472. func TestGetOtherPeerURLs(t *testing.T) {
  1473. tests := []struct {
  1474. membs []*membership.Member
  1475. wurls []string
  1476. }{
  1477. {
  1478. []*membership.Member{
  1479. membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
  1480. },
  1481. []string{},
  1482. },
  1483. {
  1484. []*membership.Member{
  1485. membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
  1486. membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
  1487. membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
  1488. },
  1489. []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
  1490. },
  1491. {
  1492. []*membership.Member{
  1493. membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
  1494. membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
  1495. membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
  1496. },
  1497. []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
  1498. },
  1499. }
  1500. for i, tt := range tests {
  1501. cl := membership.NewClusterFromMembers(zap.NewExample(), "", types.ID(0), tt.membs)
  1502. self := "1"
  1503. urls := getRemotePeerURLs(cl, self)
  1504. if !reflect.DeepEqual(urls, tt.wurls) {
  1505. t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
  1506. }
  1507. }
  1508. }
  1509. type nodeRecorder struct{ testutil.Recorder }
  1510. func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
  1511. func newNodeRecorderStream() *nodeRecorder { return &nodeRecorder{testutil.NewRecorderStream()} }
  1512. func newNodeNop() raft.Node { return newNodeRecorder() }
  1513. func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
  1514. func (n *nodeRecorder) Campaign(ctx context.Context) error {
  1515. n.Record(testutil.Action{Name: "Campaign"})
  1516. return nil
  1517. }
  1518. func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
  1519. n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
  1520. return nil
  1521. }
  1522. func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
  1523. n.Record(testutil.Action{Name: "ProposeConfChange"})
  1524. return nil
  1525. }
  1526. func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
  1527. n.Record(testutil.Action{Name: "Step"})
  1528. return nil
  1529. }
  1530. func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
  1531. func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
  1532. func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {}
  1533. func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
  1534. func (n *nodeRecorder) Advance() {}
  1535. func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
  1536. n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
  1537. return &raftpb.ConfState{}
  1538. }
  1539. func (n *nodeRecorder) Stop() {
  1540. n.Record(testutil.Action{Name: "Stop"})
  1541. }
  1542. func (n *nodeRecorder) ReportUnreachable(id uint64) {}
  1543. func (n *nodeRecorder) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
  1544. func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
  1545. n.Record(testutil.Action{Name: "Compact"})
  1546. }
  1547. type nodeProposalBlockerRecorder struct {
  1548. nodeRecorder
  1549. }
  1550. func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
  1551. return &nodeProposalBlockerRecorder{*newNodeRecorderStream()}
  1552. }
  1553. func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
  1554. <-ctx.Done()
  1555. n.Record(testutil.Action{Name: "Propose blocked"})
  1556. return nil
  1557. }
  1558. // readyNode is a nodeRecorder with a user-writeable ready channel
  1559. type readyNode struct {
  1560. nodeRecorder
  1561. readyc chan raft.Ready
  1562. }
  1563. func newReadyNode() *readyNode {
  1564. return &readyNode{
  1565. nodeRecorder{testutil.NewRecorderStream()},
  1566. make(chan raft.Ready, 1)}
  1567. }
  1568. func newNopReadyNode() *readyNode {
  1569. return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
  1570. }
  1571. func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
  1572. type nodeConfChangeCommitterRecorder struct {
  1573. readyNode
  1574. index uint64
  1575. }
  1576. func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
  1577. return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0}
  1578. }
  1579. func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder {
  1580. return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
  1581. }
  1582. func confChangeActionName(conf raftpb.ConfChangeI) string {
  1583. var s string
  1584. if confV1, ok := conf.AsV1(); ok {
  1585. s = confV1.Type.String()
  1586. } else {
  1587. for i, chg := range conf.AsV2().Changes {
  1588. if i > 0 {
  1589. s += "/"
  1590. }
  1591. s += chg.Type.String()
  1592. }
  1593. }
  1594. return s
  1595. }
  1596. func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
  1597. typ, data, err := raftpb.MarshalConfChange(conf)
  1598. if err != nil {
  1599. return err
  1600. }
  1601. n.index++
  1602. n.Record(testutil.Action{Name: "ProposeConfChange:" + confChangeActionName(conf)})
  1603. n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: typ, Data: data}}}
  1604. return nil
  1605. }
  1606. func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
  1607. return n.readyc
  1608. }
  1609. func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
  1610. n.Record(testutil.Action{Name: "ApplyConfChange:" + confChangeActionName(conf)})
  1611. return &raftpb.ConfState{}
  1612. }
  1613. // nodeCommitter commits proposed data immediately.
  1614. type nodeCommitter struct {
  1615. readyNode
  1616. index uint64
  1617. }
  1618. func newNodeCommitter() raft.Node {
  1619. return &nodeCommitter{*newNopReadyNode(), 0}
  1620. }
  1621. func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
  1622. n.index++
  1623. ents := []raftpb.Entry{{Index: n.index, Data: data}}
  1624. n.readyc <- raft.Ready{
  1625. Entries: ents,
  1626. CommittedEntries: ents,
  1627. }
  1628. return nil
  1629. }
  1630. func newTestCluster(membs []*membership.Member) *membership.RaftCluster {
  1631. c := membership.NewCluster(zap.NewExample(), "")
  1632. for _, m := range membs {
  1633. c.AddMember(m)
  1634. }
  1635. return c
  1636. }
  1637. type nopTransporter struct{}
  1638. func newNopTransporter() rafthttp.Transporter {
  1639. return &nopTransporter{}
  1640. }
  1641. func (s *nopTransporter) Start() error { return nil }
  1642. func (s *nopTransporter) Handler() http.Handler { return nil }
  1643. func (s *nopTransporter) Send(m []raftpb.Message) {}
  1644. func (s *nopTransporter) SendSnapshot(m snap.Message) {}
  1645. func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
  1646. func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
  1647. func (s *nopTransporter) RemovePeer(id types.ID) {}
  1648. func (s *nopTransporter) RemoveAllPeers() {}
  1649. func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
  1650. func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
  1651. func (s *nopTransporter) ActivePeers() int { return 0 }
  1652. func (s *nopTransporter) Stop() {}
  1653. func (s *nopTransporter) Pause() {}
  1654. func (s *nopTransporter) Resume() {}
  1655. type snapTransporter struct {
  1656. nopTransporter
  1657. snapDoneC chan snap.Message
  1658. snapDir string
  1659. }
  1660. func newSnapTransporter(snapDir string) (rafthttp.Transporter, <-chan snap.Message) {
  1661. ch := make(chan snap.Message, 1)
  1662. tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
  1663. return tr, ch
  1664. }
  1665. func (s *snapTransporter) SendSnapshot(m snap.Message) {
  1666. ss := snap.New(zap.NewExample(), s.snapDir)
  1667. ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
  1668. m.CloseWithError(nil)
  1669. s.snapDoneC <- m
  1670. }
  1671. type sendMsgAppRespTransporter struct {
  1672. nopTransporter
  1673. sendC chan int
  1674. }
  1675. func newSendMsgAppRespTransporter() (rafthttp.Transporter, <-chan int) {
  1676. ch := make(chan int, 1)
  1677. tr := &sendMsgAppRespTransporter{sendC: ch}
  1678. return tr, ch
  1679. }
  1680. func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) {
  1681. var send int
  1682. for _, msg := range m {
  1683. if msg.To != 0 {
  1684. send++
  1685. }
  1686. }
  1687. s.sendC <- send
  1688. }