server_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package etcdserver
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "reflect"
  6. "testing"
  7. "time"
  8. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  9. "github.com/coreos/etcd/raft"
  10. "github.com/coreos/etcd/raft/raftpb"
  11. "github.com/coreos/etcd/store"
  12. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  13. )
  14. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  15. // and are served through local data.
  16. func TestDoLocalAction(t *testing.T) {
  17. tests := []struct {
  18. req pb.Request
  19. wresp Response
  20. werr error
  21. waction []string
  22. }{
  23. {
  24. pb.Request{Method: "GET", Id: 1, Wait: true},
  25. Response{Watcher: &stubWatcher{}}, nil, []string{"Watch"},
  26. },
  27. {
  28. pb.Request{Method: "GET", Id: 1},
  29. Response{Event: &store.Event{}}, nil, []string{"Get"},
  30. },
  31. {
  32. pb.Request{Method: "BADMETHOD", Id: 1},
  33. Response{}, ErrUnknownMethod, nil,
  34. },
  35. }
  36. for i, tt := range tests {
  37. store := &storeRecorder{}
  38. srv := &Server{Store: store}
  39. resp, err := srv.Do(context.TODO(), tt.req)
  40. if err != tt.werr {
  41. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  42. }
  43. if !reflect.DeepEqual(resp, tt.wresp) {
  44. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  45. }
  46. if !reflect.DeepEqual(store.action, tt.waction) {
  47. t.Errorf("#%d: action = %+v, want %+v", i, store.action, tt.waction)
  48. }
  49. }
  50. }
  51. func TestApply(t *testing.T) {
  52. tests := []struct {
  53. req pb.Request
  54. wresp Response
  55. waction []string
  56. }{
  57. {
  58. pb.Request{Method: "POST", Id: 1},
  59. Response{Event: &store.Event{}}, []string{"Create"},
  60. },
  61. {
  62. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(true), PrevIndex: 1},
  63. Response{Event: &store.Event{}}, []string{"Update"},
  64. },
  65. {
  66. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(false), PrevIndex: 1},
  67. Response{Event: &store.Event{}}, []string{"Create"},
  68. },
  69. {
  70. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(true)},
  71. Response{Event: &store.Event{}}, []string{"Update"},
  72. },
  73. {
  74. pb.Request{Method: "PUT", Id: 1, PrevExists: boolp(false)},
  75. Response{Event: &store.Event{}}, []string{"Create"},
  76. },
  77. {
  78. pb.Request{Method: "PUT", Id: 1, PrevIndex: 1},
  79. Response{Event: &store.Event{}}, []string{"CompareAndSwap"},
  80. },
  81. {
  82. pb.Request{Method: "PUT", Id: 1, PrevValue: "bar"},
  83. Response{Event: &store.Event{}}, []string{"CompareAndSwap"},
  84. },
  85. {
  86. pb.Request{Method: "PUT", Id: 1},
  87. Response{Event: &store.Event{}}, []string{"Set"},
  88. },
  89. {
  90. pb.Request{Method: "DELETE", Id: 1, PrevIndex: 1},
  91. Response{Event: &store.Event{}}, []string{"CompareAndDelete"},
  92. },
  93. {
  94. pb.Request{Method: "DELETE", Id: 1, PrevValue: "bar"},
  95. Response{Event: &store.Event{}}, []string{"CompareAndDelete"},
  96. },
  97. {
  98. pb.Request{Method: "DELETE", Id: 1},
  99. Response{Event: &store.Event{}}, []string{"Delete"},
  100. },
  101. {
  102. pb.Request{Method: "QGET", Id: 1},
  103. Response{Event: &store.Event{}}, []string{"Get"},
  104. },
  105. {
  106. pb.Request{Method: "BADMETHOD", Id: 1},
  107. Response{err: ErrUnknownMethod}, nil,
  108. },
  109. }
  110. for i, tt := range tests {
  111. store := &storeRecorder{}
  112. srv := &Server{Store: store}
  113. resp := srv.apply(tt.req)
  114. if !reflect.DeepEqual(resp, tt.wresp) {
  115. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  116. }
  117. if !reflect.DeepEqual(store.action, tt.waction) {
  118. t.Errorf("#%d: action = %+v, want %+v", i, store.action, tt.waction)
  119. }
  120. }
  121. }
  122. func TestClusterOf1(t *testing.T) { testServer(t, 1) }
  123. func TestClusterOf3(t *testing.T) { testServer(t, 3) }
  124. func testServer(t *testing.T, ns int64) {
  125. ctx, cancel := context.WithCancel(context.Background())
  126. defer cancel()
  127. ss := make([]*Server, ns)
  128. send := func(msgs []raftpb.Message) {
  129. for _, m := range msgs {
  130. t.Logf("m = %+v\n", m)
  131. ss[m.To-1].Node.Step(ctx, m)
  132. }
  133. }
  134. peers := make([]int64, ns)
  135. for i := int64(0); i < ns; i++ {
  136. peers[i] = i + 1
  137. }
  138. for i := int64(0); i < ns; i++ {
  139. id := i + 1
  140. n := raft.Start(id, peers, 10, 1)
  141. tk := time.NewTicker(10 * time.Millisecond)
  142. defer tk.Stop()
  143. srv := &Server{
  144. Node: n,
  145. Store: store.New(),
  146. Send: send,
  147. Save: func(_ raftpb.State, _ []raftpb.Entry) {},
  148. Ticker: tk.C,
  149. }
  150. Start(srv)
  151. // TODO(xiangli): randomize election timeout
  152. // then remove this sleep.
  153. time.Sleep(1 * time.Millisecond)
  154. ss[i] = srv
  155. }
  156. for i := 1; i <= 10; i++ {
  157. r := pb.Request{
  158. Method: "PUT",
  159. Id: int64(i),
  160. Path: "/foo",
  161. Val: "bar",
  162. }
  163. j := rand.Intn(len(ss))
  164. t.Logf("ss = %d", j)
  165. resp, err := ss[j].Do(ctx, r)
  166. if err != nil {
  167. t.Fatal(err)
  168. }
  169. g, w := resp.Event.Node, &store.NodeExtern{
  170. Key: "/foo",
  171. ModifiedIndex: uint64(i),
  172. CreatedIndex: uint64(i),
  173. Value: stringp("bar"),
  174. }
  175. if !reflect.DeepEqual(g, w) {
  176. t.Error("value:", *g.Value)
  177. t.Errorf("g = %+v, w %+v", g, w)
  178. }
  179. }
  180. time.Sleep(10 * time.Millisecond)
  181. var last interface{}
  182. for i, sv := range ss {
  183. sv.Stop()
  184. g, _ := sv.Store.Get("/", true, true)
  185. if last != nil && !reflect.DeepEqual(last, g) {
  186. t.Errorf("server %d: Root = %#v, want %#v", i, g, last)
  187. }
  188. last = g
  189. }
  190. }
  191. func TestDoProposal(t *testing.T) {
  192. tests := []pb.Request{
  193. pb.Request{Method: "POST", Id: 1},
  194. pb.Request{Method: "PUT", Id: 1},
  195. pb.Request{Method: "DELETE", Id: 1},
  196. pb.Request{Method: "GET", Id: 1, Quorum: true},
  197. }
  198. for i, tt := range tests {
  199. ctx, _ := context.WithCancel(context.Background())
  200. n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1)
  201. st := &storeRecorder{}
  202. tk := make(chan time.Time)
  203. // this makes <-tk always successful, which accelerates internal clock
  204. close(tk)
  205. srv := &Server{
  206. Node: n,
  207. Store: st,
  208. Send: func(_ []raftpb.Message) {},
  209. Save: func(_ raftpb.State, _ []raftpb.Entry) {},
  210. Ticker: tk,
  211. }
  212. Start(srv)
  213. resp, err := srv.Do(ctx, tt)
  214. srv.Stop()
  215. if len(st.action) != 1 {
  216. t.Errorf("#%d: len(action) = %d, want 1", i, len(st.action))
  217. }
  218. if err != nil {
  219. t.Fatalf("#%d: err = %v, want nil", i, err)
  220. }
  221. wresp := Response{Event: &store.Event{}}
  222. if !reflect.DeepEqual(resp, wresp) {
  223. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  224. }
  225. }
  226. }
  227. func TestDoProposalCancelled(t *testing.T) {
  228. ctx, cancel := context.WithCancel(context.Background())
  229. // node cannot make any progress because there are two nodes
  230. n := raft.Start(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  231. st := &storeRecorder{}
  232. wait := &waitRecorder{}
  233. srv := &Server{
  234. // TODO: use fake node for better testability
  235. Node: n,
  236. Store: st,
  237. w: wait,
  238. }
  239. done := make(chan struct{})
  240. var err error
  241. go func() {
  242. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  243. close(done)
  244. }()
  245. cancel()
  246. <-done
  247. if len(st.action) != 0 {
  248. t.Errorf("len(action) = %v, want 0", len(st.action))
  249. }
  250. if err != context.Canceled {
  251. t.Fatalf("err = %v, want %v", err, context.Canceled)
  252. }
  253. w := []string{"Register1", "Trigger1"}
  254. if !reflect.DeepEqual(wait.action, w) {
  255. t.Errorf("wait.action = %+v, want %+v", wait.action, w)
  256. }
  257. }
  258. func TestDoProposalStopped(t *testing.T) {
  259. ctx, cancel := context.WithCancel(context.Background())
  260. defer cancel()
  261. // node cannot make any progress because there are two nodes
  262. n := raft.Start(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  263. st := &storeRecorder{}
  264. tk := make(chan time.Time)
  265. // this makes <-tk always successful, which accelarates internal clock
  266. close(tk)
  267. srv := &Server{
  268. // TODO: use fake node for better testability
  269. Node: n,
  270. Store: st,
  271. Send: func(_ []raftpb.Message) {},
  272. Save: func(_ raftpb.State, _ []raftpb.Entry) {},
  273. Ticker: tk,
  274. }
  275. Start(srv)
  276. done := make(chan struct{})
  277. var err error
  278. go func() {
  279. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  280. close(done)
  281. }()
  282. srv.Stop()
  283. <-done
  284. if len(st.action) != 0 {
  285. t.Errorf("len(action) = %v, want 0", len(st.action))
  286. }
  287. if err != ErrStopped {
  288. t.Errorf("err = %v, want %v", err, ErrStopped)
  289. }
  290. }
  291. // TODO: test wait trigger correctness in multi-server case
  292. func TestGetBool(t *testing.T) {
  293. tests := []struct {
  294. b *bool
  295. wb bool
  296. wset bool
  297. }{
  298. {nil, false, false},
  299. {boolp(true), true, true},
  300. {boolp(false), false, true},
  301. }
  302. for i, tt := range tests {
  303. b, set := getBool(tt.b)
  304. if b != tt.wb {
  305. t.Errorf("#%d: value = %v, want %v", i, b, tt.wb)
  306. }
  307. if set != tt.wset {
  308. t.Errorf("#%d: set = %v, want %v", i, set, tt.wset)
  309. }
  310. }
  311. }
  312. type storeRecorder struct {
  313. action []string
  314. }
  315. func (s *storeRecorder) Version() int { return 0 }
  316. func (s *storeRecorder) Index() uint64 { return 0 }
  317. func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
  318. s.action = append(s.action, "Get")
  319. return &store.Event{}, nil
  320. }
  321. func (s *storeRecorder) Set(_ string, _ bool, _ string, _ time.Time) (*store.Event, error) {
  322. s.action = append(s.action, "Set")
  323. return &store.Event{}, nil
  324. }
  325. func (s *storeRecorder) Update(_, _ string, _ time.Time) (*store.Event, error) {
  326. s.action = append(s.action, "Update")
  327. return &store.Event{}, nil
  328. }
  329. func (s *storeRecorder) Create(_ string, _ bool, _ string, _ bool, _ time.Time) (*store.Event, error) {
  330. s.action = append(s.action, "Create")
  331. return &store.Event{}, nil
  332. }
  333. func (s *storeRecorder) CompareAndSwap(_, _ string, _ uint64, _ string, _ time.Time) (*store.Event, error) {
  334. s.action = append(s.action, "CompareAndSwap")
  335. return &store.Event{}, nil
  336. }
  337. func (s *storeRecorder) Delete(_ string, _, _ bool) (*store.Event, error) {
  338. s.action = append(s.action, "Delete")
  339. return &store.Event{}, nil
  340. }
  341. func (s *storeRecorder) CompareAndDelete(_, _ string, _ uint64) (*store.Event, error) {
  342. s.action = append(s.action, "CompareAndDelete")
  343. return &store.Event{}, nil
  344. }
  345. func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
  346. s.action = append(s.action, "Watch")
  347. return &stubWatcher{}, nil
  348. }
  349. func (s *storeRecorder) Save() ([]byte, error) { return nil, nil }
  350. func (s *storeRecorder) Recovery(b []byte) error { return nil }
  351. func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
  352. func (s *storeRecorder) JsonStats() []byte { return nil }
  353. func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {}
  354. type stubWatcher struct{}
  355. func (w *stubWatcher) EventChan() chan *store.Event { return nil }
  356. func (w *stubWatcher) Remove() {}
  357. type waitRecorder struct {
  358. action []string
  359. }
  360. func (w *waitRecorder) Register(id int64) <-chan interface{} {
  361. w.action = append(w.action, fmt.Sprint("Register", id))
  362. return nil
  363. }
  364. func (w *waitRecorder) Trigger(id int64, x interface{}) {
  365. w.action = append(w.action, fmt.Sprint("Trigger", id))
  366. }
  367. func boolp(b bool) *bool { return &b }
  368. func stringp(s string) *string { return &s }