server_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929
  1. package etcdserver
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "reflect"
  6. "sync"
  7. "testing"
  8. "time"
  9. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  10. "github.com/coreos/etcd/raft"
  11. "github.com/coreos/etcd/raft/raftpb"
  12. "github.com/coreos/etcd/store"
  13. "github.com/coreos/etcd/testutil"
  14. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  15. )
  16. func TestGetExpirationTime(t *testing.T) {
  17. tests := []struct {
  18. r pb.Request
  19. want time.Time
  20. }{
  21. {
  22. pb.Request{Expiration: 0},
  23. time.Time{},
  24. },
  25. {
  26. pb.Request{Expiration: 60000},
  27. time.Unix(0, 60000),
  28. },
  29. {
  30. pb.Request{Expiration: -60000},
  31. time.Unix(0, -60000),
  32. },
  33. }
  34. for i, tt := range tests {
  35. got := getExpirationTime(&tt.r)
  36. if !reflect.DeepEqual(tt.want, got) {
  37. t.Errorf("#%d: incorrect expiration time: want=%v got=%v", i, tt.want, got)
  38. }
  39. }
  40. }
  41. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  42. // and are served through local data.
  43. func TestDoLocalAction(t *testing.T) {
  44. tests := []struct {
  45. req pb.Request
  46. wresp Response
  47. werr error
  48. waction []string
  49. }{
  50. {
  51. pb.Request{Method: "GET", Id: 1, Wait: true},
  52. Response{Watcher: &stubWatcher{}}, nil, []string{"Watch"},
  53. },
  54. {
  55. pb.Request{Method: "GET", Id: 1},
  56. Response{Event: &store.Event{}}, nil, []string{"Get"},
  57. },
  58. {
  59. pb.Request{Method: "BADMETHOD", Id: 1},
  60. Response{}, ErrUnknownMethod, []string{},
  61. },
  62. }
  63. for i, tt := range tests {
  64. st := &storeRecorder{}
  65. srv := &EtcdServer{Store: st}
  66. resp, err := srv.Do(context.TODO(), tt.req)
  67. if err != tt.werr {
  68. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  69. }
  70. if !reflect.DeepEqual(resp, tt.wresp) {
  71. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  72. }
  73. action := st.Action()
  74. if !reflect.DeepEqual(action, tt.waction) {
  75. t.Errorf("#%d: action = %+v, want %+v", i, action, tt.waction)
  76. }
  77. }
  78. }
  79. // TestDoBadLocalAction tests server requests which do not need to go through consensus,
  80. // and return errors when they fetch from local data.
  81. func TestDoBadLocalAction(t *testing.T) {
  82. storeErr := fmt.Errorf("bah")
  83. tests := []struct {
  84. req pb.Request
  85. waction []string
  86. }{
  87. {
  88. pb.Request{Method: "GET", Id: 1, Wait: true},
  89. []string{"Watch"},
  90. },
  91. {
  92. pb.Request{Method: "GET", Id: 1},
  93. []string{"Get"},
  94. },
  95. }
  96. for i, tt := range tests {
  97. st := &errStoreRecorder{err: storeErr}
  98. srv := &EtcdServer{Store: st}
  99. resp, err := srv.Do(context.Background(), tt.req)
  100. if err != storeErr {
  101. t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr)
  102. }
  103. if !reflect.DeepEqual(resp, Response{}) {
  104. t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{})
  105. }
  106. action := st.Action()
  107. if !reflect.DeepEqual(action, tt.waction) {
  108. t.Errorf("#%d: action = %+v, want %+v", i, action, tt.waction)
  109. }
  110. }
  111. }
  112. func TestApply(t *testing.T) {
  113. tests := []struct {
  114. req pb.Request
  115. wresp Response
  116. waction []string
  117. }{
  118. {
  119. pb.Request{Method: "POST", Id: 1},
  120. Response{Event: &store.Event{}}, []string{"Create"},
  121. },
  122. {
  123. pb.Request{Method: "PUT", Id: 1, PrevExist: boolp(true), PrevIndex: 1},
  124. Response{Event: &store.Event{}}, []string{"Update"},
  125. },
  126. {
  127. pb.Request{Method: "PUT", Id: 1, PrevExist: boolp(false), PrevIndex: 1},
  128. Response{Event: &store.Event{}}, []string{"Create"},
  129. },
  130. {
  131. pb.Request{Method: "PUT", Id: 1, PrevExist: boolp(true)},
  132. Response{Event: &store.Event{}}, []string{"Update"},
  133. },
  134. {
  135. pb.Request{Method: "PUT", Id: 1, PrevExist: boolp(false)},
  136. Response{Event: &store.Event{}}, []string{"Create"},
  137. },
  138. {
  139. pb.Request{Method: "PUT", Id: 1, PrevIndex: 1},
  140. Response{Event: &store.Event{}}, []string{"CompareAndSwap"},
  141. },
  142. {
  143. pb.Request{Method: "PUT", Id: 1, PrevValue: "bar"},
  144. Response{Event: &store.Event{}}, []string{"CompareAndSwap"},
  145. },
  146. {
  147. pb.Request{Method: "PUT", Id: 1},
  148. Response{Event: &store.Event{}}, []string{"Set"},
  149. },
  150. {
  151. pb.Request{Method: "DELETE", Id: 1, PrevIndex: 1},
  152. Response{Event: &store.Event{}}, []string{"CompareAndDelete"},
  153. },
  154. {
  155. pb.Request{Method: "DELETE", Id: 1, PrevValue: "bar"},
  156. Response{Event: &store.Event{}}, []string{"CompareAndDelete"},
  157. },
  158. {
  159. pb.Request{Method: "DELETE", Id: 1},
  160. Response{Event: &store.Event{}}, []string{"Delete"},
  161. },
  162. {
  163. pb.Request{Method: "QGET", Id: 1},
  164. Response{Event: &store.Event{}}, []string{"Get"},
  165. },
  166. {
  167. pb.Request{Method: "SYNC", Id: 1},
  168. Response{}, []string{"DeleteExpiredKeys"},
  169. },
  170. {
  171. pb.Request{Method: "BADMETHOD", Id: 1},
  172. Response{err: ErrUnknownMethod}, []string{},
  173. },
  174. }
  175. for i, tt := range tests {
  176. st := &storeRecorder{}
  177. srv := &EtcdServer{Store: st}
  178. resp := srv.apply(tt.req)
  179. if !reflect.DeepEqual(resp, tt.wresp) {
  180. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  181. }
  182. action := st.Action()
  183. if !reflect.DeepEqual(action, tt.waction) {
  184. t.Errorf("#%d: action = %+v, want %+v", i, action, tt.waction)
  185. }
  186. }
  187. }
  188. func TestClusterOf1(t *testing.T) { testServer(t, 1) }
  189. func TestClusterOf3(t *testing.T) { testServer(t, 3) }
  190. func testServer(t *testing.T, ns int64) {
  191. ctx, cancel := context.WithCancel(context.Background())
  192. defer cancel()
  193. ss := make([]*EtcdServer, ns)
  194. send := func(msgs []raftpb.Message) {
  195. for _, m := range msgs {
  196. t.Logf("m = %+v\n", m)
  197. ss[m.To-1].Node.Step(ctx, m)
  198. }
  199. }
  200. peers := make([]int64, ns)
  201. for i := int64(0); i < ns; i++ {
  202. peers[i] = i + 1
  203. }
  204. for i := int64(0); i < ns; i++ {
  205. id := i + 1
  206. n := raft.StartNode(id, peers, 10, 1)
  207. tk := time.NewTicker(10 * time.Millisecond)
  208. defer tk.Stop()
  209. srv := &EtcdServer{
  210. Node: n,
  211. Store: store.New(),
  212. Send: send,
  213. Storage: &storageRecorder{},
  214. Ticker: tk.C,
  215. }
  216. srv.Start()
  217. // TODO(xiangli): randomize election timeout
  218. // then remove this sleep.
  219. time.Sleep(1 * time.Millisecond)
  220. ss[i] = srv
  221. }
  222. for i := 1; i <= 10; i++ {
  223. r := pb.Request{
  224. Method: "PUT",
  225. Id: int64(i),
  226. Path: "/foo",
  227. Val: "bar",
  228. }
  229. j := rand.Intn(len(ss))
  230. t.Logf("ss = %d", j)
  231. resp, err := ss[j].Do(ctx, r)
  232. if err != nil {
  233. t.Fatal(err)
  234. }
  235. g, w := resp.Event.Node, &store.NodeExtern{
  236. Key: "/foo",
  237. ModifiedIndex: uint64(i),
  238. CreatedIndex: uint64(i),
  239. Value: stringp("bar"),
  240. }
  241. if !reflect.DeepEqual(g, w) {
  242. t.Error("value:", *g.Value)
  243. t.Errorf("g = %+v, w %+v", g, w)
  244. }
  245. }
  246. time.Sleep(10 * time.Millisecond)
  247. var last interface{}
  248. for i, sv := range ss {
  249. sv.Stop()
  250. g, _ := sv.Store.Get("/", true, true)
  251. if last != nil && !reflect.DeepEqual(last, g) {
  252. t.Errorf("server %d: Root = %#v, want %#v", i, g, last)
  253. }
  254. last = g
  255. }
  256. }
  257. func TestDoProposal(t *testing.T) {
  258. tests := []pb.Request{
  259. pb.Request{Method: "POST", Id: 1},
  260. pb.Request{Method: "PUT", Id: 1},
  261. pb.Request{Method: "DELETE", Id: 1},
  262. pb.Request{Method: "GET", Id: 1, Quorum: true},
  263. }
  264. for i, tt := range tests {
  265. ctx, _ := context.WithCancel(context.Background())
  266. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  267. st := &storeRecorder{}
  268. tk := make(chan time.Time)
  269. // this makes <-tk always successful, which accelerates internal clock
  270. close(tk)
  271. srv := &EtcdServer{
  272. Node: n,
  273. Store: st,
  274. Send: func(_ []raftpb.Message) {},
  275. Storage: &storageRecorder{},
  276. Ticker: tk,
  277. }
  278. srv.Start()
  279. resp, err := srv.Do(ctx, tt)
  280. srv.Stop()
  281. action := st.Action()
  282. if len(action) != 1 {
  283. t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
  284. }
  285. if err != nil {
  286. t.Fatalf("#%d: err = %v, want nil", i, err)
  287. }
  288. wresp := Response{Event: &store.Event{}}
  289. if !reflect.DeepEqual(resp, wresp) {
  290. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  291. }
  292. }
  293. }
  294. func TestDoProposalCancelled(t *testing.T) {
  295. ctx, cancel := context.WithCancel(context.Background())
  296. // node cannot make any progress because there are two nodes
  297. n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  298. st := &storeRecorder{}
  299. wait := &waitRecorder{}
  300. srv := &EtcdServer{
  301. // TODO: use fake node for better testability
  302. Node: n,
  303. Store: st,
  304. w: wait,
  305. }
  306. done := make(chan struct{})
  307. var err error
  308. go func() {
  309. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  310. close(done)
  311. }()
  312. cancel()
  313. <-done
  314. action := st.Action()
  315. if len(action) != 0 {
  316. t.Errorf("len(action) = %v, want 0", len(action))
  317. }
  318. if err != context.Canceled {
  319. t.Fatalf("err = %v, want %v", err, context.Canceled)
  320. }
  321. w := []string{"Register1", "Trigger1"}
  322. if !reflect.DeepEqual(wait.action, w) {
  323. t.Errorf("wait.action = %+v, want %+v", wait.action, w)
  324. }
  325. }
  326. func TestDoProposalStopped(t *testing.T) {
  327. ctx, cancel := context.WithCancel(context.Background())
  328. defer cancel()
  329. // node cannot make any progress because there are two nodes
  330. n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  331. st := &storeRecorder{}
  332. tk := make(chan time.Time)
  333. // this makes <-tk always successful, which accelarates internal clock
  334. close(tk)
  335. srv := &EtcdServer{
  336. // TODO: use fake node for better testability
  337. Node: n,
  338. Store: st,
  339. Send: func(_ []raftpb.Message) {},
  340. Storage: &storageRecorder{},
  341. Ticker: tk,
  342. }
  343. srv.Start()
  344. done := make(chan struct{})
  345. var err error
  346. go func() {
  347. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  348. close(done)
  349. }()
  350. srv.Stop()
  351. <-done
  352. action := st.Action()
  353. if len(action) != 0 {
  354. t.Errorf("len(action) = %v, want 0", len(action))
  355. }
  356. if err != ErrStopped {
  357. t.Errorf("err = %v, want %v", err, ErrStopped)
  358. }
  359. }
  360. // TestSync tests sync 1. is nonblocking 2. sends out SYNC request.
  361. func TestSync(t *testing.T) {
  362. n := &nodeProposeDataRecorder{}
  363. srv := &EtcdServer{
  364. Node: n,
  365. }
  366. start := time.Now()
  367. srv.sync(defaultSyncTimeout)
  368. // check that sync is non-blocking
  369. if d := time.Since(start); d > time.Millisecond {
  370. t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
  371. }
  372. testutil.ForceGosched()
  373. data := n.data()
  374. if len(data) != 1 {
  375. t.Fatalf("len(proposeData) = %d, want 1", len(data))
  376. }
  377. var r pb.Request
  378. if err := r.Unmarshal(data[0]); err != nil {
  379. t.Fatalf("unmarshal request error: %v", err)
  380. }
  381. if r.Method != "SYNC" {
  382. t.Errorf("method = %s, want SYNC", r.Method)
  383. }
  384. }
  385. // TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
  386. // after timeout
  387. func TestSyncTimeout(t *testing.T) {
  388. n := &nodeProposalBlockerRecorder{}
  389. srv := &EtcdServer{
  390. Node: n,
  391. }
  392. start := time.Now()
  393. srv.sync(0)
  394. // check that sync is non-blocking
  395. if d := time.Since(start); d > time.Millisecond {
  396. t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
  397. }
  398. // give time for goroutine in sync to cancel
  399. // TODO: use fake clock
  400. testutil.ForceGosched()
  401. w := []string{"Propose blocked"}
  402. if g := n.Action(); !reflect.DeepEqual(g, w) {
  403. t.Errorf("action = %v, want %v", g, w)
  404. }
  405. }
  406. // TODO: TestNoSyncWhenNoLeader
  407. // blockingNodeProposer implements the node interface to allow users to
  408. // block until Propose has been called and then verify the Proposed data
  409. type blockingNodeProposer struct {
  410. ch chan []byte
  411. readyNode
  412. }
  413. func (n *blockingNodeProposer) Propose(_ context.Context, data []byte) error {
  414. n.ch <- data
  415. return nil
  416. }
  417. // TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
  418. func TestSyncTrigger(t *testing.T) {
  419. n := &blockingNodeProposer{
  420. ch: make(chan []byte),
  421. readyNode: *newReadyNode(),
  422. }
  423. st := make(chan time.Time, 1)
  424. srv := &EtcdServer{
  425. Node: n,
  426. Store: &storeRecorder{},
  427. Send: func(_ []raftpb.Message) {},
  428. Storage: &storageRecorder{},
  429. SyncTicker: st,
  430. }
  431. srv.Start()
  432. // trigger the server to become a leader and accept sync requests
  433. n.readyc <- raft.Ready{
  434. SoftState: &raft.SoftState{
  435. RaftState: raft.StateLeader,
  436. },
  437. }
  438. // trigger a sync request
  439. st <- time.Time{}
  440. var data []byte
  441. select {
  442. case <-time.After(time.Second):
  443. t.Fatalf("did not receive proposed request as expected!")
  444. case data = <-n.ch:
  445. }
  446. srv.Stop()
  447. var req pb.Request
  448. if err := req.Unmarshal(data); err != nil {
  449. t.Fatalf("error unmarshalling data: %v", err)
  450. }
  451. if req.Method != "SYNC" {
  452. t.Fatalf("unexpected proposed request: %#v", req.Method)
  453. }
  454. }
  455. // snapshot should snapshot the store and cut the persistent
  456. // TODO: node.Compact is called... we need to make the node an interface
  457. func TestSnapshot(t *testing.T) {
  458. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  459. defer n.Stop()
  460. st := &storeRecorder{}
  461. p := &storageRecorder{}
  462. s := &EtcdServer{
  463. Store: st,
  464. Storage: p,
  465. Node: n,
  466. }
  467. s.snapshot()
  468. action := st.Action()
  469. if len(action) != 1 {
  470. t.Fatalf("len(action) = %d, want 1", len(action))
  471. }
  472. if action[0] != "Save" {
  473. t.Errorf("action = %s, want Save", action[0])
  474. }
  475. action = p.Action()
  476. if len(action) != 1 {
  477. t.Fatalf("len(action) = %d, want 1", len(action))
  478. }
  479. if action[0] != "Cut" {
  480. t.Errorf("action = %s, want Cut", action[0])
  481. }
  482. }
  483. // Applied > SnapCount should trigger a SaveSnap event
  484. func TestTriggerSnap(t *testing.T) {
  485. ctx := context.Background()
  486. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  487. n.Campaign(ctx)
  488. st := &storeRecorder{}
  489. p := &storageRecorder{}
  490. s := &EtcdServer{
  491. Store: st,
  492. Send: func(_ []raftpb.Message) {},
  493. Storage: p,
  494. Node: n,
  495. SnapCount: 10,
  496. }
  497. s.Start()
  498. for i := 0; int64(i) < s.SnapCount; i++ {
  499. s.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  500. }
  501. time.Sleep(time.Millisecond)
  502. s.Stop()
  503. action := p.Action()
  504. // each operation is recorded as a Save
  505. // Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap
  506. if len(action) != 3+int(s.SnapCount) {
  507. t.Fatalf("len(action) = %d, want %d", len(action), 3+int(s.SnapCount))
  508. }
  509. if action[12] != "SaveSnap" {
  510. t.Errorf("action = %s, want SaveSnap", action[12])
  511. }
  512. }
  513. // TestRecvSnapshot tests when it receives a snapshot from raft leader,
  514. // it should trigger storage.SaveSnap and also store.Recover.
  515. func TestRecvSnapshot(t *testing.T) {
  516. n := newReadyNode()
  517. st := &storeRecorder{}
  518. p := &storageRecorder{}
  519. s := &EtcdServer{
  520. Store: st,
  521. Send: func(_ []raftpb.Message) {},
  522. Storage: p,
  523. Node: n,
  524. }
  525. s.Start()
  526. n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
  527. // make goroutines move forward to receive snapshot
  528. testutil.ForceGosched()
  529. s.Stop()
  530. waction := []string{"Recovery"}
  531. if g := st.Action(); !reflect.DeepEqual(g, waction) {
  532. t.Errorf("store action = %v, want %v", g, waction)
  533. }
  534. waction = []string{"Save", "SaveSnap"}
  535. if g := p.Action(); !reflect.DeepEqual(g, waction) {
  536. t.Errorf("storage action = %v, want %v", g, waction)
  537. }
  538. }
  539. // TestRecvSlowSnapshot tests that slow snapshot will not be applied
  540. // to store.
  541. func TestRecvSlowSnapshot(t *testing.T) {
  542. n := newReadyNode()
  543. st := &storeRecorder{}
  544. s := &EtcdServer{
  545. Store: st,
  546. Send: func(_ []raftpb.Message) {},
  547. Storage: &storageRecorder{},
  548. Node: n,
  549. }
  550. s.Start()
  551. n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
  552. // make goroutines move forward to receive snapshot
  553. testutil.ForceGosched()
  554. action := st.Action()
  555. n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
  556. // make goroutines move forward to receive snapshot
  557. testutil.ForceGosched()
  558. s.Stop()
  559. if g := st.Action(); !reflect.DeepEqual(g, action) {
  560. t.Errorf("store action = %v, want %v", g, action)
  561. }
  562. }
  563. // TestAddNode tests AddNode can propose and perform node addition.
  564. func TestAddNode(t *testing.T) {
  565. n := newNodeConfChangeCommitterRecorder()
  566. s := &EtcdServer{
  567. Node: n,
  568. Store: &storeRecorder{},
  569. Send: func(_ []raftpb.Message) {},
  570. Storage: &storageRecorder{},
  571. }
  572. s.Start()
  573. s.AddNode(context.TODO(), 1, []byte("foo"))
  574. action := n.Action()
  575. s.Stop()
  576. waction := []string{"ProposeConfChange:ConfChangeAddNode", "ApplyConfChange:ConfChangeAddNode"}
  577. if !reflect.DeepEqual(action, waction) {
  578. t.Errorf("action = %v, want %v", action, waction)
  579. }
  580. }
  581. // TestRemoveNode tests RemoveNode can propose and perform node removal.
  582. func TestRemoveNode(t *testing.T) {
  583. n := newNodeConfChangeCommitterRecorder()
  584. s := &EtcdServer{
  585. Node: n,
  586. Store: &storeRecorder{},
  587. Send: func(_ []raftpb.Message) {},
  588. Storage: &storageRecorder{},
  589. }
  590. s.Start()
  591. s.RemoveNode(context.TODO(), 1)
  592. action := n.Action()
  593. s.Stop()
  594. waction := []string{"ProposeConfChange:ConfChangeRemoveNode", "ApplyConfChange:ConfChangeRemoveNode"}
  595. if !reflect.DeepEqual(action, waction) {
  596. t.Errorf("action = %v, want %v", action, waction)
  597. }
  598. }
  599. // TODO: test wait trigger correctness in multi-server case
  600. func TestGetBool(t *testing.T) {
  601. tests := []struct {
  602. b *bool
  603. wb bool
  604. wset bool
  605. }{
  606. {nil, false, false},
  607. {boolp(true), true, true},
  608. {boolp(false), false, true},
  609. }
  610. for i, tt := range tests {
  611. b, set := getBool(tt.b)
  612. if b != tt.wb {
  613. t.Errorf("#%d: value = %v, want %v", i, b, tt.wb)
  614. }
  615. if set != tt.wset {
  616. t.Errorf("#%d: set = %v, want %v", i, set, tt.wset)
  617. }
  618. }
  619. }
  620. func TestGenID(t *testing.T) {
  621. // Sanity check that the GenID function has been seeded appropriately
  622. // (math/rand is seeded with 1 by default)
  623. r := rand.NewSource(int64(1))
  624. var n int64
  625. for n == 0 {
  626. n = r.Int63()
  627. }
  628. if n == GenID() {
  629. t.Fatalf("GenID's rand seeded with 1!")
  630. }
  631. }
  632. type recorder struct {
  633. sync.Mutex
  634. action []string
  635. }
  636. func (r *recorder) record(action string) {
  637. r.Lock()
  638. r.action = append(r.action, action)
  639. r.Unlock()
  640. }
  641. func (r *recorder) Action() []string {
  642. r.Lock()
  643. cpy := make([]string, len(r.action))
  644. copy(cpy, r.action)
  645. r.Unlock()
  646. return cpy
  647. }
  648. type storeRecorder struct {
  649. recorder
  650. }
  651. func (s *storeRecorder) Version() int { return 0 }
  652. func (s *storeRecorder) Index() uint64 { return 0 }
  653. func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
  654. s.record("Get")
  655. return &store.Event{}, nil
  656. }
  657. func (s *storeRecorder) Set(_ string, _ bool, _ string, _ time.Time) (*store.Event, error) {
  658. s.record("Set")
  659. return &store.Event{}, nil
  660. }
  661. func (s *storeRecorder) Update(_, _ string, _ time.Time) (*store.Event, error) {
  662. s.record("Update")
  663. return &store.Event{}, nil
  664. }
  665. func (s *storeRecorder) Create(_ string, _ bool, _ string, _ bool, _ time.Time) (*store.Event, error) {
  666. s.record("Create")
  667. return &store.Event{}, nil
  668. }
  669. func (s *storeRecorder) CompareAndSwap(_, _ string, _ uint64, _ string, _ time.Time) (*store.Event, error) {
  670. s.record("CompareAndSwap")
  671. return &store.Event{}, nil
  672. }
  673. func (s *storeRecorder) Delete(_ string, _, _ bool) (*store.Event, error) {
  674. s.record("Delete")
  675. return &store.Event{}, nil
  676. }
  677. func (s *storeRecorder) CompareAndDelete(_, _ string, _ uint64) (*store.Event, error) {
  678. s.record("CompareAndDelete")
  679. return &store.Event{}, nil
  680. }
  681. func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
  682. s.record("Watch")
  683. return &stubWatcher{}, nil
  684. }
  685. func (s *storeRecorder) Save() ([]byte, error) {
  686. s.record("Save")
  687. return nil, nil
  688. }
  689. func (s *storeRecorder) Recovery(b []byte) error {
  690. s.record("Recovery")
  691. return nil
  692. }
  693. func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
  694. func (s *storeRecorder) JsonStats() []byte { return nil }
  695. func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
  696. s.record("DeleteExpiredKeys")
  697. }
  698. type stubWatcher struct{}
  699. func (w *stubWatcher) EventChan() chan *store.Event { return nil }
  700. func (w *stubWatcher) Remove() {}
  701. // errStoreRecorder returns an store error on Get, Watch request
  702. type errStoreRecorder struct {
  703. storeRecorder
  704. err error
  705. }
  706. func (s *errStoreRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
  707. s.record("Get")
  708. return nil, s.err
  709. }
  710. func (s *errStoreRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
  711. s.record("Watch")
  712. return nil, s.err
  713. }
  714. type waitRecorder struct {
  715. action []string
  716. }
  717. func (w *waitRecorder) Register(id int64) <-chan interface{} {
  718. w.action = append(w.action, fmt.Sprint("Register", id))
  719. return nil
  720. }
  721. func (w *waitRecorder) Trigger(id int64, x interface{}) {
  722. w.action = append(w.action, fmt.Sprint("Trigger", id))
  723. }
  724. func boolp(b bool) *bool { return &b }
  725. func stringp(s string) *string { return &s }
  726. type storageRecorder struct {
  727. recorder
  728. }
  729. func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) {
  730. p.record("Save")
  731. }
  732. func (p *storageRecorder) Cut() error {
  733. p.record("Cut")
  734. return nil
  735. }
  736. func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) {
  737. if raft.IsEmptySnap(st) {
  738. return
  739. }
  740. p.record("SaveSnap")
  741. }
  742. type readyNode struct {
  743. readyc chan raft.Ready
  744. }
  745. func newReadyNode() *readyNode {
  746. readyc := make(chan raft.Ready, 1)
  747. return &readyNode{readyc: readyc}
  748. }
  749. func (n *readyNode) Tick() {}
  750. func (n *readyNode) Campaign(ctx context.Context) error { return nil }
  751. func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
  752. func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  753. return nil
  754. }
  755. func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
  756. func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
  757. func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {}
  758. func (n *readyNode) Stop() {}
  759. func (n *readyNode) Compact(d []byte) {}
  760. type nodeRecorder struct {
  761. recorder
  762. }
  763. func (n *nodeRecorder) Tick() {
  764. n.record("Tick")
  765. }
  766. func (n *nodeRecorder) Campaign(ctx context.Context) error {
  767. n.record("Campaign")
  768. return nil
  769. }
  770. func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
  771. n.record("Propose")
  772. return nil
  773. }
  774. func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  775. n.record("ProposeConfChange")
  776. return nil
  777. }
  778. func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
  779. n.record("Step")
  780. return nil
  781. }
  782. func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
  783. func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
  784. n.record("ApplyConfChange")
  785. }
  786. func (n *nodeRecorder) Stop() {
  787. n.record("Stop")
  788. }
  789. func (n *nodeRecorder) Compact(d []byte) {
  790. n.record("Compact")
  791. }
  792. type nodeProposeDataRecorder struct {
  793. nodeRecorder
  794. sync.Mutex
  795. d [][]byte
  796. }
  797. func (n *nodeProposeDataRecorder) data() [][]byte {
  798. n.Lock()
  799. d := n.d
  800. n.Unlock()
  801. return d
  802. }
  803. func (n *nodeProposeDataRecorder) Propose(ctx context.Context, data []byte) error {
  804. n.nodeRecorder.Propose(ctx, data)
  805. n.Lock()
  806. n.d = append(n.d, data)
  807. n.Unlock()
  808. return nil
  809. }
  810. type nodeProposalBlockerRecorder struct {
  811. nodeRecorder
  812. }
  813. func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
  814. <-ctx.Done()
  815. n.record("Propose blocked")
  816. return nil
  817. }
  818. type nodeConfChangeCommitterRecorder struct {
  819. nodeRecorder
  820. readyc chan raft.Ready
  821. }
  822. func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
  823. readyc := make(chan raft.Ready, 1)
  824. readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}}
  825. return &nodeConfChangeCommitterRecorder{readyc: readyc}
  826. }
  827. func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  828. data, err := conf.Marshal()
  829. if err != nil {
  830. return err
  831. }
  832. n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange, Data: data}}}
  833. n.record("ProposeConfChange:" + conf.Type.String())
  834. return nil
  835. }
  836. func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
  837. return n.readyc
  838. }
  839. func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) {
  840. n.record("ApplyConfChange:" + conf.Type.String())
  841. }