server_test.go 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "reflect"
  22. "strconv"
  23. "testing"
  24. "time"
  25. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/pkg/idutil"
  28. "github.com/coreos/etcd/pkg/pbutil"
  29. "github.com/coreos/etcd/pkg/testutil"
  30. "github.com/coreos/etcd/pkg/types"
  31. "github.com/coreos/etcd/pkg/wait"
  32. "github.com/coreos/etcd/raft"
  33. "github.com/coreos/etcd/raft/raftpb"
  34. "github.com/coreos/etcd/rafthttp"
  35. dstorage "github.com/coreos/etcd/storage"
  36. "github.com/coreos/etcd/storage/backend"
  37. "github.com/coreos/etcd/store"
  38. )
  39. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  40. // and are served through local data.
  41. func TestDoLocalAction(t *testing.T) {
  42. tests := []struct {
  43. req pb.Request
  44. wresp Response
  45. werr error
  46. wactions []testutil.Action
  47. }{
  48. {
  49. pb.Request{Method: "GET", ID: 1, Wait: true},
  50. Response{Watcher: store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}},
  51. },
  52. {
  53. pb.Request{Method: "GET", ID: 1},
  54. Response{Event: &store.Event{}}, nil,
  55. []testutil.Action{
  56. {
  57. Name: "Get",
  58. Params: []interface{}{"", false, false},
  59. },
  60. },
  61. },
  62. {
  63. pb.Request{Method: "HEAD", ID: 1},
  64. Response{Event: &store.Event{}}, nil,
  65. []testutil.Action{
  66. {
  67. Name: "Get",
  68. Params: []interface{}{"", false, false},
  69. },
  70. },
  71. },
  72. {
  73. pb.Request{Method: "BADMETHOD", ID: 1},
  74. Response{}, ErrUnknownMethod, []testutil.Action{},
  75. },
  76. }
  77. for i, tt := range tests {
  78. st := store.NewRecorder()
  79. srv := &EtcdServer{
  80. store: st,
  81. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  82. }
  83. resp, err := srv.Do(context.TODO(), tt.req)
  84. if err != tt.werr {
  85. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  86. }
  87. if !reflect.DeepEqual(resp, tt.wresp) {
  88. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  89. }
  90. gaction := st.Action()
  91. if !reflect.DeepEqual(gaction, tt.wactions) {
  92. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  93. }
  94. }
  95. }
  96. // TestDoBadLocalAction tests server requests which do not need to go through consensus,
  97. // and return errors when they fetch from local data.
  98. func TestDoBadLocalAction(t *testing.T) {
  99. storeErr := fmt.Errorf("bah")
  100. tests := []struct {
  101. req pb.Request
  102. wactions []testutil.Action
  103. }{
  104. {
  105. pb.Request{Method: "GET", ID: 1, Wait: true},
  106. []testutil.Action{{Name: "Watch"}},
  107. },
  108. {
  109. pb.Request{Method: "GET", ID: 1},
  110. []testutil.Action{
  111. {
  112. Name: "Get",
  113. Params: []interface{}{"", false, false},
  114. },
  115. },
  116. },
  117. {
  118. pb.Request{Method: "HEAD", ID: 1},
  119. []testutil.Action{
  120. {
  121. Name: "Get",
  122. Params: []interface{}{"", false, false},
  123. },
  124. },
  125. },
  126. }
  127. for i, tt := range tests {
  128. st := store.NewErrRecorder(storeErr)
  129. srv := &EtcdServer{
  130. store: st,
  131. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  132. }
  133. resp, err := srv.Do(context.Background(), tt.req)
  134. if err != storeErr {
  135. t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr)
  136. }
  137. if !reflect.DeepEqual(resp, Response{}) {
  138. t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{})
  139. }
  140. gaction := st.Action()
  141. if !reflect.DeepEqual(gaction, tt.wactions) {
  142. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  143. }
  144. }
  145. }
  146. func TestApplyRequest(t *testing.T) {
  147. tests := []struct {
  148. req pb.Request
  149. wresp Response
  150. wactions []testutil.Action
  151. }{
  152. // POST ==> Create
  153. {
  154. pb.Request{Method: "POST", ID: 1},
  155. Response{Event: &store.Event{}},
  156. []testutil.Action{
  157. {
  158. Name: "Create",
  159. Params: []interface{}{"", false, "", true, time.Time{}},
  160. },
  161. },
  162. },
  163. // POST ==> Create, with expiration
  164. {
  165. pb.Request{Method: "POST", ID: 1, Expiration: 1337},
  166. Response{Event: &store.Event{}},
  167. []testutil.Action{
  168. {
  169. Name: "Create",
  170. Params: []interface{}{"", false, "", true, time.Unix(0, 1337)},
  171. },
  172. },
  173. },
  174. // POST ==> Create, with dir
  175. {
  176. pb.Request{Method: "POST", ID: 1, Dir: true},
  177. Response{Event: &store.Event{}},
  178. []testutil.Action{
  179. {
  180. Name: "Create",
  181. Params: []interface{}{"", true, "", true, time.Time{}},
  182. },
  183. },
  184. },
  185. // PUT ==> Set
  186. {
  187. pb.Request{Method: "PUT", ID: 1},
  188. Response{Event: &store.Event{}},
  189. []testutil.Action{
  190. {
  191. Name: "Set",
  192. Params: []interface{}{"", false, "", time.Time{}},
  193. },
  194. },
  195. },
  196. // PUT ==> Set, with dir
  197. {
  198. pb.Request{Method: "PUT", ID: 1, Dir: true},
  199. Response{Event: &store.Event{}},
  200. []testutil.Action{
  201. {
  202. Name: "Set",
  203. Params: []interface{}{"", true, "", time.Time{}},
  204. },
  205. },
  206. },
  207. // PUT with PrevExist=true ==> Update
  208. {
  209. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true)},
  210. Response{Event: &store.Event{}},
  211. []testutil.Action{
  212. {
  213. Name: "Update",
  214. Params: []interface{}{"", "", time.Time{}},
  215. },
  216. },
  217. },
  218. // PUT with PrevExist=false ==> Create
  219. {
  220. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false)},
  221. Response{Event: &store.Event{}},
  222. []testutil.Action{
  223. {
  224. Name: "Create",
  225. Params: []interface{}{"", false, "", false, time.Time{}},
  226. },
  227. },
  228. },
  229. // PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
  230. {
  231. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
  232. Response{Event: &store.Event{}},
  233. []testutil.Action{
  234. {
  235. Name: "CompareAndSwap",
  236. Params: []interface{}{"", "", uint64(1), "", time.Time{}},
  237. },
  238. },
  239. },
  240. // PUT with PrevExist=false *and* PrevIndex set ==> Create
  241. {
  242. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
  243. Response{Event: &store.Event{}},
  244. []testutil.Action{
  245. {
  246. Name: "Create",
  247. Params: []interface{}{"", false, "", false, time.Time{}},
  248. },
  249. },
  250. },
  251. // PUT with PrevIndex set ==> CompareAndSwap
  252. {
  253. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1},
  254. Response{Event: &store.Event{}},
  255. []testutil.Action{
  256. {
  257. Name: "CompareAndSwap",
  258. Params: []interface{}{"", "", uint64(1), "", time.Time{}},
  259. },
  260. },
  261. },
  262. // PUT with PrevValue set ==> CompareAndSwap
  263. {
  264. pb.Request{Method: "PUT", ID: 1, PrevValue: "bar"},
  265. Response{Event: &store.Event{}},
  266. []testutil.Action{
  267. {
  268. Name: "CompareAndSwap",
  269. Params: []interface{}{"", "bar", uint64(0), "", time.Time{}},
  270. },
  271. },
  272. },
  273. // PUT with PrevIndex and PrevValue set ==> CompareAndSwap
  274. {
  275. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1, PrevValue: "bar"},
  276. Response{Event: &store.Event{}},
  277. []testutil.Action{
  278. {
  279. Name: "CompareAndSwap",
  280. Params: []interface{}{"", "bar", uint64(1), "", time.Time{}},
  281. },
  282. },
  283. },
  284. // DELETE ==> Delete
  285. {
  286. pb.Request{Method: "DELETE", ID: 1},
  287. Response{Event: &store.Event{}},
  288. []testutil.Action{
  289. {
  290. Name: "Delete",
  291. Params: []interface{}{"", false, false},
  292. },
  293. },
  294. },
  295. // DELETE with PrevIndex set ==> CompareAndDelete
  296. {
  297. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 1},
  298. Response{Event: &store.Event{}},
  299. []testutil.Action{
  300. {
  301. Name: "CompareAndDelete",
  302. Params: []interface{}{"", "", uint64(1)},
  303. },
  304. },
  305. },
  306. // DELETE with PrevValue set ==> CompareAndDelete
  307. {
  308. pb.Request{Method: "DELETE", ID: 1, PrevValue: "bar"},
  309. Response{Event: &store.Event{}},
  310. []testutil.Action{
  311. {
  312. Name: "CompareAndDelete",
  313. Params: []interface{}{"", "bar", uint64(0)},
  314. },
  315. },
  316. },
  317. // DELETE with PrevIndex *and* PrevValue set ==> CompareAndDelete
  318. {
  319. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 5, PrevValue: "bar"},
  320. Response{Event: &store.Event{}},
  321. []testutil.Action{
  322. {
  323. Name: "CompareAndDelete",
  324. Params: []interface{}{"", "bar", uint64(5)},
  325. },
  326. },
  327. },
  328. // QGET ==> Get
  329. {
  330. pb.Request{Method: "QGET", ID: 1},
  331. Response{Event: &store.Event{}},
  332. []testutil.Action{
  333. {
  334. Name: "Get",
  335. Params: []interface{}{"", false, false},
  336. },
  337. },
  338. },
  339. // SYNC ==> DeleteExpiredKeys
  340. {
  341. pb.Request{Method: "SYNC", ID: 1},
  342. Response{},
  343. []testutil.Action{
  344. {
  345. Name: "DeleteExpiredKeys",
  346. Params: []interface{}{time.Unix(0, 0)},
  347. },
  348. },
  349. },
  350. {
  351. pb.Request{Method: "SYNC", ID: 1, Time: 12345},
  352. Response{},
  353. []testutil.Action{
  354. {
  355. Name: "DeleteExpiredKeys",
  356. Params: []interface{}{time.Unix(0, 12345)},
  357. },
  358. },
  359. },
  360. // Unknown method - error
  361. {
  362. pb.Request{Method: "BADMETHOD", ID: 1},
  363. Response{err: ErrUnknownMethod},
  364. []testutil.Action{},
  365. },
  366. }
  367. for i, tt := range tests {
  368. st := store.NewRecorder()
  369. srv := &EtcdServer{store: st}
  370. resp := srv.applyRequest(tt.req)
  371. if !reflect.DeepEqual(resp, tt.wresp) {
  372. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  373. }
  374. gaction := st.Action()
  375. if !reflect.DeepEqual(gaction, tt.wactions) {
  376. t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions)
  377. }
  378. }
  379. }
  380. func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
  381. cl := newTestCluster([]*Member{{ID: 1}})
  382. srv := &EtcdServer{
  383. store: store.NewRecorder(),
  384. cluster: cl,
  385. }
  386. req := pb.Request{
  387. Method: "PUT",
  388. ID: 1,
  389. Path: path.Join(storeMembersPrefix, strconv.FormatUint(1, 16), attributesSuffix),
  390. Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
  391. }
  392. srv.applyRequest(req)
  393. w := Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
  394. if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
  395. t.Errorf("attributes = %v, want %v", g, w)
  396. }
  397. }
  398. func TestApplyConfChangeError(t *testing.T) {
  399. cl := newCluster("")
  400. cl.SetStore(store.New())
  401. for i := 1; i <= 4; i++ {
  402. cl.AddMember(&Member{ID: types.ID(i)})
  403. }
  404. cl.RemoveMember(4)
  405. tests := []struct {
  406. cc raftpb.ConfChange
  407. werr error
  408. }{
  409. {
  410. raftpb.ConfChange{
  411. Type: raftpb.ConfChangeAddNode,
  412. NodeID: 4,
  413. },
  414. ErrIDRemoved,
  415. },
  416. {
  417. raftpb.ConfChange{
  418. Type: raftpb.ConfChangeUpdateNode,
  419. NodeID: 4,
  420. },
  421. ErrIDRemoved,
  422. },
  423. {
  424. raftpb.ConfChange{
  425. Type: raftpb.ConfChangeAddNode,
  426. NodeID: 1,
  427. },
  428. ErrIDExists,
  429. },
  430. {
  431. raftpb.ConfChange{
  432. Type: raftpb.ConfChangeRemoveNode,
  433. NodeID: 5,
  434. },
  435. ErrIDNotFound,
  436. },
  437. }
  438. for i, tt := range tests {
  439. n := newNodeRecorder()
  440. srv := &EtcdServer{
  441. r: raftNode{Node: n},
  442. cluster: cl,
  443. cfg: &ServerConfig{},
  444. }
  445. _, err := srv.applyConfChange(tt.cc, nil)
  446. if err != tt.werr {
  447. t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
  448. }
  449. cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None}
  450. w := []testutil.Action{
  451. {
  452. Name: "ApplyConfChange",
  453. Params: []interface{}{cc},
  454. },
  455. }
  456. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  457. t.Errorf("#%d: action = %+v, want %+v", i, g, w)
  458. }
  459. }
  460. }
  461. func TestApplyConfChangeShouldStop(t *testing.T) {
  462. cl := newCluster("")
  463. cl.SetStore(store.New())
  464. for i := 1; i <= 3; i++ {
  465. cl.AddMember(&Member{ID: types.ID(i)})
  466. }
  467. srv := &EtcdServer{
  468. id: 1,
  469. r: raftNode{
  470. Node: newNodeNop(),
  471. transport: rafthttp.NewNopTransporter(),
  472. },
  473. cluster: cl,
  474. }
  475. cc := raftpb.ConfChange{
  476. Type: raftpb.ConfChangeRemoveNode,
  477. NodeID: 2,
  478. }
  479. // remove non-local member
  480. shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
  481. if err != nil {
  482. t.Fatalf("unexpected error %v", err)
  483. }
  484. if shouldStop != false {
  485. t.Errorf("shouldStop = %t, want %t", shouldStop, false)
  486. }
  487. // remove local member
  488. cc.NodeID = 1
  489. shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
  490. if err != nil {
  491. t.Fatalf("unexpected error %v", err)
  492. }
  493. if shouldStop != true {
  494. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  495. }
  496. }
  497. // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
  498. // if the local member is removed along with other conf updates.
  499. func TestApplyMultiConfChangeShouldStop(t *testing.T) {
  500. cl := newCluster("")
  501. cl.SetStore(store.New())
  502. for i := 1; i <= 5; i++ {
  503. cl.AddMember(&Member{ID: types.ID(i)})
  504. }
  505. srv := &EtcdServer{
  506. id: 2,
  507. r: raftNode{
  508. Node: newNodeNop(),
  509. transport: rafthttp.NewNopTransporter(),
  510. },
  511. cluster: cl,
  512. w: wait.New(),
  513. }
  514. ents := []raftpb.Entry{}
  515. for i := 1; i <= 4; i++ {
  516. ent := raftpb.Entry{
  517. Term: 1,
  518. Index: uint64(i),
  519. Type: raftpb.EntryConfChange,
  520. Data: pbutil.MustMarshal(
  521. &raftpb.ConfChange{
  522. Type: raftpb.ConfChangeRemoveNode,
  523. NodeID: uint64(i)}),
  524. }
  525. ents = append(ents, ent)
  526. }
  527. _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
  528. if shouldStop == false {
  529. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  530. }
  531. }
  532. func TestDoProposal(t *testing.T) {
  533. tests := []pb.Request{
  534. {Method: "POST", ID: 1},
  535. {Method: "PUT", ID: 1},
  536. {Method: "DELETE", ID: 1},
  537. {Method: "GET", ID: 1, Quorum: true},
  538. }
  539. for i, tt := range tests {
  540. st := store.NewRecorder()
  541. srv := &EtcdServer{
  542. cfg: &ServerConfig{TickMs: 1},
  543. r: raftNode{
  544. Node: newNodeCommitter(),
  545. storage: &storageRecorder{},
  546. raftStorage: raft.NewMemoryStorage(),
  547. transport: rafthttp.NewNopTransporter(),
  548. },
  549. store: st,
  550. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  551. }
  552. srv.start()
  553. resp, err := srv.Do(context.Background(), tt)
  554. srv.Stop()
  555. action := st.Action()
  556. if len(action) != 1 {
  557. t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
  558. }
  559. if err != nil {
  560. t.Fatalf("#%d: err = %v, want nil", i, err)
  561. }
  562. wresp := Response{Event: &store.Event{}}
  563. if !reflect.DeepEqual(resp, wresp) {
  564. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  565. }
  566. }
  567. }
  568. func TestDoProposalCancelled(t *testing.T) {
  569. wait := wait.NewRecorder()
  570. srv := &EtcdServer{
  571. cfg: &ServerConfig{TickMs: 1},
  572. r: raftNode{Node: newNodeNop()},
  573. w: wait,
  574. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  575. }
  576. ctx, cancel := context.WithCancel(context.Background())
  577. cancel()
  578. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  579. if err != ErrCanceled {
  580. t.Fatalf("err = %v, want %v", err, ErrCanceled)
  581. }
  582. w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
  583. if !reflect.DeepEqual(wait.Action(), w) {
  584. t.Errorf("wait.action = %+v, want %+v", wait.Action(), w)
  585. }
  586. }
  587. func TestDoProposalTimeout(t *testing.T) {
  588. srv := &EtcdServer{
  589. cfg: &ServerConfig{TickMs: 1},
  590. r: raftNode{Node: newNodeNop()},
  591. w: wait.NewNop(),
  592. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  593. }
  594. ctx, _ := context.WithTimeout(context.Background(), 0)
  595. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  596. if err != ErrTimeout {
  597. t.Fatalf("err = %v, want %v", err, ErrTimeout)
  598. }
  599. }
  600. func TestDoProposalStopped(t *testing.T) {
  601. srv := &EtcdServer{
  602. cfg: &ServerConfig{TickMs: 1},
  603. r: raftNode{Node: newNodeNop()},
  604. w: wait.NewNop(),
  605. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  606. }
  607. srv.done = make(chan struct{})
  608. close(srv.done)
  609. _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
  610. if err != ErrStopped {
  611. t.Errorf("err = %v, want %v", err, ErrStopped)
  612. }
  613. }
  614. // TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
  615. func TestSync(t *testing.T) {
  616. n := newNodeRecorder()
  617. srv := &EtcdServer{
  618. r: raftNode{Node: n},
  619. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  620. }
  621. // check that sync is non-blocking
  622. done := make(chan struct{})
  623. go func() {
  624. srv.sync(10 * time.Second)
  625. done <- struct{}{}
  626. }()
  627. select {
  628. case <-done:
  629. case <-time.After(time.Second):
  630. t.Fatal("sync should be non-blocking but did not return after 1s!")
  631. }
  632. action, _ := n.Wait(1)
  633. if len(action) != 1 {
  634. t.Fatalf("len(action) = %d, want 1", len(action))
  635. }
  636. if action[0].Name != "Propose" {
  637. t.Fatalf("action = %s, want Propose", action[0].Name)
  638. }
  639. data := action[0].Params[0].([]byte)
  640. var r pb.Request
  641. if err := r.Unmarshal(data); err != nil {
  642. t.Fatalf("unmarshal request error: %v", err)
  643. }
  644. if r.Method != "SYNC" {
  645. t.Errorf("method = %s, want SYNC", r.Method)
  646. }
  647. }
  648. // TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
  649. // after timeout
  650. func TestSyncTimeout(t *testing.T) {
  651. n := newProposalBlockerRecorder()
  652. srv := &EtcdServer{
  653. r: raftNode{Node: n},
  654. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  655. }
  656. // check that sync is non-blocking
  657. done := make(chan struct{})
  658. go func() {
  659. srv.sync(0)
  660. done <- struct{}{}
  661. }()
  662. select {
  663. case <-done:
  664. case <-time.After(time.Second):
  665. t.Fatal("sync should be non-blocking but did not return after 1s!")
  666. }
  667. w := []testutil.Action{{Name: "Propose blocked"}}
  668. if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
  669. t.Errorf("action = %v, want %v", g, w)
  670. }
  671. }
  672. // TODO: TestNoSyncWhenNoLeader
  673. // TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
  674. func TestSyncTrigger(t *testing.T) {
  675. n := newReadyNode()
  676. st := make(chan time.Time, 1)
  677. srv := &EtcdServer{
  678. cfg: &ServerConfig{TickMs: 1},
  679. r: raftNode{
  680. Node: n,
  681. raftStorage: raft.NewMemoryStorage(),
  682. transport: rafthttp.NewNopTransporter(),
  683. storage: &storageRecorder{},
  684. },
  685. store: store.NewNop(),
  686. SyncTicker: st,
  687. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  688. }
  689. // trigger the server to become a leader and accept sync requests
  690. go func() {
  691. srv.start()
  692. n.readyc <- raft.Ready{
  693. SoftState: &raft.SoftState{
  694. RaftState: raft.StateLeader,
  695. },
  696. }
  697. // trigger a sync request
  698. st <- time.Time{}
  699. }()
  700. action, _ := n.Wait(1)
  701. go srv.Stop()
  702. if len(action) != 1 {
  703. t.Fatalf("len(action) = %d, want 1", len(action))
  704. }
  705. if action[0].Name != "Propose" {
  706. t.Fatalf("action = %s, want Propose", action[0].Name)
  707. }
  708. data := action[0].Params[0].([]byte)
  709. var req pb.Request
  710. if err := req.Unmarshal(data); err != nil {
  711. t.Fatalf("error unmarshalling data: %v", err)
  712. }
  713. if req.Method != "SYNC" {
  714. t.Fatalf("unexpected proposed request: %#v", req.Method)
  715. }
  716. // wait on stop message
  717. <-n.Chan()
  718. }
  719. // snapshot should snapshot the store and cut the persistent
  720. func TestSnapshot(t *testing.T) {
  721. s := raft.NewMemoryStorage()
  722. s.Append([]raftpb.Entry{{Index: 1}})
  723. st := store.NewRecorder()
  724. p := &storageRecorder{}
  725. srv := &EtcdServer{
  726. cfg: &ServerConfig{},
  727. r: raftNode{
  728. Node: newNodeNop(),
  729. raftStorage: s,
  730. storage: p,
  731. },
  732. store: st,
  733. }
  734. srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
  735. gaction, _ := st.Wait(2)
  736. if len(gaction) != 2 {
  737. t.Fatalf("len(action) = %d, want 1", len(gaction))
  738. }
  739. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
  740. t.Errorf("action = %s, want Clone", gaction[0])
  741. }
  742. if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
  743. t.Errorf("action = %s, want SaveNoCopy", gaction[1])
  744. }
  745. gaction = p.Action()
  746. if len(gaction) != 1 {
  747. t.Fatalf("len(action) = %d, want 1", len(gaction))
  748. }
  749. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
  750. t.Errorf("action = %s, want SaveSnap", gaction[0])
  751. }
  752. }
  753. // Applied > SnapCount should trigger a SaveSnap event
  754. func TestTriggerSnap(t *testing.T) {
  755. snapc := 10
  756. st := store.NewRecorder()
  757. p := &storageRecorder{}
  758. srv := &EtcdServer{
  759. cfg: &ServerConfig{TickMs: 1},
  760. snapCount: uint64(snapc),
  761. r: raftNode{
  762. Node: newNodeCommitter(),
  763. raftStorage: raft.NewMemoryStorage(),
  764. storage: p,
  765. transport: rafthttp.NewNopTransporter(),
  766. },
  767. store: st,
  768. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  769. }
  770. srv.start()
  771. for i := 0; i < snapc+1; i++ {
  772. srv.Do(context.Background(), pb.Request{Method: "PUT"})
  773. }
  774. wcnt := 2 + snapc
  775. gaction, _ := p.Wait(wcnt)
  776. srv.Stop()
  777. // each operation is recorded as a Save
  778. // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
  779. if len(gaction) != wcnt {
  780. t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
  781. }
  782. if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
  783. t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
  784. }
  785. }
  786. // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
  787. // proposals.
  788. func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
  789. const (
  790. // snapshots that may queue up at once without dropping
  791. maxInFlightMsgSnap = 16
  792. )
  793. n := newNopReadyNode()
  794. cl := newCluster("abc")
  795. cl.SetStore(store.New())
  796. testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
  797. if err != nil {
  798. t.Fatalf("Couldn't open tempdir (%v)", err)
  799. }
  800. defer os.RemoveAll(testdir)
  801. if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
  802. t.Fatalf("Couldn't make snap dir (%v)", err)
  803. }
  804. rs := raft.NewMemoryStorage()
  805. tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
  806. s := &EtcdServer{
  807. cfg: &ServerConfig{
  808. V3demo: true,
  809. DataDir: testdir,
  810. },
  811. r: raftNode{
  812. Node: n,
  813. transport: tr,
  814. storage: &storageRecorder{dbPath: testdir},
  815. raftStorage: rs,
  816. },
  817. store: cl.store,
  818. cluster: cl,
  819. msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
  820. }
  821. be, tmpPath := backend.NewDefaultTmpBackend()
  822. defer func() {
  823. os.RemoveAll(tmpPath)
  824. }()
  825. s.kv = dstorage.New(be, &s.consistIndex)
  826. s.be = be
  827. s.start()
  828. defer s.Stop()
  829. // submit applied entries and snap entries
  830. idx := uint64(0)
  831. outdated := 0
  832. accepted := 0
  833. for k := 1; k <= 101; k++ {
  834. idx++
  835. ch := s.w.Register(uint64(idx))
  836. req := &pb.Request{Method: "QGET", ID: uint64(idx)}
  837. ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)}
  838. ready := raft.Ready{Entries: []raftpb.Entry{ent}}
  839. n.readyc <- ready
  840. ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
  841. n.readyc <- ready
  842. // "idx" applied
  843. <-ch
  844. // one snapshot for every two messages
  845. if k%2 != 0 {
  846. continue
  847. }
  848. n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
  849. // get the snapshot sent by the transport
  850. snapMsg := <-snapDoneC
  851. // If the snapshot trails applied records, recovery will panic
  852. // since there's no allocated snapshot at the place of the
  853. // snapshot record. This only happens when the applier and the
  854. // snapshot sender get out of sync.
  855. if snapMsg.Snapshot.Metadata.Index == idx {
  856. idx++
  857. snapMsg.Snapshot.Metadata.Index = idx
  858. ready = raft.Ready{Snapshot: snapMsg.Snapshot}
  859. n.readyc <- ready
  860. accepted++
  861. } else {
  862. outdated++
  863. }
  864. // don't wait for the snapshot to complete, move to next message
  865. }
  866. if accepted != 50 {
  867. t.Errorf("accepted=%v, want 50", accepted)
  868. }
  869. if outdated != 0 {
  870. t.Errorf("outdated=%v, want 0", outdated)
  871. }
  872. }
  873. // TestRecvSnapshot tests when it receives a snapshot from raft leader,
  874. // it should trigger storage.SaveSnap and also store.Recover.
  875. func TestRecvSnapshot(t *testing.T) {
  876. n := newNopReadyNode()
  877. st := store.NewRecorder()
  878. p := &storageRecorder{}
  879. cl := newCluster("abc")
  880. cl.SetStore(store.New())
  881. s := &EtcdServer{
  882. cfg: &ServerConfig{},
  883. r: raftNode{
  884. Node: n,
  885. transport: rafthttp.NewNopTransporter(),
  886. storage: p,
  887. raftStorage: raft.NewMemoryStorage(),
  888. },
  889. store: st,
  890. cluster: cl,
  891. }
  892. s.start()
  893. n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}
  894. // wait for actions happened on the storage
  895. for len(p.Action()) == 0 {
  896. time.Sleep(10 * time.Millisecond)
  897. }
  898. s.Stop()
  899. wactions := []testutil.Action{{Name: "Recovery"}}
  900. if g := st.Action(); !reflect.DeepEqual(g, wactions) {
  901. t.Errorf("store action = %v, want %v", g, wactions)
  902. }
  903. wactions = []testutil.Action{{Name: "SaveSnap"}, {Name: "Save"}}
  904. if g := p.Action(); !reflect.DeepEqual(g, wactions) {
  905. t.Errorf("storage action = %v, want %v", g, wactions)
  906. }
  907. }
  908. // TestApplySnapshotAndCommittedEntries tests that server applies snapshot
  909. // first and then committed entries.
  910. func TestApplySnapshotAndCommittedEntries(t *testing.T) {
  911. n := newNopReadyNode()
  912. st := store.NewRecorder()
  913. cl := newCluster("abc")
  914. cl.SetStore(store.New())
  915. storage := raft.NewMemoryStorage()
  916. s := &EtcdServer{
  917. cfg: &ServerConfig{},
  918. r: raftNode{
  919. Node: n,
  920. storage: &storageRecorder{},
  921. raftStorage: storage,
  922. transport: rafthttp.NewNopTransporter(),
  923. },
  924. store: st,
  925. cluster: cl,
  926. }
  927. s.start()
  928. req := &pb.Request{Method: "QGET"}
  929. n.readyc <- raft.Ready{
  930. Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}},
  931. CommittedEntries: []raftpb.Entry{
  932. {Index: 2, Data: pbutil.MustMarshal(req)},
  933. },
  934. }
  935. // make goroutines move forward to receive snapshot
  936. actions, _ := st.Wait(2)
  937. s.Stop()
  938. if len(actions) != 2 {
  939. t.Fatalf("len(action) = %d, want 2", len(actions))
  940. }
  941. if actions[0].Name != "Recovery" {
  942. t.Errorf("actions[0] = %s, want %s", actions[0].Name, "Recovery")
  943. }
  944. if actions[1].Name != "Get" {
  945. t.Errorf("actions[1] = %s, want %s", actions[1].Name, "Get")
  946. }
  947. }
  948. // TestAddMember tests AddMember can propose and perform node addition.
  949. func TestAddMember(t *testing.T) {
  950. n := newNodeConfChangeCommitterRecorder()
  951. n.readyc <- raft.Ready{
  952. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  953. }
  954. cl := newTestCluster(nil)
  955. st := store.New()
  956. cl.SetStore(st)
  957. s := &EtcdServer{
  958. r: raftNode{
  959. Node: n,
  960. raftStorage: raft.NewMemoryStorage(),
  961. storage: &storageRecorder{},
  962. transport: rafthttp.NewNopTransporter(),
  963. },
  964. cfg: &ServerConfig{},
  965. store: st,
  966. cluster: cl,
  967. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  968. }
  969. s.start()
  970. m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
  971. err := s.AddMember(context.TODO(), m)
  972. gaction := n.Action()
  973. s.Stop()
  974. if err != nil {
  975. t.Fatalf("AddMember error: %v", err)
  976. }
  977. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}}
  978. if !reflect.DeepEqual(gaction, wactions) {
  979. t.Errorf("action = %v, want %v", gaction, wactions)
  980. }
  981. if cl.Member(1234) == nil {
  982. t.Errorf("member with id 1234 is not added")
  983. }
  984. }
  985. // TestRemoveMember tests RemoveMember can propose and perform node removal.
  986. func TestRemoveMember(t *testing.T) {
  987. n := newNodeConfChangeCommitterRecorder()
  988. n.readyc <- raft.Ready{
  989. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  990. }
  991. cl := newTestCluster(nil)
  992. st := store.New()
  993. cl.SetStore(store.New())
  994. cl.AddMember(&Member{ID: 1234})
  995. s := &EtcdServer{
  996. r: raftNode{
  997. Node: n,
  998. raftStorage: raft.NewMemoryStorage(),
  999. storage: &storageRecorder{},
  1000. transport: rafthttp.NewNopTransporter(),
  1001. },
  1002. cfg: &ServerConfig{},
  1003. store: st,
  1004. cluster: cl,
  1005. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1006. }
  1007. s.start()
  1008. err := s.RemoveMember(context.TODO(), 1234)
  1009. gaction := n.Action()
  1010. s.Stop()
  1011. if err != nil {
  1012. t.Fatalf("RemoveMember error: %v", err)
  1013. }
  1014. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}}
  1015. if !reflect.DeepEqual(gaction, wactions) {
  1016. t.Errorf("action = %v, want %v", gaction, wactions)
  1017. }
  1018. if cl.Member(1234) != nil {
  1019. t.Errorf("member with id 1234 is not removed")
  1020. }
  1021. }
  1022. // TestUpdateMember tests RemoveMember can propose and perform node update.
  1023. func TestUpdateMember(t *testing.T) {
  1024. n := newNodeConfChangeCommitterRecorder()
  1025. n.readyc <- raft.Ready{
  1026. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1027. }
  1028. cl := newTestCluster(nil)
  1029. st := store.New()
  1030. cl.SetStore(st)
  1031. cl.AddMember(&Member{ID: 1234})
  1032. s := &EtcdServer{
  1033. r: raftNode{
  1034. Node: n,
  1035. raftStorage: raft.NewMemoryStorage(),
  1036. storage: &storageRecorder{},
  1037. transport: rafthttp.NewNopTransporter(),
  1038. },
  1039. store: st,
  1040. cluster: cl,
  1041. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1042. }
  1043. s.start()
  1044. wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
  1045. err := s.UpdateMember(context.TODO(), wm)
  1046. gaction := n.Action()
  1047. s.Stop()
  1048. if err != nil {
  1049. t.Fatalf("UpdateMember error: %v", err)
  1050. }
  1051. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}}
  1052. if !reflect.DeepEqual(gaction, wactions) {
  1053. t.Errorf("action = %v, want %v", gaction, wactions)
  1054. }
  1055. if !reflect.DeepEqual(cl.Member(1234), &wm) {
  1056. t.Errorf("member = %v, want %v", cl.Member(1234), &wm)
  1057. }
  1058. }
  1059. // TODO: test server could stop itself when being removed
  1060. func TestPublish(t *testing.T) {
  1061. n := newNodeRecorder()
  1062. ch := make(chan interface{}, 1)
  1063. // simulate that request has gone through consensus
  1064. ch <- Response{}
  1065. w := wait.NewWithResponse(ch)
  1066. srv := &EtcdServer{
  1067. cfg: &ServerConfig{TickMs: 1},
  1068. id: 1,
  1069. r: raftNode{Node: n},
  1070. attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
  1071. cluster: &cluster{},
  1072. w: w,
  1073. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1074. }
  1075. srv.publish(time.Hour)
  1076. action := n.Action()
  1077. if len(action) != 1 {
  1078. t.Fatalf("len(action) = %d, want 1", len(action))
  1079. }
  1080. if action[0].Name != "Propose" {
  1081. t.Fatalf("action = %s, want Propose", action[0].Name)
  1082. }
  1083. data := action[0].Params[0].([]byte)
  1084. var r pb.Request
  1085. if err := r.Unmarshal(data); err != nil {
  1086. t.Fatalf("unmarshal request error: %v", err)
  1087. }
  1088. if r.Method != "PUT" {
  1089. t.Errorf("method = %s, want PUT", r.Method)
  1090. }
  1091. wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
  1092. if wpath := path.Join(memberStoreKey(wm.ID), attributesSuffix); r.Path != wpath {
  1093. t.Errorf("path = %s, want %s", r.Path, wpath)
  1094. }
  1095. var gattr Attributes
  1096. if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
  1097. t.Fatalf("unmarshal val error: %v", err)
  1098. }
  1099. if !reflect.DeepEqual(gattr, wm.Attributes) {
  1100. t.Errorf("member = %v, want %v", gattr, wm.Attributes)
  1101. }
  1102. }
  1103. // TestPublishStopped tests that publish will be stopped if server is stopped.
  1104. func TestPublishStopped(t *testing.T) {
  1105. srv := &EtcdServer{
  1106. cfg: &ServerConfig{TickMs: 1},
  1107. r: raftNode{
  1108. Node: newNodeNop(),
  1109. transport: rafthttp.NewNopTransporter(),
  1110. },
  1111. cluster: &cluster{},
  1112. w: wait.NewNop(),
  1113. done: make(chan struct{}),
  1114. stop: make(chan struct{}),
  1115. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1116. }
  1117. close(srv.done)
  1118. srv.publish(time.Hour)
  1119. }
  1120. // TestPublishRetry tests that publish will keep retry until success.
  1121. func TestPublishRetry(t *testing.T) {
  1122. n := newNodeRecorder()
  1123. srv := &EtcdServer{
  1124. cfg: &ServerConfig{TickMs: 1},
  1125. r: raftNode{Node: n},
  1126. w: wait.NewNop(),
  1127. done: make(chan struct{}),
  1128. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1129. }
  1130. // TODO: use fakeClockwork
  1131. time.AfterFunc(10*time.Millisecond, func() { close(srv.done) })
  1132. srv.publish(10 * time.Nanosecond)
  1133. action := n.Action()
  1134. // multiple Proposes
  1135. if cnt := len(action); cnt < 2 {
  1136. t.Errorf("len(action) = %d, want >= 2", cnt)
  1137. }
  1138. }
  1139. func TestUpdateVersion(t *testing.T) {
  1140. n := newNodeRecorder()
  1141. ch := make(chan interface{}, 1)
  1142. // simulate that request has gone through consensus
  1143. ch <- Response{}
  1144. w := wait.NewWithResponse(ch)
  1145. srv := &EtcdServer{
  1146. id: 1,
  1147. cfg: &ServerConfig{TickMs: 1},
  1148. r: raftNode{Node: n},
  1149. attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
  1150. cluster: &cluster{},
  1151. w: w,
  1152. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1153. }
  1154. srv.updateClusterVersion("2.0.0")
  1155. action := n.Action()
  1156. if len(action) != 1 {
  1157. t.Fatalf("len(action) = %d, want 1", len(action))
  1158. }
  1159. if action[0].Name != "Propose" {
  1160. t.Fatalf("action = %s, want Propose", action[0].Name)
  1161. }
  1162. data := action[0].Params[0].([]byte)
  1163. var r pb.Request
  1164. if err := r.Unmarshal(data); err != nil {
  1165. t.Fatalf("unmarshal request error: %v", err)
  1166. }
  1167. if r.Method != "PUT" {
  1168. t.Errorf("method = %s, want PUT", r.Method)
  1169. }
  1170. if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
  1171. t.Errorf("path = %s, want %s", r.Path, wpath)
  1172. }
  1173. if r.Val != "2.0.0" {
  1174. t.Errorf("val = %s, want %s", r.Val, "2.0.0")
  1175. }
  1176. }
  1177. func TestStopNotify(t *testing.T) {
  1178. s := &EtcdServer{
  1179. stop: make(chan struct{}),
  1180. done: make(chan struct{}),
  1181. }
  1182. go func() {
  1183. <-s.stop
  1184. close(s.done)
  1185. }()
  1186. notifier := s.StopNotify()
  1187. select {
  1188. case <-notifier:
  1189. t.Fatalf("received unexpected stop notification")
  1190. default:
  1191. }
  1192. s.Stop()
  1193. select {
  1194. case <-notifier:
  1195. default:
  1196. t.Fatalf("cannot receive stop notification")
  1197. }
  1198. }
  1199. func TestGetOtherPeerURLs(t *testing.T) {
  1200. tests := []struct {
  1201. membs []*Member
  1202. self string
  1203. wurls []string
  1204. }{
  1205. {
  1206. []*Member{
  1207. newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
  1208. },
  1209. "a",
  1210. []string{},
  1211. },
  1212. {
  1213. []*Member{
  1214. newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
  1215. newTestMember(2, []string{"http://10.0.0.2"}, "b", nil),
  1216. newTestMember(3, []string{"http://10.0.0.3"}, "c", nil),
  1217. },
  1218. "a",
  1219. []string{"http://10.0.0.2", "http://10.0.0.3"},
  1220. },
  1221. {
  1222. []*Member{
  1223. newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
  1224. newTestMember(3, []string{"http://10.0.0.3"}, "c", nil),
  1225. newTestMember(2, []string{"http://10.0.0.2"}, "b", nil),
  1226. },
  1227. "a",
  1228. []string{"http://10.0.0.2", "http://10.0.0.3"},
  1229. },
  1230. }
  1231. for i, tt := range tests {
  1232. cl := newClusterFromMembers("", types.ID(0), tt.membs)
  1233. urls := getRemotePeerURLs(cl, tt.self)
  1234. if !reflect.DeepEqual(urls, tt.wurls) {
  1235. t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
  1236. }
  1237. }
  1238. }
  1239. type nodeRecorder struct{ testutil.Recorder }
  1240. func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
  1241. func newNodeNop() raft.Node { return newNodeRecorder() }
  1242. func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
  1243. func (n *nodeRecorder) Campaign(ctx context.Context) error {
  1244. n.Record(testutil.Action{Name: "Campaign"})
  1245. return nil
  1246. }
  1247. func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
  1248. n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
  1249. return nil
  1250. }
  1251. func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1252. n.Record(testutil.Action{Name: "ProposeConfChange"})
  1253. return nil
  1254. }
  1255. func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
  1256. n.Record(testutil.Action{Name: "Step"})
  1257. return nil
  1258. }
  1259. func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
  1260. func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
  1261. func (n *nodeRecorder) Advance() {}
  1262. func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1263. n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
  1264. return &raftpb.ConfState{}
  1265. }
  1266. func (n *nodeRecorder) Stop() {
  1267. n.Record(testutil.Action{Name: "Stop"})
  1268. }
  1269. func (n *nodeRecorder) ReportUnreachable(id uint64) {}
  1270. func (n *nodeRecorder) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
  1271. func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
  1272. n.Record(testutil.Action{Name: "Compact"})
  1273. }
  1274. type nodeProposalBlockerRecorder struct {
  1275. nodeRecorder
  1276. }
  1277. func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
  1278. return &nodeProposalBlockerRecorder{*newNodeRecorder()}
  1279. }
  1280. func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
  1281. <-ctx.Done()
  1282. n.Record(testutil.Action{Name: "Propose blocked"})
  1283. return nil
  1284. }
  1285. // readyNode is a nodeRecorder with a user-writeable ready channel
  1286. type readyNode struct {
  1287. nodeRecorder
  1288. readyc chan raft.Ready
  1289. }
  1290. func newReadyNode() *readyNode {
  1291. return &readyNode{
  1292. nodeRecorder{testutil.NewRecorderStream()},
  1293. make(chan raft.Ready, 1)}
  1294. }
  1295. func newNopReadyNode() *readyNode {
  1296. return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
  1297. }
  1298. func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
  1299. type nodeConfChangeCommitterRecorder struct {
  1300. readyNode
  1301. index uint64
  1302. }
  1303. func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
  1304. return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0}
  1305. }
  1306. func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1307. data, err := conf.Marshal()
  1308. if err != nil {
  1309. return err
  1310. }
  1311. n.index++
  1312. n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()})
  1313. n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
  1314. return nil
  1315. }
  1316. func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
  1317. return n.readyc
  1318. }
  1319. func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1320. n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()})
  1321. return &raftpb.ConfState{}
  1322. }
  1323. // nodeCommitter commits proposed data immediately.
  1324. type nodeCommitter struct {
  1325. readyNode
  1326. index uint64
  1327. }
  1328. func newNodeCommitter() raft.Node {
  1329. return &nodeCommitter{*newNopReadyNode(), 0}
  1330. }
  1331. func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
  1332. n.index++
  1333. ents := []raftpb.Entry{{Index: n.index, Data: data}}
  1334. n.readyc <- raft.Ready{
  1335. Entries: ents,
  1336. CommittedEntries: ents,
  1337. }
  1338. return nil
  1339. }