server_test.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "reflect"
  22. "strconv"
  23. "testing"
  24. "time"
  25. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/pkg/idutil"
  28. "github.com/coreos/etcd/pkg/pbutil"
  29. "github.com/coreos/etcd/pkg/testutil"
  30. "github.com/coreos/etcd/pkg/types"
  31. "github.com/coreos/etcd/pkg/wait"
  32. "github.com/coreos/etcd/raft"
  33. "github.com/coreos/etcd/raft/raftpb"
  34. "github.com/coreos/etcd/rafthttp"
  35. dstorage "github.com/coreos/etcd/storage"
  36. "github.com/coreos/etcd/store"
  37. )
  38. // TestDoLocalAction tests requests which do not need to go through raft to be applied,
  39. // and are served through local data.
  40. func TestDoLocalAction(t *testing.T) {
  41. tests := []struct {
  42. req pb.Request
  43. wresp Response
  44. werr error
  45. wactions []testutil.Action
  46. }{
  47. {
  48. pb.Request{Method: "GET", ID: 1, Wait: true},
  49. Response{Watcher: store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}},
  50. },
  51. {
  52. pb.Request{Method: "GET", ID: 1},
  53. Response{Event: &store.Event{}}, nil,
  54. []testutil.Action{
  55. {
  56. Name: "Get",
  57. Params: []interface{}{"", false, false},
  58. },
  59. },
  60. },
  61. {
  62. pb.Request{Method: "HEAD", ID: 1},
  63. Response{Event: &store.Event{}}, nil,
  64. []testutil.Action{
  65. {
  66. Name: "Get",
  67. Params: []interface{}{"", false, false},
  68. },
  69. },
  70. },
  71. {
  72. pb.Request{Method: "BADMETHOD", ID: 1},
  73. Response{}, ErrUnknownMethod, []testutil.Action{},
  74. },
  75. }
  76. for i, tt := range tests {
  77. st := store.NewRecorder()
  78. srv := &EtcdServer{
  79. store: st,
  80. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  81. }
  82. resp, err := srv.Do(context.TODO(), tt.req)
  83. if err != tt.werr {
  84. t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
  85. }
  86. if !reflect.DeepEqual(resp, tt.wresp) {
  87. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  88. }
  89. gaction := st.Action()
  90. if !reflect.DeepEqual(gaction, tt.wactions) {
  91. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  92. }
  93. }
  94. }
  95. // TestDoBadLocalAction tests server requests which do not need to go through consensus,
  96. // and return errors when they fetch from local data.
  97. func TestDoBadLocalAction(t *testing.T) {
  98. storeErr := fmt.Errorf("bah")
  99. tests := []struct {
  100. req pb.Request
  101. wactions []testutil.Action
  102. }{
  103. {
  104. pb.Request{Method: "GET", ID: 1, Wait: true},
  105. []testutil.Action{{Name: "Watch"}},
  106. },
  107. {
  108. pb.Request{Method: "GET", ID: 1},
  109. []testutil.Action{
  110. {
  111. Name: "Get",
  112. Params: []interface{}{"", false, false},
  113. },
  114. },
  115. },
  116. {
  117. pb.Request{Method: "HEAD", ID: 1},
  118. []testutil.Action{
  119. {
  120. Name: "Get",
  121. Params: []interface{}{"", false, false},
  122. },
  123. },
  124. },
  125. }
  126. for i, tt := range tests {
  127. st := store.NewErrRecorder(storeErr)
  128. srv := &EtcdServer{
  129. store: st,
  130. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  131. }
  132. resp, err := srv.Do(context.Background(), tt.req)
  133. if err != storeErr {
  134. t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr)
  135. }
  136. if !reflect.DeepEqual(resp, Response{}) {
  137. t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{})
  138. }
  139. gaction := st.Action()
  140. if !reflect.DeepEqual(gaction, tt.wactions) {
  141. t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
  142. }
  143. }
  144. }
  145. func TestApplyRequest(t *testing.T) {
  146. tests := []struct {
  147. req pb.Request
  148. wresp Response
  149. wactions []testutil.Action
  150. }{
  151. // POST ==> Create
  152. {
  153. pb.Request{Method: "POST", ID: 1},
  154. Response{Event: &store.Event{}},
  155. []testutil.Action{
  156. {
  157. Name: "Create",
  158. Params: []interface{}{"", false, "", true, time.Time{}},
  159. },
  160. },
  161. },
  162. // POST ==> Create, with expiration
  163. {
  164. pb.Request{Method: "POST", ID: 1, Expiration: 1337},
  165. Response{Event: &store.Event{}},
  166. []testutil.Action{
  167. {
  168. Name: "Create",
  169. Params: []interface{}{"", false, "", true, time.Unix(0, 1337)},
  170. },
  171. },
  172. },
  173. // POST ==> Create, with dir
  174. {
  175. pb.Request{Method: "POST", ID: 1, Dir: true},
  176. Response{Event: &store.Event{}},
  177. []testutil.Action{
  178. {
  179. Name: "Create",
  180. Params: []interface{}{"", true, "", true, time.Time{}},
  181. },
  182. },
  183. },
  184. // PUT ==> Set
  185. {
  186. pb.Request{Method: "PUT", ID: 1},
  187. Response{Event: &store.Event{}},
  188. []testutil.Action{
  189. {
  190. Name: "Set",
  191. Params: []interface{}{"", false, "", time.Time{}},
  192. },
  193. },
  194. },
  195. // PUT ==> Set, with dir
  196. {
  197. pb.Request{Method: "PUT", ID: 1, Dir: true},
  198. Response{Event: &store.Event{}},
  199. []testutil.Action{
  200. {
  201. Name: "Set",
  202. Params: []interface{}{"", true, "", time.Time{}},
  203. },
  204. },
  205. },
  206. // PUT with PrevExist=true ==> Update
  207. {
  208. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true)},
  209. Response{Event: &store.Event{}},
  210. []testutil.Action{
  211. {
  212. Name: "Update",
  213. Params: []interface{}{"", "", time.Time{}},
  214. },
  215. },
  216. },
  217. // PUT with PrevExist=false ==> Create
  218. {
  219. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false)},
  220. Response{Event: &store.Event{}},
  221. []testutil.Action{
  222. {
  223. Name: "Create",
  224. Params: []interface{}{"", false, "", false, time.Time{}},
  225. },
  226. },
  227. },
  228. // PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
  229. {
  230. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
  231. Response{Event: &store.Event{}},
  232. []testutil.Action{
  233. {
  234. Name: "CompareAndSwap",
  235. Params: []interface{}{"", "", uint64(1), "", time.Time{}},
  236. },
  237. },
  238. },
  239. // PUT with PrevExist=false *and* PrevIndex set ==> Create
  240. {
  241. pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
  242. Response{Event: &store.Event{}},
  243. []testutil.Action{
  244. {
  245. Name: "Create",
  246. Params: []interface{}{"", false, "", false, time.Time{}},
  247. },
  248. },
  249. },
  250. // PUT with PrevIndex set ==> CompareAndSwap
  251. {
  252. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1},
  253. Response{Event: &store.Event{}},
  254. []testutil.Action{
  255. {
  256. Name: "CompareAndSwap",
  257. Params: []interface{}{"", "", uint64(1), "", time.Time{}},
  258. },
  259. },
  260. },
  261. // PUT with PrevValue set ==> CompareAndSwap
  262. {
  263. pb.Request{Method: "PUT", ID: 1, PrevValue: "bar"},
  264. Response{Event: &store.Event{}},
  265. []testutil.Action{
  266. {
  267. Name: "CompareAndSwap",
  268. Params: []interface{}{"", "bar", uint64(0), "", time.Time{}},
  269. },
  270. },
  271. },
  272. // PUT with PrevIndex and PrevValue set ==> CompareAndSwap
  273. {
  274. pb.Request{Method: "PUT", ID: 1, PrevIndex: 1, PrevValue: "bar"},
  275. Response{Event: &store.Event{}},
  276. []testutil.Action{
  277. {
  278. Name: "CompareAndSwap",
  279. Params: []interface{}{"", "bar", uint64(1), "", time.Time{}},
  280. },
  281. },
  282. },
  283. // DELETE ==> Delete
  284. {
  285. pb.Request{Method: "DELETE", ID: 1},
  286. Response{Event: &store.Event{}},
  287. []testutil.Action{
  288. {
  289. Name: "Delete",
  290. Params: []interface{}{"", false, false},
  291. },
  292. },
  293. },
  294. // DELETE with PrevIndex set ==> CompareAndDelete
  295. {
  296. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 1},
  297. Response{Event: &store.Event{}},
  298. []testutil.Action{
  299. {
  300. Name: "CompareAndDelete",
  301. Params: []interface{}{"", "", uint64(1)},
  302. },
  303. },
  304. },
  305. // DELETE with PrevValue set ==> CompareAndDelete
  306. {
  307. pb.Request{Method: "DELETE", ID: 1, PrevValue: "bar"},
  308. Response{Event: &store.Event{}},
  309. []testutil.Action{
  310. {
  311. Name: "CompareAndDelete",
  312. Params: []interface{}{"", "bar", uint64(0)},
  313. },
  314. },
  315. },
  316. // DELETE with PrevIndex *and* PrevValue set ==> CompareAndDelete
  317. {
  318. pb.Request{Method: "DELETE", ID: 1, PrevIndex: 5, PrevValue: "bar"},
  319. Response{Event: &store.Event{}},
  320. []testutil.Action{
  321. {
  322. Name: "CompareAndDelete",
  323. Params: []interface{}{"", "bar", uint64(5)},
  324. },
  325. },
  326. },
  327. // QGET ==> Get
  328. {
  329. pb.Request{Method: "QGET", ID: 1},
  330. Response{Event: &store.Event{}},
  331. []testutil.Action{
  332. {
  333. Name: "Get",
  334. Params: []interface{}{"", false, false},
  335. },
  336. },
  337. },
  338. // SYNC ==> DeleteExpiredKeys
  339. {
  340. pb.Request{Method: "SYNC", ID: 1},
  341. Response{},
  342. []testutil.Action{
  343. {
  344. Name: "DeleteExpiredKeys",
  345. Params: []interface{}{time.Unix(0, 0)},
  346. },
  347. },
  348. },
  349. {
  350. pb.Request{Method: "SYNC", ID: 1, Time: 12345},
  351. Response{},
  352. []testutil.Action{
  353. {
  354. Name: "DeleteExpiredKeys",
  355. Params: []interface{}{time.Unix(0, 12345)},
  356. },
  357. },
  358. },
  359. // Unknown method - error
  360. {
  361. pb.Request{Method: "BADMETHOD", ID: 1},
  362. Response{err: ErrUnknownMethod},
  363. []testutil.Action{},
  364. },
  365. }
  366. for i, tt := range tests {
  367. st := store.NewRecorder()
  368. srv := &EtcdServer{store: st}
  369. resp := srv.applyRequest(tt.req)
  370. if !reflect.DeepEqual(resp, tt.wresp) {
  371. t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
  372. }
  373. gaction := st.Action()
  374. if !reflect.DeepEqual(gaction, tt.wactions) {
  375. t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions)
  376. }
  377. }
  378. }
  379. func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
  380. cl := newTestCluster([]*Member{{ID: 1}})
  381. srv := &EtcdServer{
  382. store: store.NewRecorder(),
  383. cluster: cl,
  384. }
  385. req := pb.Request{
  386. Method: "PUT",
  387. ID: 1,
  388. Path: path.Join(storeMembersPrefix, strconv.FormatUint(1, 16), attributesSuffix),
  389. Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
  390. }
  391. srv.applyRequest(req)
  392. w := Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
  393. if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
  394. t.Errorf("attributes = %v, want %v", g, w)
  395. }
  396. }
  397. func TestApplyConfChangeError(t *testing.T) {
  398. cl := newCluster("")
  399. cl.SetStore(store.New())
  400. for i := 1; i <= 4; i++ {
  401. cl.AddMember(&Member{ID: types.ID(i)})
  402. }
  403. cl.RemoveMember(4)
  404. tests := []struct {
  405. cc raftpb.ConfChange
  406. werr error
  407. }{
  408. {
  409. raftpb.ConfChange{
  410. Type: raftpb.ConfChangeAddNode,
  411. NodeID: 4,
  412. },
  413. ErrIDRemoved,
  414. },
  415. {
  416. raftpb.ConfChange{
  417. Type: raftpb.ConfChangeUpdateNode,
  418. NodeID: 4,
  419. },
  420. ErrIDRemoved,
  421. },
  422. {
  423. raftpb.ConfChange{
  424. Type: raftpb.ConfChangeAddNode,
  425. NodeID: 1,
  426. },
  427. ErrIDExists,
  428. },
  429. {
  430. raftpb.ConfChange{
  431. Type: raftpb.ConfChangeRemoveNode,
  432. NodeID: 5,
  433. },
  434. ErrIDNotFound,
  435. },
  436. }
  437. for i, tt := range tests {
  438. n := newNodeRecorder()
  439. srv := &EtcdServer{
  440. r: raftNode{Node: n},
  441. cluster: cl,
  442. cfg: &ServerConfig{},
  443. }
  444. _, err := srv.applyConfChange(tt.cc, nil)
  445. if err != tt.werr {
  446. t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
  447. }
  448. cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None}
  449. w := []testutil.Action{
  450. {
  451. Name: "ApplyConfChange",
  452. Params: []interface{}{cc},
  453. },
  454. }
  455. if g := n.Action(); !reflect.DeepEqual(g, w) {
  456. t.Errorf("#%d: action = %+v, want %+v", i, g, w)
  457. }
  458. }
  459. }
  460. func TestApplyConfChangeShouldStop(t *testing.T) {
  461. cl := newCluster("")
  462. cl.SetStore(store.New())
  463. for i := 1; i <= 3; i++ {
  464. cl.AddMember(&Member{ID: types.ID(i)})
  465. }
  466. srv := &EtcdServer{
  467. id: 1,
  468. r: raftNode{
  469. Node: newNodeNop(),
  470. transport: rafthttp.NewNopTransporter(),
  471. },
  472. cluster: cl,
  473. }
  474. cc := raftpb.ConfChange{
  475. Type: raftpb.ConfChangeRemoveNode,
  476. NodeID: 2,
  477. }
  478. // remove non-local member
  479. shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
  480. if err != nil {
  481. t.Fatalf("unexpected error %v", err)
  482. }
  483. if shouldStop != false {
  484. t.Errorf("shouldStop = %t, want %t", shouldStop, false)
  485. }
  486. // remove local member
  487. cc.NodeID = 1
  488. shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
  489. if err != nil {
  490. t.Fatalf("unexpected error %v", err)
  491. }
  492. if shouldStop != true {
  493. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  494. }
  495. }
  496. // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
  497. // if the local member is removed along with other conf updates.
  498. func TestApplyMultiConfChangeShouldStop(t *testing.T) {
  499. cl := newCluster("")
  500. cl.SetStore(store.New())
  501. for i := 1; i <= 5; i++ {
  502. cl.AddMember(&Member{ID: types.ID(i)})
  503. }
  504. srv := &EtcdServer{
  505. id: 2,
  506. r: raftNode{
  507. Node: newNodeNop(),
  508. transport: rafthttp.NewNopTransporter(),
  509. },
  510. cluster: cl,
  511. w: wait.New(),
  512. }
  513. ents := []raftpb.Entry{}
  514. for i := 1; i <= 4; i++ {
  515. ent := raftpb.Entry{
  516. Term: 1,
  517. Index: uint64(i),
  518. Type: raftpb.EntryConfChange,
  519. Data: pbutil.MustMarshal(
  520. &raftpb.ConfChange{
  521. Type: raftpb.ConfChangeRemoveNode,
  522. NodeID: uint64(i)}),
  523. }
  524. ents = append(ents, ent)
  525. }
  526. _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
  527. if shouldStop == false {
  528. t.Errorf("shouldStop = %t, want %t", shouldStop, true)
  529. }
  530. }
  531. func TestDoProposal(t *testing.T) {
  532. tests := []pb.Request{
  533. {Method: "POST", ID: 1},
  534. {Method: "PUT", ID: 1},
  535. {Method: "DELETE", ID: 1},
  536. {Method: "GET", ID: 1, Quorum: true},
  537. }
  538. for i, tt := range tests {
  539. st := store.NewRecorder()
  540. srv := &EtcdServer{
  541. cfg: &ServerConfig{TickMs: 1},
  542. r: raftNode{
  543. Node: newNodeCommitter(),
  544. storage: &storageRecorder{},
  545. raftStorage: raft.NewMemoryStorage(),
  546. transport: rafthttp.NewNopTransporter(),
  547. },
  548. store: st,
  549. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  550. }
  551. srv.start()
  552. resp, err := srv.Do(context.Background(), tt)
  553. srv.Stop()
  554. action := st.Action()
  555. if len(action) != 1 {
  556. t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
  557. }
  558. if err != nil {
  559. t.Fatalf("#%d: err = %v, want nil", i, err)
  560. }
  561. wresp := Response{Event: &store.Event{}}
  562. if !reflect.DeepEqual(resp, wresp) {
  563. t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
  564. }
  565. }
  566. }
  567. func TestDoProposalCancelled(t *testing.T) {
  568. wait := wait.NewRecorder()
  569. srv := &EtcdServer{
  570. cfg: &ServerConfig{TickMs: 1},
  571. r: raftNode{Node: newNodeNop()},
  572. w: wait,
  573. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  574. }
  575. ctx, cancel := context.WithCancel(context.Background())
  576. cancel()
  577. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  578. if err != ErrCanceled {
  579. t.Fatalf("err = %v, want %v", err, ErrCanceled)
  580. }
  581. w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
  582. if !reflect.DeepEqual(wait.Action(), w) {
  583. t.Errorf("wait.action = %+v, want %+v", wait.Action(), w)
  584. }
  585. }
  586. func TestDoProposalTimeout(t *testing.T) {
  587. srv := &EtcdServer{
  588. cfg: &ServerConfig{TickMs: 1},
  589. r: raftNode{Node: newNodeNop()},
  590. w: wait.NewNop(),
  591. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  592. }
  593. ctx, _ := context.WithTimeout(context.Background(), 0)
  594. _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
  595. if err != ErrTimeout {
  596. t.Fatalf("err = %v, want %v", err, ErrTimeout)
  597. }
  598. }
  599. func TestDoProposalStopped(t *testing.T) {
  600. srv := &EtcdServer{
  601. cfg: &ServerConfig{TickMs: 1},
  602. r: raftNode{Node: newNodeNop()},
  603. w: wait.NewNop(),
  604. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  605. }
  606. srv.done = make(chan struct{})
  607. close(srv.done)
  608. _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
  609. if err != ErrStopped {
  610. t.Errorf("err = %v, want %v", err, ErrStopped)
  611. }
  612. }
  613. // TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
  614. func TestSync(t *testing.T) {
  615. n := newNodeRecorder()
  616. srv := &EtcdServer{
  617. r: raftNode{Node: n},
  618. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  619. }
  620. // check that sync is non-blocking
  621. done := make(chan struct{})
  622. go func() {
  623. srv.sync(10 * time.Second)
  624. done <- struct{}{}
  625. }()
  626. select {
  627. case <-done:
  628. case <-time.After(time.Second):
  629. t.Fatal("sync should be non-blocking but did not return after 1s!")
  630. }
  631. testutil.WaitSchedule()
  632. action := n.Action()
  633. if len(action) != 1 {
  634. t.Fatalf("len(action) = %d, want 1", len(action))
  635. }
  636. if action[0].Name != "Propose" {
  637. t.Fatalf("action = %s, want Propose", action[0].Name)
  638. }
  639. data := action[0].Params[0].([]byte)
  640. var r pb.Request
  641. if err := r.Unmarshal(data); err != nil {
  642. t.Fatalf("unmarshal request error: %v", err)
  643. }
  644. if r.Method != "SYNC" {
  645. t.Errorf("method = %s, want SYNC", r.Method)
  646. }
  647. }
  648. // TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
  649. // after timeout
  650. func TestSyncTimeout(t *testing.T) {
  651. n := newProposalBlockerRecorder()
  652. srv := &EtcdServer{
  653. r: raftNode{Node: n},
  654. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  655. }
  656. // check that sync is non-blocking
  657. done := make(chan struct{})
  658. go func() {
  659. srv.sync(0)
  660. done <- struct{}{}
  661. }()
  662. select {
  663. case <-done:
  664. case <-time.After(time.Second):
  665. t.Fatal("sync should be non-blocking but did not return after 1s!")
  666. }
  667. // give time for goroutine in sync to cancel
  668. testutil.WaitSchedule()
  669. w := []testutil.Action{{Name: "Propose blocked"}}
  670. if g := n.Action(); !reflect.DeepEqual(g, w) {
  671. t.Errorf("action = %v, want %v", g, w)
  672. }
  673. }
  674. // TODO: TestNoSyncWhenNoLeader
  675. // TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
  676. func TestSyncTrigger(t *testing.T) {
  677. n := newReadyNode()
  678. st := make(chan time.Time, 1)
  679. srv := &EtcdServer{
  680. cfg: &ServerConfig{TickMs: 1},
  681. r: raftNode{
  682. Node: n,
  683. raftStorage: raft.NewMemoryStorage(),
  684. transport: rafthttp.NewNopTransporter(),
  685. storage: &storageRecorder{},
  686. },
  687. store: store.NewNop(),
  688. SyncTicker: st,
  689. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  690. }
  691. srv.start()
  692. defer srv.Stop()
  693. // trigger the server to become a leader and accept sync requests
  694. n.readyc <- raft.Ready{
  695. SoftState: &raft.SoftState{
  696. RaftState: raft.StateLeader,
  697. },
  698. }
  699. // trigger a sync request
  700. st <- time.Time{}
  701. testutil.WaitSchedule()
  702. action := n.Action()
  703. if len(action) != 1 {
  704. t.Fatalf("len(action) = %d, want 1", len(action))
  705. }
  706. if action[0].Name != "Propose" {
  707. t.Fatalf("action = %s, want Propose", action[0].Name)
  708. }
  709. data := action[0].Params[0].([]byte)
  710. var req pb.Request
  711. if err := req.Unmarshal(data); err != nil {
  712. t.Fatalf("error unmarshalling data: %v", err)
  713. }
  714. if req.Method != "SYNC" {
  715. t.Fatalf("unexpected proposed request: %#v", req.Method)
  716. }
  717. }
  718. // snapshot should snapshot the store and cut the persistent
  719. func TestSnapshot(t *testing.T) {
  720. s := raft.NewMemoryStorage()
  721. s.Append([]raftpb.Entry{{Index: 1}})
  722. st := store.NewRecorder()
  723. p := &storageRecorder{}
  724. srv := &EtcdServer{
  725. cfg: &ServerConfig{},
  726. r: raftNode{
  727. Node: newNodeNop(),
  728. raftStorage: s,
  729. storage: p,
  730. },
  731. store: st,
  732. }
  733. srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
  734. testutil.WaitSchedule()
  735. gaction := st.Action()
  736. if len(gaction) != 2 {
  737. t.Fatalf("len(action) = %d, want 1", len(gaction))
  738. }
  739. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
  740. t.Errorf("action = %s, want Clone", gaction[0])
  741. }
  742. if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
  743. t.Errorf("action = %s, want SaveNoCopy", gaction[1])
  744. }
  745. gaction = p.Action()
  746. if len(gaction) != 1 {
  747. t.Fatalf("len(action) = %d, want 1", len(gaction))
  748. }
  749. if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
  750. t.Errorf("action = %s, want SaveSnap", gaction[0])
  751. }
  752. }
  753. // Applied > SnapCount should trigger a SaveSnap event
  754. func TestTriggerSnap(t *testing.T) {
  755. snapc := 10
  756. st := store.NewRecorder()
  757. p := &storageRecorder{}
  758. srv := &EtcdServer{
  759. cfg: &ServerConfig{TickMs: 1},
  760. snapCount: uint64(snapc),
  761. r: raftNode{
  762. Node: newNodeCommitter(),
  763. raftStorage: raft.NewMemoryStorage(),
  764. storage: p,
  765. transport: rafthttp.NewNopTransporter(),
  766. },
  767. store: st,
  768. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  769. }
  770. srv.start()
  771. for i := 0; i < snapc+1; i++ {
  772. srv.Do(context.Background(), pb.Request{Method: "PUT"})
  773. }
  774. srv.Stop()
  775. // wait for snapshot goroutine to finish
  776. testutil.WaitSchedule()
  777. gaction := p.Action()
  778. // each operation is recorded as a Save
  779. // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
  780. wcnt := 2 + snapc
  781. if len(gaction) != wcnt {
  782. t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
  783. }
  784. if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
  785. t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
  786. }
  787. }
  788. // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
  789. // proposals.
  790. func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
  791. const (
  792. // snapshots that may queue up at once without dropping
  793. maxInFlightMsgSnap = 16
  794. )
  795. n := newReadyNode()
  796. cl := newCluster("abc")
  797. cl.SetStore(store.New())
  798. testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
  799. if err != nil {
  800. t.Fatalf("Couldn't open tempdir (%v)", err)
  801. }
  802. defer os.RemoveAll(testdir)
  803. if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
  804. t.Fatalf("Couldn't make snap dir (%v)", err)
  805. }
  806. rs := raft.NewMemoryStorage()
  807. tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
  808. s := &EtcdServer{
  809. cfg: &ServerConfig{
  810. V3demo: true,
  811. DataDir: testdir,
  812. },
  813. r: raftNode{
  814. Node: n,
  815. transport: tr,
  816. storage: &storageRecorder{dbPath: testdir},
  817. raftStorage: rs,
  818. },
  819. store: cl.store,
  820. cluster: cl,
  821. msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
  822. }
  823. s.kv = dstorage.New(
  824. path.Join(testdir, "testdb.db"),
  825. &s.consistIndex)
  826. s.start()
  827. defer s.Stop()
  828. // submit applied entries and snap entries
  829. idx := uint64(0)
  830. outdated := 0
  831. accepted := 0
  832. for k := 1; k <= 101; k++ {
  833. idx++
  834. ch := s.w.Register(uint64(idx))
  835. req := &pb.Request{Method: "QGET", ID: uint64(idx)}
  836. ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)}
  837. ready := raft.Ready{Entries: []raftpb.Entry{ent}}
  838. n.readyc <- ready
  839. ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
  840. n.readyc <- ready
  841. // "idx" applied
  842. <-ch
  843. // one snapshot for every two messages
  844. if k%2 != 0 {
  845. continue
  846. }
  847. n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
  848. // get the snapshot sent by the transport
  849. snapMsg := <-snapDoneC
  850. // If the snapshot trails applied records, recovery will panic
  851. // since there's no allocated snapshot at the place of the
  852. // snapshot record. This only happens when the applier and the
  853. // snapshot sender get out of sync.
  854. if snapMsg.Snapshot.Metadata.Index == idx {
  855. idx++
  856. snapMsg.Snapshot.Metadata.Index = idx
  857. ready = raft.Ready{Snapshot: snapMsg.Snapshot}
  858. n.readyc <- ready
  859. accepted++
  860. } else {
  861. outdated++
  862. }
  863. // don't wait for the snapshot to complete, move to next message
  864. }
  865. if accepted != 50 {
  866. t.Errorf("accepted=%v, want 50", accepted)
  867. }
  868. if outdated != 0 {
  869. t.Errorf("outdated=%v, want 0", outdated)
  870. }
  871. }
  872. // TestRecvSnapshot tests when it receives a snapshot from raft leader,
  873. // it should trigger storage.SaveSnap and also store.Recover.
  874. func TestRecvSnapshot(t *testing.T) {
  875. n := newReadyNode()
  876. st := store.NewRecorder()
  877. p := &storageRecorder{}
  878. cl := newCluster("abc")
  879. cl.SetStore(store.New())
  880. s := &EtcdServer{
  881. cfg: &ServerConfig{},
  882. r: raftNode{
  883. Node: n,
  884. transport: rafthttp.NewNopTransporter(),
  885. storage: p,
  886. raftStorage: raft.NewMemoryStorage(),
  887. },
  888. store: st,
  889. cluster: cl,
  890. }
  891. s.start()
  892. n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}
  893. // wait for actions happened on the storage
  894. for len(p.Action()) == 0 {
  895. time.Sleep(10 * time.Millisecond)
  896. }
  897. s.Stop()
  898. wactions := []testutil.Action{{Name: "Recovery"}}
  899. if g := st.Action(); !reflect.DeepEqual(g, wactions) {
  900. t.Errorf("store action = %v, want %v", g, wactions)
  901. }
  902. wactions = []testutil.Action{{Name: "SaveSnap"}, {Name: "Save"}}
  903. if g := p.Action(); !reflect.DeepEqual(g, wactions) {
  904. t.Errorf("storage action = %v, want %v", g, wactions)
  905. }
  906. }
  907. // TestApplySnapshotAndCommittedEntries tests that server applies snapshot
  908. // first and then committed entries.
  909. func TestApplySnapshotAndCommittedEntries(t *testing.T) {
  910. n := newReadyNode()
  911. st := store.NewRecorder()
  912. cl := newCluster("abc")
  913. cl.SetStore(store.New())
  914. storage := raft.NewMemoryStorage()
  915. s := &EtcdServer{
  916. cfg: &ServerConfig{},
  917. r: raftNode{
  918. Node: n,
  919. storage: &storageRecorder{},
  920. raftStorage: storage,
  921. transport: rafthttp.NewNopTransporter(),
  922. },
  923. store: st,
  924. cluster: cl,
  925. }
  926. s.start()
  927. req := &pb.Request{Method: "QGET"}
  928. n.readyc <- raft.Ready{
  929. Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}},
  930. CommittedEntries: []raftpb.Entry{
  931. {Index: 2, Data: pbutil.MustMarshal(req)},
  932. },
  933. }
  934. // make goroutines move forward to receive snapshot
  935. testutil.WaitSchedule()
  936. s.Stop()
  937. actions := st.Action()
  938. if len(actions) != 2 {
  939. t.Fatalf("len(action) = %d, want 2", len(actions))
  940. }
  941. if actions[0].Name != "Recovery" {
  942. t.Errorf("actions[0] = %s, want %s", actions[0].Name, "Recovery")
  943. }
  944. if actions[1].Name != "Get" {
  945. t.Errorf("actions[1] = %s, want %s", actions[1].Name, "Get")
  946. }
  947. }
  948. // TestAddMember tests AddMember can propose and perform node addition.
  949. func TestAddMember(t *testing.T) {
  950. n := newNodeConfChangeCommitterRecorder()
  951. n.readyc <- raft.Ready{
  952. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  953. }
  954. cl := newTestCluster(nil)
  955. st := store.New()
  956. cl.SetStore(st)
  957. s := &EtcdServer{
  958. r: raftNode{
  959. Node: n,
  960. raftStorage: raft.NewMemoryStorage(),
  961. storage: &storageRecorder{},
  962. transport: rafthttp.NewNopTransporter(),
  963. },
  964. cfg: &ServerConfig{},
  965. store: st,
  966. cluster: cl,
  967. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  968. }
  969. s.start()
  970. m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
  971. err := s.AddMember(context.TODO(), m)
  972. gaction := n.Action()
  973. s.Stop()
  974. if err != nil {
  975. t.Fatalf("AddMember error: %v", err)
  976. }
  977. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}}
  978. if !reflect.DeepEqual(gaction, wactions) {
  979. t.Errorf("action = %v, want %v", gaction, wactions)
  980. }
  981. if cl.Member(1234) == nil {
  982. t.Errorf("member with id 1234 is not added")
  983. }
  984. }
  985. // TestRemoveMember tests RemoveMember can propose and perform node removal.
  986. func TestRemoveMember(t *testing.T) {
  987. n := newNodeConfChangeCommitterRecorder()
  988. n.readyc <- raft.Ready{
  989. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  990. }
  991. cl := newTestCluster(nil)
  992. st := store.New()
  993. cl.SetStore(store.New())
  994. cl.AddMember(&Member{ID: 1234})
  995. s := &EtcdServer{
  996. r: raftNode{
  997. Node: n,
  998. raftStorage: raft.NewMemoryStorage(),
  999. storage: &storageRecorder{},
  1000. transport: rafthttp.NewNopTransporter(),
  1001. },
  1002. cfg: &ServerConfig{},
  1003. store: st,
  1004. cluster: cl,
  1005. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1006. }
  1007. s.start()
  1008. err := s.RemoveMember(context.TODO(), 1234)
  1009. gaction := n.Action()
  1010. s.Stop()
  1011. if err != nil {
  1012. t.Fatalf("RemoveMember error: %v", err)
  1013. }
  1014. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}}
  1015. if !reflect.DeepEqual(gaction, wactions) {
  1016. t.Errorf("action = %v, want %v", gaction, wactions)
  1017. }
  1018. if cl.Member(1234) != nil {
  1019. t.Errorf("member with id 1234 is not removed")
  1020. }
  1021. }
  1022. // TestUpdateMember tests RemoveMember can propose and perform node update.
  1023. func TestUpdateMember(t *testing.T) {
  1024. n := newNodeConfChangeCommitterRecorder()
  1025. n.readyc <- raft.Ready{
  1026. SoftState: &raft.SoftState{RaftState: raft.StateLeader},
  1027. }
  1028. cl := newTestCluster(nil)
  1029. st := store.New()
  1030. cl.SetStore(st)
  1031. cl.AddMember(&Member{ID: 1234})
  1032. s := &EtcdServer{
  1033. r: raftNode{
  1034. Node: n,
  1035. raftStorage: raft.NewMemoryStorage(),
  1036. storage: &storageRecorder{},
  1037. transport: rafthttp.NewNopTransporter(),
  1038. },
  1039. store: st,
  1040. cluster: cl,
  1041. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1042. }
  1043. s.start()
  1044. wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
  1045. err := s.UpdateMember(context.TODO(), wm)
  1046. gaction := n.Action()
  1047. s.Stop()
  1048. if err != nil {
  1049. t.Fatalf("UpdateMember error: %v", err)
  1050. }
  1051. wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}}
  1052. if !reflect.DeepEqual(gaction, wactions) {
  1053. t.Errorf("action = %v, want %v", gaction, wactions)
  1054. }
  1055. if !reflect.DeepEqual(cl.Member(1234), &wm) {
  1056. t.Errorf("member = %v, want %v", cl.Member(1234), &wm)
  1057. }
  1058. }
  1059. // TODO: test server could stop itself when being removed
  1060. func TestPublish(t *testing.T) {
  1061. n := newNodeRecorder()
  1062. ch := make(chan interface{}, 1)
  1063. // simulate that request has gone through consensus
  1064. ch <- Response{}
  1065. w := wait.NewWithResponse(ch)
  1066. srv := &EtcdServer{
  1067. cfg: &ServerConfig{TickMs: 1},
  1068. id: 1,
  1069. r: raftNode{Node: n},
  1070. attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
  1071. cluster: &cluster{},
  1072. w: w,
  1073. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1074. }
  1075. srv.publish(time.Hour)
  1076. action := n.Action()
  1077. if len(action) != 1 {
  1078. t.Fatalf("len(action) = %d, want 1", len(action))
  1079. }
  1080. if action[0].Name != "Propose" {
  1081. t.Fatalf("action = %s, want Propose", action[0].Name)
  1082. }
  1083. data := action[0].Params[0].([]byte)
  1084. var r pb.Request
  1085. if err := r.Unmarshal(data); err != nil {
  1086. t.Fatalf("unmarshal request error: %v", err)
  1087. }
  1088. if r.Method != "PUT" {
  1089. t.Errorf("method = %s, want PUT", r.Method)
  1090. }
  1091. wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
  1092. if wpath := path.Join(memberStoreKey(wm.ID), attributesSuffix); r.Path != wpath {
  1093. t.Errorf("path = %s, want %s", r.Path, wpath)
  1094. }
  1095. var gattr Attributes
  1096. if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
  1097. t.Fatalf("unmarshal val error: %v", err)
  1098. }
  1099. if !reflect.DeepEqual(gattr, wm.Attributes) {
  1100. t.Errorf("member = %v, want %v", gattr, wm.Attributes)
  1101. }
  1102. }
  1103. // TestPublishStopped tests that publish will be stopped if server is stopped.
  1104. func TestPublishStopped(t *testing.T) {
  1105. srv := &EtcdServer{
  1106. cfg: &ServerConfig{TickMs: 1},
  1107. r: raftNode{
  1108. Node: newNodeNop(),
  1109. transport: rafthttp.NewNopTransporter(),
  1110. },
  1111. cluster: &cluster{},
  1112. w: wait.NewNop(),
  1113. done: make(chan struct{}),
  1114. stop: make(chan struct{}),
  1115. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1116. }
  1117. close(srv.done)
  1118. srv.publish(time.Hour)
  1119. }
  1120. // TestPublishRetry tests that publish will keep retry until success.
  1121. func TestPublishRetry(t *testing.T) {
  1122. n := newNodeRecorder()
  1123. srv := &EtcdServer{
  1124. cfg: &ServerConfig{TickMs: 1},
  1125. r: raftNode{Node: n},
  1126. w: wait.NewNop(),
  1127. done: make(chan struct{}),
  1128. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1129. }
  1130. // TODO: use fakeClockwork
  1131. time.AfterFunc(10*time.Millisecond, func() { close(srv.done) })
  1132. srv.publish(10 * time.Nanosecond)
  1133. action := n.Action()
  1134. // multiple Proposes
  1135. if cnt := len(action); cnt < 2 {
  1136. t.Errorf("len(action) = %d, want >= 2", cnt)
  1137. }
  1138. }
  1139. func TestUpdateVersion(t *testing.T) {
  1140. n := newNodeRecorder()
  1141. ch := make(chan interface{}, 1)
  1142. // simulate that request has gone through consensus
  1143. ch <- Response{}
  1144. w := wait.NewWithResponse(ch)
  1145. srv := &EtcdServer{
  1146. id: 1,
  1147. cfg: &ServerConfig{TickMs: 1},
  1148. r: raftNode{Node: n},
  1149. attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
  1150. cluster: &cluster{},
  1151. w: w,
  1152. reqIDGen: idutil.NewGenerator(0, time.Time{}),
  1153. }
  1154. srv.updateClusterVersion("2.0.0")
  1155. action := n.Action()
  1156. if len(action) != 1 {
  1157. t.Fatalf("len(action) = %d, want 1", len(action))
  1158. }
  1159. if action[0].Name != "Propose" {
  1160. t.Fatalf("action = %s, want Propose", action[0].Name)
  1161. }
  1162. data := action[0].Params[0].([]byte)
  1163. var r pb.Request
  1164. if err := r.Unmarshal(data); err != nil {
  1165. t.Fatalf("unmarshal request error: %v", err)
  1166. }
  1167. if r.Method != "PUT" {
  1168. t.Errorf("method = %s, want PUT", r.Method)
  1169. }
  1170. if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
  1171. t.Errorf("path = %s, want %s", r.Path, wpath)
  1172. }
  1173. if r.Val != "2.0.0" {
  1174. t.Errorf("val = %s, want %s", r.Val, "2.0.0")
  1175. }
  1176. }
  1177. func TestStopNotify(t *testing.T) {
  1178. s := &EtcdServer{
  1179. stop: make(chan struct{}),
  1180. done: make(chan struct{}),
  1181. }
  1182. go func() {
  1183. <-s.stop
  1184. close(s.done)
  1185. }()
  1186. notifier := s.StopNotify()
  1187. select {
  1188. case <-notifier:
  1189. t.Fatalf("received unexpected stop notification")
  1190. default:
  1191. }
  1192. s.Stop()
  1193. select {
  1194. case <-notifier:
  1195. default:
  1196. t.Fatalf("cannot receive stop notification")
  1197. }
  1198. }
  1199. func TestGetOtherPeerURLs(t *testing.T) {
  1200. tests := []struct {
  1201. membs []*Member
  1202. self string
  1203. wurls []string
  1204. }{
  1205. {
  1206. []*Member{
  1207. newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
  1208. },
  1209. "a",
  1210. []string{},
  1211. },
  1212. {
  1213. []*Member{
  1214. newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
  1215. newTestMember(2, []string{"http://10.0.0.2"}, "b", nil),
  1216. newTestMember(3, []string{"http://10.0.0.3"}, "c", nil),
  1217. },
  1218. "a",
  1219. []string{"http://10.0.0.2", "http://10.0.0.3"},
  1220. },
  1221. {
  1222. []*Member{
  1223. newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
  1224. newTestMember(3, []string{"http://10.0.0.3"}, "c", nil),
  1225. newTestMember(2, []string{"http://10.0.0.2"}, "b", nil),
  1226. },
  1227. "a",
  1228. []string{"http://10.0.0.2", "http://10.0.0.3"},
  1229. },
  1230. }
  1231. for i, tt := range tests {
  1232. cl := newClusterFromMembers("", types.ID(0), tt.membs)
  1233. urls := getRemotePeerURLs(cl, tt.self)
  1234. if !reflect.DeepEqual(urls, tt.wurls) {
  1235. t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
  1236. }
  1237. }
  1238. }
  1239. type nodeRecorder struct{ testutil.Recorder }
  1240. func newNodeRecorder() *nodeRecorder { return &nodeRecorder{} }
  1241. func newNodeNop() raft.Node { return newNodeRecorder() }
  1242. func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
  1243. func (n *nodeRecorder) Campaign(ctx context.Context) error {
  1244. n.Record(testutil.Action{Name: "Campaign"})
  1245. return nil
  1246. }
  1247. func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
  1248. n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
  1249. return nil
  1250. }
  1251. func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1252. n.Record(testutil.Action{Name: "ProposeConfChange"})
  1253. return nil
  1254. }
  1255. func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
  1256. n.Record(testutil.Action{Name: "Step"})
  1257. return nil
  1258. }
  1259. func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
  1260. func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
  1261. func (n *nodeRecorder) Advance() {}
  1262. func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1263. n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
  1264. return &raftpb.ConfState{}
  1265. }
  1266. func (n *nodeRecorder) Stop() {
  1267. n.Record(testutil.Action{Name: "Stop"})
  1268. }
  1269. func (n *nodeRecorder) ReportUnreachable(id uint64) {}
  1270. func (n *nodeRecorder) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
  1271. func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
  1272. n.Record(testutil.Action{Name: "Compact"})
  1273. }
  1274. type nodeProposalBlockerRecorder struct {
  1275. nodeRecorder
  1276. }
  1277. func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
  1278. return &nodeProposalBlockerRecorder{*newNodeRecorder()}
  1279. }
  1280. func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
  1281. <-ctx.Done()
  1282. n.Record(testutil.Action{Name: "Propose blocked"})
  1283. return nil
  1284. }
  1285. // readyNode is a nodeRecorder with a user-writeable ready channel
  1286. type readyNode struct {
  1287. nodeRecorder
  1288. readyc chan raft.Ready
  1289. }
  1290. func newReadyNode() *readyNode {
  1291. return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
  1292. }
  1293. func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
  1294. type nodeConfChangeCommitterRecorder struct {
  1295. readyNode
  1296. index uint64
  1297. }
  1298. func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
  1299. return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
  1300. }
  1301. func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
  1302. data, err := conf.Marshal()
  1303. if err != nil {
  1304. return err
  1305. }
  1306. n.index++
  1307. n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()})
  1308. n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
  1309. return nil
  1310. }
  1311. func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
  1312. return n.readyc
  1313. }
  1314. func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
  1315. n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()})
  1316. return &raftpb.ConfState{}
  1317. }
  1318. // nodeCommitter commits proposed data immediately.
  1319. type nodeCommitter struct {
  1320. readyNode
  1321. index uint64
  1322. }
  1323. func newNodeCommitter() raft.Node {
  1324. return &nodeCommitter{*newReadyNode(), 0}
  1325. }
  1326. func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
  1327. n.index++
  1328. ents := []raftpb.Entry{{Index: n.index, Data: data}}
  1329. n.readyc <- raft.Ready{
  1330. Entries: ents,
  1331. CommittedEntries: ents,
  1332. }
  1333. return nil
  1334. }