server_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650
  1. package etcdserver
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "reflect"
  6. "runtime"
  7. "sync"
  8. "testing"
  9. "time"
  10. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  11. "github.com/coreos/etcd/raft"
  12. "github.com/coreos/etcd/raft/raftpb"
  13. "github.com/coreos/etcd/store"
  14. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  15. )
  16. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  17. // and are served through local data.
  18. func TestDoLocalAction(t *testing.T) {
  19. tests := []struct {
  20. req pb.Request
  21. wresp Response
  22. werr error
  23. waction []string
  24. }{
  25. {
  26. pb.Request{Method: "GET", Id: 1, Wait: true},
  27. Response{Watcher: &stubWatcher{}}, nil, []string{"Watch"},
  28. },
  29. {
  30. pb.Request{Method: "GET", Id: 1},
  31. Response{Event: &store.Event{}}, nil, []string{"Get"},
  32. },
  33. {
  34. pb.Request{Method: "BADMETHOD", Id: 1},
  35. Response{}, ErrUnknownMethod, []string{},
  36. },
  37. }
  38. for i, tt := range tests {
  39. st := &storeRecorder{}
  40. srv := &EtcdServer{Store: st}
  41. resp, err := srv.Do(context.TODO(), tt.req)
  42. if err != tt.werr {
  43. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  44. }
  45. if !reflect.DeepEqual(resp, tt.wresp) {
  46. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  47. }
  48. action := st.Action()
  49. if !reflect.DeepEqual(action, tt.waction) {
  50. t.Errorf("#%d: action = %+v, want %+v", i, action, tt.waction)
  51. }
  52. }
  53. }
  54. func TestApply(t *testing.T) {
  55. tests := []struct {
  56. req pb.Request
  57. wresp Response
  58. waction []string
  59. }{
  60. {
  61. pb.Request{Method: "POST", Id: 1},
  62. Response{Event: &store.Event{}}, []string{"Create"},
  63. },
  64. {
  65. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(true), PrevIndex: 1},
  66. Response{Event: &store.Event{}}, []string{"Update"},
  67. },
  68. {
  69. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(false), PrevIndex: 1},
  70. Response{Event: &store.Event{}}, []string{"Create"},
  71. },
  72. {
  73. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(true)},
  74. Response{Event: &store.Event{}}, []string{"Update"},
  75. },
  76. {
  77. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(false)},
  78. Response{Event: &store.Event{}}, []string{"Create"},
  79. },
  80. {
  81. pb.Request{Method: "PUT", Id: 1, PrevIndex: 1},
  82. Response{Event: &store.Event{}}, []string{"CompareAndSwap"},
  83. },
  84. {
  85. pb.Request{Method: "PUT", Id: 1, PrevValue: "bar"},
  86. Response{Event: &store.Event{}}, []string{"CompareAndSwap"},
  87. },
  88. {
  89. pb.Request{Method: "PUT", Id: 1},
  90. Response{Event: &store.Event{}}, []string{"Set"},
  91. },
  92. {
  93. pb.Request{Method: "DELETE", Id: 1, PrevIndex: 1},
  94. Response{Event: &store.Event{}}, []string{"CompareAndDelete"},
  95. },
  96. {
  97. pb.Request{Method: "DELETE", Id: 1, PrevValue: "bar"},
  98. Response{Event: &store.Event{}}, []string{"CompareAndDelete"},
  99. },
  100. {
  101. pb.Request{Method: "DELETE", Id: 1},
  102. Response{Event: &store.Event{}}, []string{"Delete"},
  103. },
  104. {
  105. pb.Request{Method: "QGET", Id: 1},
  106. Response{Event: &store.Event{}}, []string{"Get"},
  107. },
  108. {
  109. pb.Request{Method: "SYNC", Id: 1},
  110. Response{}, []string{"DeleteExpiredKeys"},
  111. },
  112. {
  113. pb.Request{Method: "BADMETHOD", Id: 1},
  114. Response{err: ErrUnknownMethod}, []string{},
  115. },
  116. }
  117. for i, tt := range tests {
  118. st := &storeRecorder{}
  119. srv := &EtcdServer{Store: st}
  120. resp := srv.apply(tt.req)
  121. if !reflect.DeepEqual(resp, tt.wresp) {
  122. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  123. }
  124. action := st.Action()
  125. if !reflect.DeepEqual(action, tt.waction) {
  126. t.Errorf("#%d: action = %+v, want %+v", i, action, tt.waction)
  127. }
  128. }
  129. }
  130. func TestClusterOf1(t *testing.T) { testServer(t, 1) }
  131. func TestClusterOf3(t *testing.T) { testServer(t, 3) }
  132. func testServer(t *testing.T, ns int64) {
  133. ctx, cancel := context.WithCancel(context.Background())
  134. defer cancel()
  135. ss := make([]*EtcdServer, ns)
  136. send := func(msgs []raftpb.Message) {
  137. for _, m := range msgs {
  138. t.Logf("m = %+v\n", m)
  139. ss[m.To-1].Node.Step(ctx, m)
  140. }
  141. }
  142. peers := make([]int64, ns)
  143. for i := int64(0); i < ns; i++ {
  144. peers[i] = i + 1
  145. }
  146. for i := int64(0); i < ns; i++ {
  147. id := i + 1
  148. n := raft.StartNode(id, peers, 10, 1)
  149. tk := time.NewTicker(10 * time.Millisecond)
  150. defer tk.Stop()
  151. srv := &EtcdServer{
  152. Node: n,
  153. Store: store.New(),
  154. Send: send,
  155. Storage: &storageRecorder{},
  156. Ticker: tk.C,
  157. }
  158. srv.Start()
  159. // TODO(xiangli): randomize election timeout
  160. // then remove this sleep.
  161. time.Sleep(1 * time.Millisecond)
  162. ss[i] = srv
  163. }
  164. for i := 1; i <= 10; i++ {
  165. r := pb.Request{
  166. Method: "PUT",
  167. Id: int64(i),
  168. Path: "/foo",
  169. Val: "bar",
  170. }
  171. j := rand.Intn(len(ss))
  172. t.Logf("ss = %d", j)
  173. resp, err := ss[j].Do(ctx, r)
  174. if err != nil {
  175. t.Fatal(err)
  176. }
  177. g, w := resp.Event.Node, &store.NodeExtern{
  178. Key: "/foo",
  179. ModifiedIndex: uint64(i),
  180. CreatedIndex: uint64(i),
  181. Value: stringp("bar"),
  182. }
  183. if !reflect.DeepEqual(g, w) {
  184. t.Error("value:", *g.Value)
  185. t.Errorf("g = %+v, w %+v", g, w)
  186. }
  187. }
  188. time.Sleep(10 * time.Millisecond)
  189. var last interface{}
  190. for i, sv := range ss {
  191. sv.Stop()
  192. g, _ := sv.Store.Get("/", true, true)
  193. if last != nil && !reflect.DeepEqual(last, g) {
  194. t.Errorf("server %d: Root = %#v, want %#v", i, g, last)
  195. }
  196. last = g
  197. }
  198. }
  199. func TestDoProposal(t *testing.T) {
  200. tests := []pb.Request{
  201. pb.Request{Method: "POST", Id: 1},
  202. pb.Request{Method: "PUT", Id: 1},
  203. pb.Request{Method: "DELETE", Id: 1},
  204. pb.Request{Method: "GET", Id: 1, Quorum: true},
  205. }
  206. for i, tt := range tests {
  207. ctx, _ := context.WithCancel(context.Background())
  208. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  209. st := &storeRecorder{}
  210. tk := make(chan time.Time)
  211. // this makes <-tk always successful, which accelerates internal clock
  212. close(tk)
  213. srv := &EtcdServer{
  214. Node: n,
  215. Store: st,
  216. Send: func(_ []raftpb.Message) {},
  217. Storage: &storageRecorder{},
  218. Ticker: tk,
  219. }
  220. srv.Start()
  221. resp, err := srv.Do(ctx, tt)
  222. srv.Stop()
  223. action := st.Action()
  224. if len(action) != 1 {
  225. t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
  226. }
  227. if err != nil {
  228. t.Fatalf("#%d: err = %v, want nil", i, err)
  229. }
  230. wresp := Response{Event: &store.Event{}}
  231. if !reflect.DeepEqual(resp, wresp) {
  232. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  233. }
  234. }
  235. }
  236. func TestDoProposalCancelled(t *testing.T) {
  237. ctx, cancel := context.WithCancel(context.Background())
  238. // node cannot make any progress because there are two nodes
  239. n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  240. st := &storeRecorder{}
  241. wait := &waitRecorder{}
  242. srv := &EtcdServer{
  243. // TODO: use fake node for better testability
  244. Node: n,
  245. Store: st,
  246. w: wait,
  247. }
  248. done := make(chan struct{})
  249. var err error
  250. go func() {
  251. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  252. close(done)
  253. }()
  254. cancel()
  255. <-done
  256. action := st.Action()
  257. if len(action) != 0 {
  258. t.Errorf("len(action) = %v, want 0", len(action))
  259. }
  260. if err != context.Canceled {
  261. t.Fatalf("err = %v, want %v", err, context.Canceled)
  262. }
  263. w := []string{"Register1", "Trigger1"}
  264. if !reflect.DeepEqual(wait.action, w) {
  265. t.Errorf("wait.action = %+v, want %+v", wait.action, w)
  266. }
  267. }
  268. func TestDoProposalStopped(t *testing.T) {
  269. ctx, cancel := context.WithCancel(context.Background())
  270. defer cancel()
  271. // node cannot make any progress because there are two nodes
  272. n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  273. st := &storeRecorder{}
  274. tk := make(chan time.Time)
  275. // this makes <-tk always successful, which accelarates internal clock
  276. close(tk)
  277. srv := &EtcdServer{
  278. // TODO: use fake node for better testability
  279. Node: n,
  280. Store: st,
  281. Send: func(_ []raftpb.Message) {},
  282. Storage: &storageRecorder{},
  283. Ticker: tk,
  284. }
  285. srv.Start()
  286. done := make(chan struct{})
  287. var err error
  288. go func() {
  289. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  290. close(done)
  291. }()
  292. srv.Stop()
  293. <-done
  294. action := st.Action()
  295. if len(action) != 0 {
  296. t.Errorf("len(action) = %v, want 0", len(action))
  297. }
  298. if err != ErrStopped {
  299. t.Errorf("err = %v, want %v", err, ErrStopped)
  300. }
  301. }
  302. // TestSync tests sync 1. is nonblocking 2. sends out SYNC request.
  303. func TestSync(t *testing.T) {
  304. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  305. n.Campaign(context.TODO())
  306. select {
  307. case <-n.Ready():
  308. case <-time.After(time.Millisecond):
  309. t.Fatalf("expect to receive ready within 1ms, but fail")
  310. }
  311. srv := &EtcdServer{
  312. // TODO: use fake node for better testability
  313. Node: n,
  314. }
  315. start := time.Now()
  316. srv.sync(defaultSyncTimeout)
  317. // check that sync is non-blocking
  318. if d := time.Since(start); d > time.Millisecond {
  319. t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
  320. }
  321. // give time for goroutine in sync to run
  322. // TODO: use fake clock
  323. var ready raft.Ready
  324. select {
  325. case ready = <-n.Ready():
  326. case <-time.After(time.Millisecond):
  327. t.Fatalf("expect to receive ready within 1ms, but fail")
  328. }
  329. if len(ready.CommittedEntries) != 1 {
  330. t.Fatalf("len(CommittedEntries) = %d, want 1", len(ready.CommittedEntries))
  331. }
  332. e := ready.CommittedEntries[0]
  333. var req pb.Request
  334. if err := req.Unmarshal(e.Data); err != nil {
  335. t.Fatalf("unmarshal error: %v", err)
  336. }
  337. if req.Method != "SYNC" {
  338. t.Errorf("method = %s, want SYNC", req.Method)
  339. }
  340. }
  341. // TestSyncFail tests the case that sync 1. is non-blocking 2. fails to
  342. // propose SYNC request because there is no leader
  343. func TestSyncFail(t *testing.T) {
  344. // The node is run without Tick and Campaign, so it has no leader forever.
  345. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  346. select {
  347. case <-n.Ready():
  348. case <-time.After(time.Millisecond):
  349. t.Fatalf("expect to receive ready within 1ms, but fail")
  350. }
  351. srv := &EtcdServer{
  352. // TODO: use fake node for better testability
  353. Node: n,
  354. }
  355. routineN := runtime.NumGoroutine()
  356. start := time.Now()
  357. srv.sync(time.Millisecond)
  358. // check that sync is non-blocking
  359. if d := time.Since(start); d > time.Millisecond {
  360. t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
  361. }
  362. // give time for goroutine in sync to cancel
  363. // TODO: use fake clock
  364. time.Sleep(2 * time.Millisecond)
  365. if g := runtime.NumGoroutine(); g != routineN {
  366. t.Errorf("NumGoroutine = %d, want %d", g, routineN)
  367. }
  368. select {
  369. case g := <-n.Ready():
  370. t.Errorf("ready = %+v, want no", g)
  371. default:
  372. }
  373. }
  374. func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
  375. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  376. n.Campaign(context.TODO())
  377. st := &storeRecorder{}
  378. syncInterval := 5 * time.Millisecond
  379. syncTicker := time.NewTicker(syncInterval)
  380. defer syncTicker.Stop()
  381. srv := &EtcdServer{
  382. // TODO: use fake node for better testability
  383. Node: n,
  384. Store: st,
  385. Send: func(_ []raftpb.Message) {},
  386. Storage: &storageRecorder{},
  387. SyncTicker: syncTicker.C,
  388. }
  389. srv.Start()
  390. // give time for sync request to be proposed and performed
  391. // TODO: use fake clock
  392. time.Sleep(syncInterval + time.Millisecond)
  393. srv.Stop()
  394. action := st.Action()
  395. if len(action) != 1 {
  396. t.Fatalf("len(action) = %d, want 1", len(action))
  397. }
  398. if action[0] != "DeleteExpiredKeys" {
  399. t.Errorf("action = %s, want DeleteExpiredKeys", action[0])
  400. }
  401. }
  402. // snapshot should snapshot the store and cut the persistent
  403. // TODO: node.Compact is called... we need to make the node an interface
  404. func TestSnapshot(t *testing.T) {
  405. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  406. defer n.Stop()
  407. st := &storeRecorder{}
  408. p := &storageRecorder{}
  409. s := &EtcdServer{
  410. Store: st,
  411. Storage: p,
  412. Node: n,
  413. }
  414. s.snapshot()
  415. action := st.Action()
  416. if len(action) != 1 {
  417. t.Fatalf("len(action) = %d, want 1", len(action))
  418. }
  419. if action[0] != "Save" {
  420. t.Errorf("action = %s, want Save", action[0])
  421. }
  422. action = p.Action()
  423. if len(action) != 1 {
  424. t.Fatalf("len(action) = %d, want 1", len(action))
  425. }
  426. if action[0] != "Cut" {
  427. t.Errorf("action = %s, want Cut", action[0])
  428. }
  429. }
  430. // Applied > SnapCount should trigger a SaveSnap event
  431. // TODO: receive a snapshot from raft leader should also be able
  432. // to trigger snapSave and also trigger a store.Recover.
  433. // We need fake node!
  434. func TestTriggerSnap(t *testing.T) {
  435. ctx := context.Background()
  436. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  437. n.Campaign(ctx)
  438. st := &storeRecorder{}
  439. p := &storageRecorder{}
  440. s := &EtcdServer{
  441. Store: st,
  442. Send: func(_ []raftpb.Message) {},
  443. Storage: p,
  444. Node: n,
  445. SnapCount: 10,
  446. }
  447. s.Start()
  448. for i := 0; int64(i) < s.SnapCount; i++ {
  449. s.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  450. }
  451. time.Sleep(time.Millisecond)
  452. s.Stop()
  453. action := p.Action()
  454. // each operation is recorded as a Save
  455. // Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap
  456. if len(action) != 3+int(s.SnapCount) {
  457. t.Fatalf("len(action) = %d, want %d", len(action), 3+int(s.SnapCount))
  458. }
  459. if action[12] != "SaveSnap" {
  460. t.Errorf("action = %s, want SaveSnap", action[12])
  461. }
  462. }
  463. // TODO: test wait trigger correctness in multi-server case
  464. func TestGetBool(t *testing.T) {
  465. tests := []struct {
  466. b *bool
  467. wb bool
  468. wset bool
  469. }{
  470. {nil, false, false},
  471. {boolp(true), true, true},
  472. {boolp(false), false, true},
  473. }
  474. for i, tt := range tests {
  475. b, set := getBool(tt.b)
  476. if b != tt.wb {
  477. t.Errorf("#%d: value = %v, want %v", i, b, tt.wb)
  478. }
  479. if set != tt.wset {
  480. t.Errorf("#%d: set = %v, want %v", i, set, tt.wset)
  481. }
  482. }
  483. }
  484. type recorder struct {
  485. sync.Mutex
  486. action []string
  487. }
  488. func (r *recorder) record(action string) {
  489. r.Lock()
  490. r.action = append(r.action, action)
  491. r.Unlock()
  492. }
  493. func (r *recorder) Action() []string {
  494. r.Lock()
  495. cpy := make([]string, len(r.action))
  496. copy(cpy, r.action)
  497. r.Unlock()
  498. return cpy
  499. }
  500. type storeRecorder struct {
  501. recorder
  502. }
  503. func (s *storeRecorder) Version() int { return 0 }
  504. func (s *storeRecorder) Index() uint64 { return 0 }
  505. func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
  506. s.record("Get")
  507. return &store.Event{}, nil
  508. }
  509. func (s *storeRecorder) Set(_ string, _ bool, _ string, _ time.Time) (*store.Event, error) {
  510. s.record("Set")
  511. return &store.Event{}, nil
  512. }
  513. func (s *storeRecorder) Update(_, _ string, _ time.Time) (*store.Event, error) {
  514. s.record("Update")
  515. return &store.Event{}, nil
  516. }
  517. func (s *storeRecorder) Create(_ string, _ bool, _ string, _ bool, _ time.Time) (*store.Event, error) {
  518. s.record("Create")
  519. return &store.Event{}, nil
  520. }
  521. func (s *storeRecorder) CompareAndSwap(_, _ string, _ uint64, _ string, _ time.Time) (*store.Event, error) {
  522. s.record("CompareAndSwap")
  523. return &store.Event{}, nil
  524. }
  525. func (s *storeRecorder) Delete(_ string, _, _ bool) (*store.Event, error) {
  526. s.record("Delete")
  527. return &store.Event{}, nil
  528. }
  529. func (s *storeRecorder) CompareAndDelete(_, _ string, _ uint64) (*store.Event, error) {
  530. s.record("CompareAndDelete")
  531. return &store.Event{}, nil
  532. }
  533. func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
  534. s.record("Watch")
  535. return &stubWatcher{}, nil
  536. }
  537. func (s *storeRecorder) Save() ([]byte, error) {
  538. s.record("Save")
  539. return nil, nil
  540. }
  541. func (s *storeRecorder) Recovery(b []byte) error { return nil }
  542. func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
  543. func (s *storeRecorder) JsonStats() []byte { return nil }
  544. func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
  545. s.record("DeleteExpiredKeys")
  546. }
  547. type stubWatcher struct{}
  548. func (w *stubWatcher) EventChan() chan *store.Event { return nil }
  549. func (w *stubWatcher) Remove() {}
  550. type waitRecorder struct {
  551. action []string
  552. }
  553. func (w *waitRecorder) Register(id int64) <-chan interface{} {
  554. w.action = append(w.action, fmt.Sprint("Register", id))
  555. return nil
  556. }
  557. func (w *waitRecorder) Trigger(id int64, x interface{}) {
  558. w.action = append(w.action, fmt.Sprint("Trigger", id))
  559. }
  560. func boolp(b bool) *bool { return &b }
  561. func stringp(s string) *string { return &s }
  562. type storageRecorder struct {
  563. recorder
  564. }
  565. func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) {
  566. p.record("Save")
  567. }
  568. func (p *storageRecorder) Cut() error {
  569. p.record("Cut")
  570. return nil
  571. }
  572. func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) {
  573. if raft.IsEmptySnap(st) {
  574. return
  575. }
  576. p.record("SaveSnap")
  577. }
  578. func TestGenID(t *testing.T) {
  579. // Sanity check that the GenID function has been seeded appropriately
  580. // (math/rand is seeded with 1 by default)
  581. r := rand.NewSource(int64(1))
  582. var n int64
  583. for n == 0 {
  584. n = r.Int63()
  585. }
  586. if n == GenID() {
  587. t.Fatalf("GenID's rand seeded with 1!")
  588. }
  589. }