server_test.go 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093
  1. package etcdserver
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "reflect"
  6. "sync"
  7. "testing"
  8. "time"
  9. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  10. "github.com/coreos/etcd/raft"
  11. "github.com/coreos/etcd/raft/raftpb"
  12. "github.com/coreos/etcd/store"
  13. "github.com/coreos/etcd/testutil"
  14. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  15. )
  16. func TestGetExpirationTime(t *testing.T) {
  17. tests := []struct {
  18. r pb.Request
  19. want time.Time
  20. }{
  21. {
  22. pb.Request{Expiration: 0},
  23. time.Time{},
  24. },
  25. {
  26. pb.Request{Expiration: 60000},
  27. time.Unix(0, 60000),
  28. },
  29. {
  30. pb.Request{Expiration: -60000},
  31. time.Unix(0, -60000),
  32. },
  33. }
  34. for i, tt := range tests {
  35. got := getExpirationTime(&tt.r)
  36. if !reflect.DeepEqual(tt.want, got) {
  37. t.Errorf("#%d: incorrect expiration time: want=%v got=%v", i, tt.want, got)
  38. }
  39. }
  40. }
  41. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  42. // and are served through local data.
  43. func TestDoLocalAction(t *testing.T) {
  44. tests := []struct {
  45. req pb.Request
  46. wresp Response
  47. werr error
  48. wactions []action
  49. }{
  50. {
  51. pb.Request{Method: "GET", Id: 1, Wait: true},
  52. Response{Watcher: &stubWatcher{}}, nil, []action{action{name: "Watch"}},
  53. },
  54. {
  55. pb.Request{Method: "GET", Id: 1},
  56. Response{Event: &store.Event{}}, nil,
  57. []action{
  58. action{
  59. name: "Get",
  60. params: []interface{}{"", false, false},
  61. },
  62. },
  63. },
  64. {
  65. pb.Request{Method: "BADMETHOD", Id: 1},
  66. Response{}, ErrUnknownMethod, []action{},
  67. },
  68. }
  69. for i, tt := range tests {
  70. st := &storeRecorder{}
  71. srv := &EtcdServer{Store: st}
  72. resp, err := srv.Do(context.TODO(), tt.req)
  73. if err != tt.werr {
  74. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  75. }
  76. if !reflect.DeepEqual(resp, tt.wresp) {
  77. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  78. }
  79. gaction := st.Action()
  80. if !reflect.DeepEqual(gaction, tt.wactions) {
  81. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  82. }
  83. }
  84. }
  85. // TestDoBadLocalAction tests server requests which do not need to go through consensus,
  86. // and return errors when they fetch from local data.
  87. func TestDoBadLocalAction(t *testing.T) {
  88. storeErr := fmt.Errorf("bah")
  89. tests := []struct {
  90. req pb.Request
  91. wactions []action
  92. }{
  93. {
  94. pb.Request{Method: "GET", Id: 1, Wait: true},
  95. []action{action{name: "Watch"}},
  96. },
  97. {
  98. pb.Request{Method: "GET", Id: 1},
  99. []action{action{name: "Get"}},
  100. },
  101. }
  102. for i, tt := range tests {
  103. st := &errStoreRecorder{err: storeErr}
  104. srv := &EtcdServer{Store: st}
  105. resp, err := srv.Do(context.Background(), tt.req)
  106. if err != storeErr {
  107. t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr)
  108. }
  109. if !reflect.DeepEqual(resp, Response{}) {
  110. t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{})
  111. }
  112. gaction := st.Action()
  113. if !reflect.DeepEqual(gaction, tt.wactions) {
  114. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  115. }
  116. }
  117. }
  118. func TestApply(t *testing.T) {
  119. tests := []struct {
  120. req pb.Request
  121. wresp Response
  122. wactions []action
  123. }{
  124. // POST ==> Create
  125. {
  126. pb.Request{Method: "POST", Id: 1},
  127. Response{Event: &store.Event{}},
  128. []action{
  129. action{
  130. name: "Create",
  131. params: []interface{}{"", false, "", true, time.Time{}},
  132. },
  133. },
  134. },
  135. // POST ==> Create, with expiration
  136. {
  137. pb.Request{Method: "POST", Id: 1, Expiration: 1337},
  138. Response{Event: &store.Event{}},
  139. []action{
  140. action{
  141. name: "Create",
  142. params: []interface{}{"", false, "", true, time.Unix(0, 1337)},
  143. },
  144. },
  145. },
  146. // PUT ==> Set
  147. {
  148. pb.Request{Method: "PUT", Id: 1},
  149. Response{Event: &store.Event{}}, []action{action{name: "Set"}},
  150. },
  151. // PUT with PrevExist=true ==> Update
  152. {
  153. pb.Request{Method: "PUT", Id: 1, PrevExist: boolp(true)},
  154. Response{Event: &store.Event{}},
  155. []action{
  156. action{
  157. name: "Update",
  158. params: []interface{}{"", "", time.Time{}},
  159. },
  160. },
  161. },
  162. // PUT with PrevExist=false ==> Create
  163. {
  164. pb.Request{Method: "PUT", Id: 1, PrevExist: boolp(false)},
  165. Response{Event: &store.Event{}},
  166. []action{
  167. action{
  168. name: "Create",
  169. params: []interface{}{"", false, "", false, time.Time{}},
  170. },
  171. },
  172. },
  173. // PUT with PrevExist=true *and* PrevIndex set ==> Update
  174. // TODO(jonboulle): is this expected?!
  175. {
  176. pb.Request{Method: "PUT", Id: 1, PrevExist: boolp(true), PrevIndex: 1},
  177. Response{Event: &store.Event{}},
  178. []action{
  179. action{
  180. name: "Update",
  181. params: []interface{}{"", "", time.Time{}},
  182. },
  183. },
  184. },
  185. // PUT with PrevExist=false *and* PrevIndex set ==> Create
  186. // TODO(jonboulle): is this expected?!
  187. {
  188. pb.Request{Method: "PUT", Id: 1, PrevExist: boolp(false), PrevIndex: 1},
  189. Response{Event: &store.Event{}},
  190. []action{
  191. action{
  192. name: "Create",
  193. params: []interface{}{"", false, "", false, time.Time{}},
  194. },
  195. },
  196. },
  197. // PUT with PrevIndex set ==> CompareAndSwap
  198. {
  199. pb.Request{Method: "PUT", Id: 1, PrevIndex: 1},
  200. Response{Event: &store.Event{}},
  201. []action{
  202. action{
  203. name: "CompareAndSwap",
  204. params: []interface{}{"", "", uint64(1), "", time.Time{}},
  205. },
  206. },
  207. },
  208. // PUT with PrevValue set ==> CompareAndSwap
  209. {
  210. pb.Request{Method: "PUT", Id: 1, PrevValue: "bar"},
  211. Response{Event: &store.Event{}},
  212. []action{
  213. action{
  214. name: "CompareAndSwap",
  215. params: []interface{}{"", "bar", uint64(0), "", time.Time{}},
  216. },
  217. },
  218. },
  219. // PUT with PrevIndex and PrevValue set ==> CompareAndSwap
  220. {
  221. pb.Request{Method: "PUT", Id: 1, PrevIndex: 1, PrevValue: "bar"},
  222. Response{Event: &store.Event{}},
  223. []action{
  224. action{
  225. name: "CompareAndSwap",
  226. params: []interface{}{"", "bar", uint64(1), "", time.Time{}},
  227. },
  228. },
  229. },
  230. // DELETE ==> Delete
  231. {
  232. pb.Request{Method: "DELETE", Id: 1},
  233. Response{Event: &store.Event{}},
  234. []action{
  235. action{
  236. name: "Delete",
  237. params: []interface{}{"", false, false},
  238. },
  239. },
  240. },
  241. // DELETE with PrevIndex set ==> CompareAndDelete
  242. {
  243. pb.Request{Method: "DELETE", Id: 1, PrevIndex: 1},
  244. Response{Event: &store.Event{}},
  245. []action{
  246. action{
  247. name: "CompareAndDelete",
  248. params: []interface{}{"", "", uint64(1)},
  249. },
  250. },
  251. },
  252. // DELETE with PrevValue set ==> CompareAndDelete
  253. {
  254. pb.Request{Method: "DELETE", Id: 1, PrevValue: "bar"},
  255. Response{Event: &store.Event{}},
  256. []action{
  257. action{
  258. name: "CompareAndDelete",
  259. params: []interface{}{"", "bar", uint64(0)},
  260. },
  261. },
  262. },
  263. // DELETE with PrevIndex *and* PrevValue set ==> CompareAndDelete
  264. {
  265. pb.Request{Method: "DELETE", Id: 1, PrevIndex: 5, PrevValue: "bar"},
  266. Response{Event: &store.Event{}},
  267. []action{
  268. action{
  269. name: "CompareAndDelete",
  270. params: []interface{}{"", "bar", uint64(5)},
  271. },
  272. },
  273. },
  274. // QGET ==> Get
  275. {
  276. pb.Request{Method: "QGET", Id: 1},
  277. Response{Event: &store.Event{}},
  278. []action{
  279. action{
  280. name: "Get",
  281. params: []interface{}{"", false, false},
  282. },
  283. },
  284. },
  285. // SYNC ==> DeleteExpiredKeys
  286. {
  287. pb.Request{Method: "SYNC", Id: 1},
  288. Response{},
  289. []action{
  290. action{
  291. name: "DeleteExpiredKeys",
  292. params: []interface{}{time.Unix(0, 0)},
  293. },
  294. },
  295. },
  296. {
  297. pb.Request{Method: "SYNC", Id: 1, Time: 12345},
  298. Response{},
  299. []action{
  300. action{
  301. name: "DeleteExpiredKeys",
  302. params: []interface{}{time.Unix(0, 12345)},
  303. },
  304. },
  305. },
  306. // Unknown method - error
  307. {
  308. pb.Request{Method: "BADMETHOD", Id: 1},
  309. Response{err: ErrUnknownMethod},
  310. []action{},
  311. },
  312. }
  313. for i, tt := range tests {
  314. st := &storeRecorder{}
  315. srv := &EtcdServer{Store: st}
  316. resp := srv.apply(tt.req)
  317. if !reflect.DeepEqual(resp, tt.wresp) {
  318. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  319. }
  320. gaction := st.Action()
  321. if !reflect.DeepEqual(gaction, tt.wactions) {
  322. t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions)
  323. }
  324. }
  325. }
  326. func TestClusterOf1(t *testing.T) { testServer(t, 1) }
  327. func TestClusterOf3(t *testing.T) { testServer(t, 3) }
  328. func testServer(t *testing.T, ns int64) {
  329. ctx, cancel := context.WithCancel(context.Background())
  330. defer cancel()
  331. ss := make([]*EtcdServer, ns)
  332. send := func(msgs []raftpb.Message) {
  333. for _, m := range msgs {
  334. t.Logf("m = %+v\n", m)
  335. ss[m.To-1].Node.Step(ctx, m)
  336. }
  337. }
  338. peers := make([]int64, ns)
  339. for i := int64(0); i < ns; i++ {
  340. peers[i] = i + 1
  341. }
  342. for i := int64(0); i < ns; i++ {
  343. id := i + 1
  344. n := raft.StartNode(id, peers, 10, 1)
  345. tk := time.NewTicker(10 * time.Millisecond)
  346. defer tk.Stop()
  347. srv := &EtcdServer{
  348. Node: n,
  349. Store: store.New(),
  350. Send: send,
  351. Storage: &storageRecorder{},
  352. Ticker: tk.C,
  353. }
  354. srv.Start()
  355. // TODO(xiangli): randomize election timeout
  356. // then remove this sleep.
  357. time.Sleep(1 * time.Millisecond)
  358. ss[i] = srv
  359. }
  360. for i := 1; i <= 10; i++ {
  361. r := pb.Request{
  362. Method: "PUT",
  363. Id: int64(i),
  364. Path: "/foo",
  365. Val: "bar",
  366. }
  367. j := rand.Intn(len(ss))
  368. t.Logf("ss = %d", j)
  369. resp, err := ss[j].Do(ctx, r)
  370. if err != nil {
  371. t.Fatal(err)
  372. }
  373. g, w := resp.Event.Node, &store.NodeExtern{
  374. Key: "/foo",
  375. ModifiedIndex: uint64(i),
  376. CreatedIndex: uint64(i),
  377. Value: stringp("bar"),
  378. }
  379. if !reflect.DeepEqual(g, w) {
  380. t.Error("value:", *g.Value)
  381. t.Errorf("g = %+v, w %+v", g, w)
  382. }
  383. }
  384. time.Sleep(10 * time.Millisecond)
  385. var last interface{}
  386. for i, sv := range ss {
  387. sv.Stop()
  388. g, _ := sv.Store.Get("/", true, true)
  389. if last != nil && !reflect.DeepEqual(last, g) {
  390. t.Errorf("server %d: Root = %#v, want %#v", i, g, last)
  391. }
  392. last = g
  393. }
  394. }
  395. func TestDoProposal(t *testing.T) {
  396. tests := []pb.Request{
  397. pb.Request{Method: "POST", Id: 1},
  398. pb.Request{Method: "PUT", Id: 1},
  399. pb.Request{Method: "DELETE", Id: 1},
  400. pb.Request{Method: "GET", Id: 1, Quorum: true},
  401. }
  402. for i, tt := range tests {
  403. ctx, _ := context.WithCancel(context.Background())
  404. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  405. st := &storeRecorder{}
  406. tk := make(chan time.Time)
  407. // this makes <-tk always successful, which accelerates internal clock
  408. close(tk)
  409. srv := &EtcdServer{
  410. Node: n,
  411. Store: st,
  412. Send: func(_ []raftpb.Message) {},
  413. Storage: &storageRecorder{},
  414. Ticker: tk,
  415. }
  416. srv.Start()
  417. resp, err := srv.Do(ctx, tt)
  418. srv.Stop()
  419. action := st.Action()
  420. if len(action) != 1 {
  421. t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
  422. }
  423. if err != nil {
  424. t.Fatalf("#%d: err = %v, want nil", i, err)
  425. }
  426. wresp := Response{Event: &store.Event{}}
  427. if !reflect.DeepEqual(resp, wresp) {
  428. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  429. }
  430. }
  431. }
  432. func TestDoProposalCancelled(t *testing.T) {
  433. ctx, cancel := context.WithCancel(context.Background())
  434. // node cannot make any progress because there are two nodes
  435. n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  436. st := &storeRecorder{}
  437. wait := &waitRecorder{}
  438. srv := &EtcdServer{
  439. // TODO: use fake node for better testability
  440. Node: n,
  441. Store: st,
  442. w: wait,
  443. }
  444. done := make(chan struct{})
  445. var err error
  446. go func() {
  447. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  448. close(done)
  449. }()
  450. cancel()
  451. <-done
  452. gaction := st.Action()
  453. if len(gaction) != 0 {
  454. t.Errorf("len(action) = %v, want 0", len(gaction))
  455. }
  456. if err != context.Canceled {
  457. t.Fatalf("err = %v, want %v", err, context.Canceled)
  458. }
  459. w := []action{action{name: "Register1"}, action{name: "Trigger1"}}
  460. if !reflect.DeepEqual(wait.action, w) {
  461. t.Errorf("wait.action = %+v, want %+v", wait.action, w)
  462. }
  463. }
  464. func TestDoProposalStopped(t *testing.T) {
  465. ctx, cancel := context.WithCancel(context.Background())
  466. defer cancel()
  467. // node cannot make any progress because there are two nodes
  468. n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
  469. st := &storeRecorder{}
  470. tk := make(chan time.Time)
  471. // this makes <-tk always successful, which accelarates internal clock
  472. close(tk)
  473. srv := &EtcdServer{
  474. // TODO: use fake node for better testability
  475. Node: n,
  476. Store: st,
  477. Send: func(_ []raftpb.Message) {},
  478. Storage: &storageRecorder{},
  479. Ticker: tk,
  480. }
  481. srv.Start()
  482. done := make(chan struct{})
  483. var err error
  484. go func() {
  485. _, err = srv.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  486. close(done)
  487. }()
  488. srv.Stop()
  489. <-done
  490. action := st.Action()
  491. if len(action) != 0 {
  492. t.Errorf("len(action) = %v, want 0", len(action))
  493. }
  494. if err != ErrStopped {
  495. t.Errorf("err = %v, want %v", err, ErrStopped)
  496. }
  497. }
  498. // TestSync tests sync 1. is nonblocking 2. sends out SYNC request.
  499. func TestSync(t *testing.T) {
  500. n := &nodeProposeDataRecorder{}
  501. srv := &EtcdServer{
  502. Node: n,
  503. }
  504. start := time.Now()
  505. srv.sync(defaultSyncTimeout)
  506. // check that sync is non-blocking
  507. if d := time.Since(start); d > time.Millisecond {
  508. t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
  509. }
  510. testutil.ForceGosched()
  511. data := n.data()
  512. if len(data) != 1 {
  513. t.Fatalf("len(proposeData) = %d, want 1", len(data))
  514. }
  515. var r pb.Request
  516. if err := r.Unmarshal(data[0]); err != nil {
  517. t.Fatalf("unmarshal request error: %v", err)
  518. }
  519. if r.Method != "SYNC" {
  520. t.Errorf("method = %s, want SYNC", r.Method)
  521. }
  522. }
  523. // TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
  524. // after timeout
  525. func TestSyncTimeout(t *testing.T) {
  526. n := &nodeProposalBlockerRecorder{}
  527. srv := &EtcdServer{
  528. Node: n,
  529. }
  530. start := time.Now()
  531. srv.sync(0)
  532. // check that sync is non-blocking
  533. if d := time.Since(start); d > time.Millisecond {
  534. t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
  535. }
  536. // give time for goroutine in sync to cancel
  537. // TODO: use fake clock
  538. testutil.ForceGosched()
  539. w := []action{action{name: "Propose blocked"}}
  540. if g := n.Action(); !reflect.DeepEqual(g, w) {
  541. t.Errorf("action = %v, want %v", g, w)
  542. }
  543. }
  544. // TODO: TestNoSyncWhenNoLeader
  545. // blockingNodeProposer implements the node interface to allow users to
  546. // block until Propose has been called and then verify the Proposed data
  547. type blockingNodeProposer struct {
  548. ch chan []byte
  549. readyNode
  550. }
  551. func (n *blockingNodeProposer) Propose(_ context.Context, data []byte) error {
  552. n.ch <- data
  553. return nil
  554. }
  555. // TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
  556. func TestSyncTrigger(t *testing.T) {
  557. n := &blockingNodeProposer{
  558. ch: make(chan []byte),
  559. readyNode: *newReadyNode(),
  560. }
  561. st := make(chan time.Time, 1)
  562. srv := &EtcdServer{
  563. Node: n,
  564. Store: &storeRecorder{},
  565. Send: func(_ []raftpb.Message) {},
  566. Storage: &storageRecorder{},
  567. SyncTicker: st,
  568. }
  569. srv.Start()
  570. // trigger the server to become a leader and accept sync requests
  571. n.readyc <- raft.Ready{
  572. SoftState: &raft.SoftState{
  573. RaftState: raft.StateLeader,
  574. },
  575. }
  576. // trigger a sync request
  577. st <- time.Time{}
  578. var data []byte
  579. select {
  580. case <-time.After(time.Second):
  581. t.Fatalf("did not receive proposed request as expected!")
  582. case data = <-n.ch:
  583. }
  584. srv.Stop()
  585. var req pb.Request
  586. if err := req.Unmarshal(data); err != nil {
  587. t.Fatalf("error unmarshalling data: %v", err)
  588. }
  589. if req.Method != "SYNC" {
  590. t.Fatalf("unexpected proposed request: %#v", req.Method)
  591. }
  592. }
  593. // snapshot should snapshot the store and cut the persistent
  594. // TODO: node.Compact is called... we need to make the node an interface
  595. func TestSnapshot(t *testing.T) {
  596. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  597. defer n.Stop()
  598. st := &storeRecorder{}
  599. p := &storageRecorder{}
  600. s := &EtcdServer{
  601. Store: st,
  602. Storage: p,
  603. Node: n,
  604. }
  605. s.snapshot()
  606. gaction := st.Action()
  607. if len(gaction) != 1 {
  608. t.Fatalf("len(action) = %d, want 1", len(gaction))
  609. }
  610. if !reflect.DeepEqual(gaction[0], action{name: "Save"}) {
  611. t.Errorf("action = %s, want Save", gaction[0])
  612. }
  613. gaction = p.Action()
  614. if len(gaction) != 1 {
  615. t.Fatalf("len(action) = %d, want 1", len(gaction))
  616. }
  617. if !reflect.DeepEqual(gaction[0], action{name: "Cut"}) {
  618. t.Errorf("action = %s, want Cut", gaction[0])
  619. }
  620. }
  621. // Applied > SnapCount should trigger a SaveSnap event
  622. func TestTriggerSnap(t *testing.T) {
  623. ctx := context.Background()
  624. n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
  625. n.Campaign(ctx)
  626. st := &storeRecorder{}
  627. p := &storageRecorder{}
  628. s := &EtcdServer{
  629. Store: st,
  630. Send: func(_ []raftpb.Message) {},
  631. Storage: p,
  632. Node: n,
  633. SnapCount: 10,
  634. }
  635. s.Start()
  636. for i := 0; int64(i) < s.SnapCount; i++ {
  637. s.Do(ctx, pb.Request{Method: "PUT", Id: 1})
  638. }
  639. time.Sleep(time.Millisecond)
  640. s.Stop()
  641. gaction := p.Action()
  642. // each operation is recorded as a Save
  643. // Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap
  644. if len(gaction) != 3+int(s.SnapCount) {
  645. t.Fatalf("len(action) = %d, want %d", len(gaction), 3+int(s.SnapCount))
  646. }
  647. if !reflect.DeepEqual(gaction[12], action{name: "SaveSnap"}) {
  648. t.Errorf("action = %s, want SaveSnap", gaction[12])
  649. }
  650. }
  651. // TestRecvSnapshot tests when it receives a snapshot from raft leader,
  652. // it should trigger storage.SaveSnap and also store.Recover.
  653. func TestRecvSnapshot(t *testing.T) {
  654. n := newReadyNode()
  655. st := &storeRecorder{}
  656. p := &storageRecorder{}
  657. s := &EtcdServer{
  658. Store: st,
  659. Send: func(_ []raftpb.Message) {},
  660. Storage: p,
  661. Node: n,
  662. }
  663. s.Start()
  664. n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
  665. // make goroutines move forward to receive snapshot
  666. testutil.ForceGosched()
  667. s.Stop()
  668. wactions := []action{action{name: "Recovery"}}
  669. if g := st.Action(); !reflect.DeepEqual(g, wactions) {
  670. t.Errorf("store action = %v, want %v", g, wactions)
  671. }
  672. wactions = []action{action{name: "Save"}, action{name: "SaveSnap"}}
  673. if g := p.Action(); !reflect.DeepEqual(g, wactions) {
  674. t.Errorf("storage action = %v, want %v", g, wactions)
  675. }
  676. }
  677. // TestRecvSlowSnapshot tests that slow snapshot will not be applied
  678. // to store.
  679. func TestRecvSlowSnapshot(t *testing.T) {
  680. n := newReadyNode()
  681. st := &storeRecorder{}
  682. s := &EtcdServer{
  683. Store: st,
  684. Send: func(_ []raftpb.Message) {},
  685. Storage: &storageRecorder{},
  686. Node: n,
  687. }
  688. s.Start()
  689. n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
  690. // make goroutines move forward to receive snapshot
  691. testutil.ForceGosched()
  692. action := st.Action()
  693. n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
  694. // make goroutines move forward to receive snapshot
  695. testutil.ForceGosched()
  696. s.Stop()
  697. if g := st.Action(); !reflect.DeepEqual(g, action) {
  698. t.Errorf("store action = %v, want %v", g, action)
  699. }
  700. }
  701. // TestAddNode tests AddNode can propose and perform node addition.
  702. func TestAddNode(t *testing.T) {
  703. n := newNodeConfChangeCommitterRecorder()
  704. s := &EtcdServer{
  705. Node: n,
  706. Store: &storeRecorder{},
  707. Send: func(_ []raftpb.Message) {},
  708. Storage: &storageRecorder{},
  709. }
  710. s.Start()
  711. s.AddNode(context.TODO(), 1, []byte("foo"))
  712. gaction := n.Action()
  713. s.Stop()
  714. wactions := []action{action{name: "ProposeConfChange:ConfChangeAddNode"}, action{name: "ApplyConfChange:ConfChangeAddNode"}}
  715. if !reflect.DeepEqual(gaction, wactions) {
  716. t.Errorf("action = %v, want %v", gaction, wactions)
  717. }
  718. }
  719. // TestRemoveNode tests RemoveNode can propose and perform node removal.
  720. func TestRemoveNode(t *testing.T) {
  721. n := newNodeConfChangeCommitterRecorder()
  722. s := &EtcdServer{
  723. Node: n,
  724. Store: &storeRecorder{},
  725. Send: func(_ []raftpb.Message) {},
  726. Storage: &storageRecorder{},
  727. }
  728. s.Start()
  729. s.RemoveNode(context.TODO(), 1)
  730. gaction := n.Action()
  731. s.Stop()
  732. wactions := []action{action{name: "ProposeConfChange:ConfChangeRemoveNode"}, action{name: "ApplyConfChange:ConfChangeRemoveNode"}}
  733. if !reflect.DeepEqual(gaction, wactions) {
  734. t.Errorf("action = %v, want %v", gaction, wactions)
  735. }
  736. }
  737. // TODO: test wait trigger correctness in multi-server case
  738. func TestGetBool(t *testing.T) {
  739. tests := []struct {
  740. b *bool
  741. wb bool
  742. wset bool
  743. }{
  744. {nil, false, false},
  745. {boolp(true), true, true},
  746. {boolp(false), false, true},
  747. }
  748. for i, tt := range tests {
  749. b, set := getBool(tt.b)
  750. if b != tt.wb {
  751. t.Errorf("#%d: value = %v, want %v", i, b, tt.wb)
  752. }
  753. if set != tt.wset {
  754. t.Errorf("#%d: set = %v, want %v", i, set, tt.wset)
  755. }
  756. }
  757. }
  758. func TestGenID(t *testing.T) {
  759. // Sanity check that the GenID function has been seeded appropriately
  760. // (math/rand is seeded with 1 by default)
  761. r := rand.NewSource(int64(1))
  762. var n int64
  763. for n == 0 {
  764. n = r.Int63()
  765. }
  766. if n == GenID() {
  767. t.Fatalf("GenID's rand seeded with 1!")
  768. }
  769. }
  770. type action struct {
  771. name string
  772. params []interface{}
  773. }
  774. type recorder struct {
  775. sync.Mutex
  776. actions []action
  777. }
  778. func (r *recorder) record(a action) {
  779. r.Lock()
  780. r.actions = append(r.actions, a)
  781. r.Unlock()
  782. }
  783. func (r *recorder) Action() []action {
  784. r.Lock()
  785. cpy := make([]action, len(r.actions))
  786. copy(cpy, r.actions)
  787. r.Unlock()
  788. return cpy
  789. }
  790. type storeRecorder struct {
  791. recorder
  792. }
  793. func (s *storeRecorder) Version() int { return 0 }
  794. func (s *storeRecorder) Index() uint64 { return 0 }
  795. func (s *storeRecorder) Get(path string, recursive, sorted bool) (*store.Event, error) {
  796. s.record(action{
  797. name: "Get",
  798. params: []interface{}{path, recursive, sorted},
  799. })
  800. return &store.Event{}, nil
  801. }
  802. func (s *storeRecorder) Set(_ string, _ bool, _ string, _ time.Time) (*store.Event, error) {
  803. s.record(action{name: "Set"})
  804. return &store.Event{}, nil
  805. }
  806. func (s *storeRecorder) Update(path, val string, expr time.Time) (*store.Event, error) {
  807. s.record(action{
  808. name: "Update",
  809. params: []interface{}{path, val, expr},
  810. })
  811. return &store.Event{}, nil
  812. }
  813. func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, exp time.Time) (*store.Event, error) {
  814. s.record(action{
  815. name: "Create",
  816. params: []interface{}{path, dir, val, uniq, exp},
  817. })
  818. return &store.Event{}, nil
  819. }
  820. func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expr time.Time) (*store.Event, error) {
  821. s.record(action{
  822. name: "CompareAndSwap",
  823. params: []interface{}{path, prevVal, prevIdx, val, expr},
  824. })
  825. return &store.Event{}, nil
  826. }
  827. func (s *storeRecorder) Delete(path string, dir, recursive bool) (*store.Event, error) {
  828. s.record(action{
  829. name: "Delete",
  830. params: []interface{}{path, dir, recursive},
  831. })
  832. return &store.Event{}, nil
  833. }
  834. func (s *storeRecorder) CompareAndDelete(path, prevVal string, prevIdx uint64) (*store.Event, error) {
  835. s.record(action{
  836. name: "CompareAndDelete",
  837. params: []interface{}{path, prevVal, prevIdx},
  838. })
  839. return &store.Event{}, nil
  840. }
  841. func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
  842. s.record(action{name: "Watch"})
  843. return &stubWatcher{}, nil
  844. }
  845. func (s *storeRecorder) Save() ([]byte, error) {
  846. s.record(action{name: "Save"})
  847. return nil, nil
  848. }
  849. func (s *storeRecorder) Recovery(b []byte) error {
  850. s.record(action{name: "Recovery"})
  851. return nil
  852. }
  853. func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
  854. func (s *storeRecorder) JsonStats() []byte { return nil }
  855. func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
  856. s.record(action{
  857. name: "DeleteExpiredKeys",
  858. params: []interface{}{cutoff},
  859. })
  860. }
  861. type stubWatcher struct{}
  862. func (w *stubWatcher) EventChan() chan *store.Event { return nil }
  863. func (w *stubWatcher) Remove() {}
  864. // errStoreRecorder returns an store error on Get, Watch request
  865. type errStoreRecorder struct {
  866. storeRecorder
  867. err error
  868. }
  869. func (s *errStoreRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
  870. s.record(action{name: "Get"})
  871. return nil, s.err
  872. }
  873. func (s *errStoreRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
  874. s.record(action{name: "Watch"})
  875. return nil, s.err
  876. }
  877. type waitRecorder struct {
  878. action []action
  879. }
  880. func (w *waitRecorder) Register(id int64) <-chan interface{} {
  881. w.action = append(w.action, action{name: fmt.Sprint("Register", id)})
  882. return nil
  883. }
  884. func (w *waitRecorder) Trigger(id int64, x interface{}) {
  885. w.action = append(w.action, action{name: fmt.Sprint("Trigger", id)})
  886. }
  887. func boolp(b bool) *bool { return &b }
  888. func stringp(s string) *string { return &s }
  889. type storageRecorder struct {
  890. recorder
  891. }
  892. func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) {
  893. p.record(action{name: "Save"})
  894. }
  895. func (p *storageRecorder) Cut() error {
  896. p.record(action{name: "Cut"})
  897. return nil
  898. }
  899. func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) {
  900. if raft.IsEmptySnap(st) {
  901. return
  902. }
  903. p.record(action{name: "SaveSnap"})
  904. }
  905. type readyNode struct {
  906. readyc chan raft.Ready
  907. }
  908. func newReadyNode() *readyNode {
  909. readyc := make(chan raft.Ready, 1)
  910. return &readyNode{readyc: readyc}
  911. }
  912. func (n *readyNode) Tick() {}
  913. func (n *readyNode) Campaign(ctx context.Context) error { return nil }
  914. func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
  915. func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  916. return nil
  917. }
  918. func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
  919. func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
  920. func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {}
  921. func (n *readyNode) Stop() {}
  922. func (n *readyNode) Compact(d []byte) {}
  923. type nodeRecorder struct {
  924. recorder
  925. }
  926. func (n *nodeRecorder) Tick() {
  927. n.record(action{name: "Tick"})
  928. }
  929. func (n *nodeRecorder) Campaign(ctx context.Context) error {
  930. n.record(action{name: "Campaign"})
  931. return nil
  932. }
  933. func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
  934. n.record(action{name: "Propose"})
  935. return nil
  936. }
  937. func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  938. n.record(action{name: "ProposeConfChange"})
  939. return nil
  940. }
  941. func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
  942. n.record(action{name: "Step"})
  943. return nil
  944. }
  945. func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
  946. func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
  947. n.record(action{name: "ApplyConfChange"})
  948. }
  949. func (n *nodeRecorder) Stop() {
  950. n.record(action{name: "Stop"})
  951. }
  952. func (n *nodeRecorder) Compact(d []byte) {
  953. n.record(action{name: "Compact"})
  954. }
  955. type nodeProposeDataRecorder struct {
  956. nodeRecorder
  957. sync.Mutex
  958. d [][]byte
  959. }
  960. func (n *nodeProposeDataRecorder) data() [][]byte {
  961. n.Lock()
  962. d := n.d
  963. n.Unlock()
  964. return d
  965. }
  966. func (n *nodeProposeDataRecorder) Propose(ctx context.Context, data []byte) error {
  967. n.nodeRecorder.Propose(ctx, data)
  968. n.Lock()
  969. n.d = append(n.d, data)
  970. n.Unlock()
  971. return nil
  972. }
  973. type nodeProposalBlockerRecorder struct {
  974. nodeRecorder
  975. }
  976. func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
  977. <-ctx.Done()
  978. n.record(action{name: "Propose blocked"})
  979. return nil
  980. }
  981. type nodeConfChangeCommitterRecorder struct {
  982. nodeRecorder
  983. readyc chan raft.Ready
  984. }
  985. func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
  986. readyc := make(chan raft.Ready, 1)
  987. readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}}
  988. return &nodeConfChangeCommitterRecorder{readyc: readyc}
  989. }
  990. func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  991. data, err := conf.Marshal()
  992. if err != nil {
  993. return err
  994. }
  995. n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange, Data: data}}}
  996. n.record(action{name: "ProposeConfChange:" + conf.Type.String()})
  997. return nil
  998. }
  999. func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
  1000. return n.readyc
  1001. }
  1002. func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) {
  1003. n.record(action{name: "ApplyConfChange:" + conf.Type.String()})
  1004. }