server_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632
  1. package etcdserver
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "reflect"
  6. "runtime"
  7. "sync"
  8. "testing"
  9. "time"
  10. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  11. "github.com/coreos/etcd/raft"
  12. "github.com/coreos/etcd/raft/raftpb"
  13. "github.com/coreos/etcd/store"
  14. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  15. )
  16. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  17. // and are served through local data.
  18. func TestDoLocalAction(t *testing.T) {
  19. tests := []struct {
  20. req pb.Request
  21. wresp Response
  22. werr error
  23. waction []string
  24. }{
  25. {
  26. pb.Request{Method: "GET", Id: 1, Wait: true},
  27. Response{Watcher: &stubWatcher{}}, nil, []string{"Watch"},
  28. },
  29. {
  30. pb.Request{Method: "GET", Id: 1},
  31. Response{Event: &store.Event{}}, nil, []string{"Get"},
  32. },
  33. {
  34. pb.Request{Method: "BADMETHOD", Id: 1},
  35. Response{}, ErrUnknownMethod, nil,
  36. },
  37. }
  38. for i, tt := range tests {
  39. store := &storeRecorder{}
  40. srv := &EtcdServer{Store: store}
  41. resp, err := srv.Do(context.TODO(), tt.req)
  42. if err != tt.werr {
  43. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  44. }
  45. if !reflect.DeepEqual(resp, tt.wresp) {
  46. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  47. }
  48. if !reflect.DeepEqual(store.action, tt.waction) {
  49. t.Errorf("#%d: action = %+v, want %+v", i, store.action, tt.waction)
  50. }
  51. }
  52. }
  53. func TestApply(t *testing.T) {
  54. tests := []struct {
  55. req pb.Request
  56. wresp Response
  57. waction []string
  58. }{
  59. {
  60. pb.Request{Method: "POST", Id: 1},
  61. Response{Event: &store.Event{}}, []string{"Create"},
  62. },
  63. {
  64. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(true), PrevIndex: 1},
  65. Response{Event: &store.Event{}}, []string{"Update"},
  66. },
  67. {
  68. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(false), PrevIndex: 1},
  69. Response{Event: &store.Event{}}, []string{"Create"},
  70. },
  71. {
  72. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(true)},
  73. Response{Event: &store.Event{}}, []string{"Update"},
  74. },
  75. {
  76. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(false)},
  77. Response{Event: &store.Event{}}, []string{"Create"},
  78. },
  79. {
  80. pb.Request{Method: "PUT", Id: 1, PrevIndex: 1},
  81. Response{Event: &store.Event{}}, []string{"CompareAndSwap"},
  82. },
  83. {
  84. pb.Request{Method: "PUT", Id: 1, PrevValue: "bar"},
  85. Response{Event: &store.Event{}}, []string{"CompareAndSwap"},
  86. },
  87. {
  88. pb.Request{Method: "PUT", Id: 1},
  89. Response{Event: &store.Event{}}, []string{"Set"},
  90. },
  91. {
  92. pb.Request{Method: "DELETE", Id: 1, PrevIndex: 1},
  93. Response{Event: &store.Event{}}, []string{"CompareAndDelete"},
  94. },
  95. {
  96. pb.Request{Method: "DELETE", Id: 1, PrevValue: "bar"},
  97. Response{Event: &store.Event{}}, []string{"CompareAndDelete"},
  98. },
  99. {
  100. pb.Request{Method: "DELETE", Id: 1},
  101. Response{Event: &store.Event{}}, []string{"Delete"},
  102. },
  103. {
  104. pb.Request{Method: "QGET", Id: 1},
  105. Response{Event: &store.Event{}}, []string{"Get"},
  106. },
  107. {
  108. pb.Request{Method: "SYNC", Id: 1},
  109. Response{}, []string{"DeleteExpiredKeys"},
  110. },
  111. {
  112. pb.Request{Method: "BADMETHOD", Id: 1},
  113. Response{err: ErrUnknownMethod}, nil,
  114. },
  115. }
  116. for i, tt := range tests {
  117. store := &storeRecorder{}
  118. srv := &EtcdServer{Store: store}
  119. resp := srv.apply(tt.req)
  120. if !reflect.DeepEqual(resp, tt.wresp) {
  121. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  122. }
  123. if !reflect.DeepEqual(store.action, tt.waction) {
  124. t.Errorf("#%d: action = %+v, want %+v", i, store.action, tt.waction)
  125. }
  126. }
  127. }
  128. func TestClusterOf1(t *testing.T) { testServer(t, 1) }
  129. func TestClusterOf3(t *testing.T) { testServer(t, 3) }
  130. func testServer(t *testing.T, ns int64) {
  131. ctx, cancel := context.WithCancel(context.Background())
  132. defer cancel()
  133. ss := make([]*EtcdServer, ns)
  134. send := func(msgs []raftpb.Message) {
  135. for _, m := range msgs {
  136. t.Logf("m = %+v\n", m)
  137. ss[m.To-1].Node.Step(ctx, m)
  138. }
  139. }
  140. peers := make([]int64, ns)
  141. for i := int64(0); i < ns; i++ {
  142. peers[i] = i + 1
  143. }
  144. for i := int64(0); i < ns; i++ {
  145. id := i + 1
  146. n := raft.StartNode(id, peers, 10, 1)
  147. tk := time.NewTicker(10 * time.Millisecond)
  148. defer tk.Stop()
  149. srv := &EtcdServer{
  150. Node: n,
  151. Store: store.New(),
  152. Send: send,
  153. Storage: &storageRecorder{},
  154. Ticker: tk.C,
  155. }
  156. srv.Start()
  157. // TODO(xiangli): randomize election timeout
  158. // then remove this sleep.
  159. time.Sleep(1 * time.Millisecond)
  160. ss[i] = srv
  161. }
  162. for i := 1; i <= 10; i++ {
  163. r := pb.Request{
  164. Method: "PUT",
  165. Id: int64(i),
  166. Path: "/foo",
  167. Val: "bar",
  168. }
  169. j := rand.Intn(len(ss))
  170. t.Logf("ss = %d", j)
  171. resp, err := ss[j].Do(ctx, r)
  172. if err != nil {
  173. t.Fatal(err)
  174. }
  175. g, w := resp.Event.Node, &store.NodeExtern{
  176. Key: "/foo",
  177. ModifiedIndex: uint64(i),
  178. CreatedIndex: uint64(i),
  179. Value: stringp("bar"),
  180. }
  181. if !reflect.DeepEqual(g, w) {
  182. t.Error("value:", *g.Value)
  183. t.Errorf("g = %+v, w %+v", g, w)
  184. }
  185. }
  186. time.Sleep(10 * time.Millisecond)
  187. var last interface{}
  188. for i, sv := range ss {
  189. sv.Stop()
  190. g, _ := sv.Store.Get("/", true, true)
  191. if last != nil && !reflect.DeepEqual(last, g) {
  192. t.Errorf("server %d: Root = %#v, want %#v", i, g, last)
  193. }
  194. last = g
  195. }
  196. }
  197. func TestDoProposal(t *testing.T) {
  198. tests := []pb.Request{
  199. pb.Request{Method: "POST", Id: 1},
  200. pb.Request{Method: "PUT", Id: 1},
  201. pb.Request{Method: "DELETE", Id: 1},
  202. pb.Request{Method: "GET", Id: 1, Quorum: true},
  203. }
  204. for i, tt := range tests {
  205. ctx, _ := context.WithCancel(context.Background())
  206. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  207. st := &storeRecorder{}
  208. tk := make(chan time.Time)
  209. // this makes <-tk always successful, which accelerates internal clock
  210. close(tk)
  211. srv := &EtcdServer{
  212. Node: n,
  213. Store: st,
  214. Send: func(_ []raftpb.Message) {},
  215. Storage: &storageRecorder{},
  216. Ticker: tk,
  217. }
  218. srv.Start()
  219. resp, err := srv.Do(ctx, tt)
  220. srv.Stop()
  221. if len(st.action) != 1 {
  222. t.Errorf("#%d: len(action) = %d, want 1", i, len(st.action))
  223. }
  224. if err != nil {
  225. t.Fatalf("#%d: err = %v, want nil", i, err)
  226. }
  227. wresp := Response{Event: &store.Event{}}
  228. if !reflect.DeepEqual(resp, wresp) {
  229. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  230. }
  231. }
  232. }
  233. func TestDoProposalCancelled(t *testing.T) {
  234. ctx, cancel := context.WithCancel(context.Background())
  235. // node cannot make any progress because there are two nodes
  236. n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  237. st := &storeRecorder{}
  238. wait := &waitRecorder{}
  239. srv := &EtcdServer{
  240. // TODO: use fake node for better testability
  241. Node: n,
  242. Store: st,
  243. w: wait,
  244. }
  245. done := make(chan struct{})
  246. var err error
  247. go func() {
  248. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  249. close(done)
  250. }()
  251. cancel()
  252. <-done
  253. if len(st.action) != 0 {
  254. t.Errorf("len(action) = %v, want 0", len(st.action))
  255. }
  256. if err != context.Canceled {
  257. t.Fatalf("err = %v, want %v", err, context.Canceled)
  258. }
  259. w := []string{"Register1", "Trigger1"}
  260. if !reflect.DeepEqual(wait.action, w) {
  261. t.Errorf("wait.action = %+v, want %+v", wait.action, w)
  262. }
  263. }
  264. func TestDoProposalStopped(t *testing.T) {
  265. ctx, cancel := context.WithCancel(context.Background())
  266. defer cancel()
  267. // node cannot make any progress because there are two nodes
  268. n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  269. st := &storeRecorder{}
  270. tk := make(chan time.Time)
  271. // this makes <-tk always successful, which accelarates internal clock
  272. close(tk)
  273. srv := &EtcdServer{
  274. // TODO: use fake node for better testability
  275. Node: n,
  276. Store: st,
  277. Send: func(_ []raftpb.Message) {},
  278. Storage: &storageRecorder{},
  279. Ticker: tk,
  280. }
  281. srv.Start()
  282. done := make(chan struct{})
  283. var err error
  284. go func() {
  285. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  286. close(done)
  287. }()
  288. srv.Stop()
  289. <-done
  290. if len(st.action) != 0 {
  291. t.Errorf("len(action) = %v, want 0", len(st.action))
  292. }
  293. if err != ErrStopped {
  294. t.Errorf("err = %v, want %v", err, ErrStopped)
  295. }
  296. }
  297. // TestSync tests sync 1. is nonblocking 2. sends out SYNC request.
  298. func TestSync(t *testing.T) {
  299. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  300. n.Campaign(context.TODO())
  301. select {
  302. case <-n.Ready():
  303. case <-time.After(time.Millisecond):
  304. t.Fatalf("expect to receive ready within 1ms, but fail")
  305. }
  306. srv := &EtcdServer{
  307. // TODO: use fake node for better testability
  308. Node: n,
  309. }
  310. start := time.Now()
  311. srv.sync(defaultSyncTimeout)
  312. // check that sync is non-blocking
  313. if d := time.Since(start); d > time.Millisecond {
  314. t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
  315. }
  316. // give time for goroutine in sync to run
  317. // TODO: use fake clock
  318. var ready raft.Ready
  319. select {
  320. case ready = <-n.Ready():
  321. case <-time.After(time.Millisecond):
  322. t.Fatalf("expect to receive ready within 1ms, but fail")
  323. }
  324. if len(ready.CommittedEntries) != 1 {
  325. t.Fatalf("len(CommittedEntries) = %d, want 1", len(ready.CommittedEntries))
  326. }
  327. e := ready.CommittedEntries[0]
  328. var req pb.Request
  329. if err := req.Unmarshal(e.Data); err != nil {
  330. t.Fatalf("unmarshal error: %v", err)
  331. }
  332. if req.Method != "SYNC" {
  333. t.Errorf("method = %s, want SYNC", req.Method)
  334. }
  335. }
  336. // TestSyncFail tests the case that sync 1. is non-blocking 2. fails to
  337. // propose SYNC request because there is no leader
  338. func TestSyncFail(t *testing.T) {
  339. // The node is run without Tick and Campaign, so it has no leader forever.
  340. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  341. select {
  342. case <-n.Ready():
  343. case <-time.After(time.Millisecond):
  344. t.Fatalf("expect to receive ready within 1ms, but fail")
  345. }
  346. srv := &EtcdServer{
  347. // TODO: use fake node for better testability
  348. Node: n,
  349. }
  350. routineN := runtime.NumGoroutine()
  351. start := time.Now()
  352. srv.sync(time.Millisecond)
  353. // check that sync is non-blocking
  354. if d := time.Since(start); d > time.Millisecond {
  355. t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
  356. }
  357. // give time for goroutine in sync to cancel
  358. // TODO: use fake clock
  359. time.Sleep(2 * time.Millisecond)
  360. if g := runtime.NumGoroutine(); g != routineN {
  361. t.Errorf("NumGoroutine = %d, want %d", g, routineN)
  362. }
  363. select {
  364. case g := <-n.Ready():
  365. t.Errorf("ready = %+v, want no", g)
  366. default:
  367. }
  368. }
  369. func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
  370. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  371. n.Campaign(context.TODO())
  372. st := &storeRecorder{}
  373. syncInterval := 5 * time.Millisecond
  374. syncTicker := time.NewTicker(syncInterval)
  375. defer syncTicker.Stop()
  376. srv := &EtcdServer{
  377. // TODO: use fake node for better testability
  378. Node: n,
  379. Store: st,
  380. Send: func(_ []raftpb.Message) {},
  381. Storage: &storageRecorder{},
  382. SyncTicker: syncTicker.C,
  383. }
  384. srv.Start()
  385. // give time for sync request to be proposed and performed
  386. // TODO: use fake clock
  387. time.Sleep(syncInterval + time.Millisecond)
  388. srv.Stop()
  389. action := st.Action()
  390. if len(action) != 1 {
  391. t.Fatalf("len(action) = %d, want 1", len(action))
  392. }
  393. if action[0] != "DeleteExpiredKeys" {
  394. t.Errorf("action = %s, want DeleteExpiredKeys", action[0])
  395. }
  396. }
  397. // snapshot should snapshot the store and cut the persistent
  398. // TODO: node.Compact is called... we need to make the node an interface
  399. func TestSnapshot(t *testing.T) {
  400. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  401. defer n.Stop()
  402. st := &storeRecorder{}
  403. p := &storageRecorder{}
  404. s := &EtcdServer{
  405. Store: st,
  406. Storage: p,
  407. Node: n,
  408. }
  409. s.snapshot()
  410. action := st.Action()
  411. if len(action) != 1 {
  412. t.Fatalf("len(action) = %d, want 1", len(action))
  413. }
  414. if action[0] != "Save" {
  415. t.Errorf("action = %s, want Save", action[0])
  416. }
  417. action = p.Action()
  418. if len(action) != 1 {
  419. t.Fatalf("len(action) = %d, want 1", len(action))
  420. }
  421. if action[0] != "Cut" {
  422. t.Errorf("action = %s, want Cut", action[0])
  423. }
  424. }
  425. // Applied > SnapCount should trigger a SaveSnap event
  426. // TODO: receive a snapshot from raft leader should also be able
  427. // to trigger snapSave and also trigger a store.Recover.
  428. // We need fake node!
  429. func TestTriggerSnap(t *testing.T) {
  430. ctx := context.Background()
  431. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  432. n.Campaign(ctx)
  433. st := &storeRecorder{}
  434. p := &storageRecorder{}
  435. s := &EtcdServer{
  436. Store: st,
  437. Send: func(_ []raftpb.Message) {},
  438. Storage: p,
  439. Node: n,
  440. SnapCount: 10,
  441. }
  442. s.Start()
  443. for i := 0; int64(i) < s.SnapCount; i++ {
  444. s.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  445. }
  446. time.Sleep(time.Millisecond)
  447. s.Stop()
  448. action := p.Action()
  449. // each operation is recorded as a Save
  450. // Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap
  451. if len(action) != 3+int(s.SnapCount) {
  452. t.Fatalf("len(action) = %d, want %d", len(action), 3+int(s.SnapCount))
  453. }
  454. if action[12] != "SaveSnap" {
  455. t.Errorf("action = %s, want SaveSnap", action[12])
  456. }
  457. }
  458. // TODO: test wait trigger correctness in multi-server case
  459. func TestGetBool(t *testing.T) {
  460. tests := []struct {
  461. b *bool
  462. wb bool
  463. wset bool
  464. }{
  465. {nil, false, false},
  466. {boolp(true), true, true},
  467. {boolp(false), false, true},
  468. }
  469. for i, tt := range tests {
  470. b, set := getBool(tt.b)
  471. if b != tt.wb {
  472. t.Errorf("#%d: value = %v, want %v", i, b, tt.wb)
  473. }
  474. if set != tt.wset {
  475. t.Errorf("#%d: set = %v, want %v", i, set, tt.wset)
  476. }
  477. }
  478. }
  479. type recorder struct {
  480. sync.Mutex
  481. action []string
  482. }
  483. func (r *recorder) record(action string) {
  484. r.Lock()
  485. r.action = append(r.action, action)
  486. r.Unlock()
  487. }
  488. func (r *recorder) Action() []string {
  489. r.Lock()
  490. cpy := make([]string, len(r.action))
  491. copy(cpy, r.action)
  492. r.Unlock()
  493. return cpy
  494. }
  495. type storeRecorder struct {
  496. recorder
  497. }
  498. func (s *storeRecorder) Version() int { return 0 }
  499. func (s *storeRecorder) Index() uint64 { return 0 }
  500. func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
  501. s.record("Get")
  502. return &store.Event{}, nil
  503. }
  504. func (s *storeRecorder) Set(_ string, _ bool, _ string, _ time.Time) (*store.Event, error) {
  505. s.record("Set")
  506. return &store.Event{}, nil
  507. }
  508. func (s *storeRecorder) Update(_, _ string, _ time.Time) (*store.Event, error) {
  509. s.record("Update")
  510. return &store.Event{}, nil
  511. }
  512. func (s *storeRecorder) Create(_ string, _ bool, _ string, _ bool, _ time.Time) (*store.Event, error) {
  513. s.record("Create")
  514. return &store.Event{}, nil
  515. }
  516. func (s *storeRecorder) CompareAndSwap(_, _ string, _ uint64, _ string, _ time.Time) (*store.Event, error) {
  517. s.record("CompareAndSwap")
  518. return &store.Event{}, nil
  519. }
  520. func (s *storeRecorder) Delete(_ string, _, _ bool) (*store.Event, error) {
  521. s.record("Delete")
  522. return &store.Event{}, nil
  523. }
  524. func (s *storeRecorder) CompareAndDelete(_, _ string, _ uint64) (*store.Event, error) {
  525. s.record("CompareAndDelete")
  526. return &store.Event{}, nil
  527. }
  528. func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
  529. s.record("Watch")
  530. return &stubWatcher{}, nil
  531. }
  532. func (s *storeRecorder) Save() ([]byte, error) {
  533. s.record("Save")
  534. return nil, nil
  535. }
  536. func (s *storeRecorder) Recovery(b []byte) error { return nil }
  537. func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
  538. func (s *storeRecorder) JsonStats() []byte { return nil }
  539. func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
  540. s.record("DeleteExpiredKeys")
  541. }
  542. type stubWatcher struct{}
  543. func (w *stubWatcher) EventChan() chan *store.Event { return nil }
  544. func (w *stubWatcher) Remove() {}
  545. type waitRecorder struct {
  546. action []string
  547. }
  548. func (w *waitRecorder) Register(id int64) <-chan interface{} {
  549. w.action = append(w.action, fmt.Sprint("Register", id))
  550. return nil
  551. }
  552. func (w *waitRecorder) Trigger(id int64, x interface{}) {
  553. w.action = append(w.action, fmt.Sprint("Trigger", id))
  554. }
  555. func boolp(b bool) *bool { return &b }
  556. func stringp(s string) *string { return &s }
  557. type storageRecorder struct {
  558. recorder
  559. }
  560. func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) {
  561. p.record("Save")
  562. }
  563. func (p *storageRecorder) Cut() error {
  564. p.record("Cut")
  565. return nil
  566. }
  567. func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) {
  568. if raft.IsEmptySnap(st) {
  569. return
  570. }
  571. p.record("SaveSnap")
  572. }