client_test.go 34 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdhttp
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "net/http"
  21. "net/http/httptest"
  22. "net/url"
  23. "path"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. "time"
  28. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  29. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  30. etcdErr "github.com/coreos/etcd/error"
  31. "github.com/coreos/etcd/etcdserver"
  32. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  33. "github.com/coreos/etcd/raft/raftpb"
  34. "github.com/coreos/etcd/store"
  35. "github.com/coreos/etcd/version"
  36. )
  37. func mustMarshalEvent(t *testing.T, ev *store.Event) string {
  38. b := new(bytes.Buffer)
  39. if err := json.NewEncoder(b).Encode(ev); err != nil {
  40. t.Fatalf("error marshalling event %#v: %v", ev, err)
  41. }
  42. return b.String()
  43. }
  44. // mustNewForm takes a set of Values and constructs a PUT *http.Request,
  45. // with a URL constructed from appending the given path to the standard keysPrefix
  46. func mustNewForm(t *testing.T, p string, vals url.Values) *http.Request {
  47. u := mustNewURL(t, path.Join(keysPrefix, p))
  48. req, err := http.NewRequest("PUT", u.String(), strings.NewReader(vals.Encode()))
  49. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  50. if err != nil {
  51. t.Fatalf("error creating new request: %v", err)
  52. }
  53. return req
  54. }
  55. // mustNewRequest takes a path, appends it to the standard keysPrefix, and constructs
  56. // a GET *http.Request referencing the resulting URL
  57. func mustNewRequest(t *testing.T, p string) *http.Request {
  58. return mustNewMethodRequest(t, "GET", p)
  59. }
  60. func mustNewMethodRequest(t *testing.T, m, p string) *http.Request {
  61. return &http.Request{
  62. Method: m,
  63. URL: mustNewURL(t, path.Join(keysPrefix, p)),
  64. }
  65. }
  66. type serverRecorder struct {
  67. actions []action
  68. }
  69. func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
  70. s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}})
  71. return etcdserver.Response{}, nil
  72. }
  73. func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error {
  74. s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}})
  75. return nil
  76. }
  77. func (s *serverRecorder) Start() {}
  78. func (s *serverRecorder) Stop() {}
  79. func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error {
  80. s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}})
  81. return nil
  82. }
  83. func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error {
  84. s.actions = append(s.actions, action{name: "RemoveMember", params: []interface{}{id}})
  85. return nil
  86. }
  87. type action struct {
  88. name string
  89. params []interface{}
  90. }
  91. // flushingRecorder provides a channel to allow users to block until the Recorder is Flushed()
  92. type flushingRecorder struct {
  93. *httptest.ResponseRecorder
  94. ch chan struct{}
  95. }
  96. func (fr *flushingRecorder) Flush() {
  97. fr.ResponseRecorder.Flush()
  98. fr.ch <- struct{}{}
  99. }
  100. // resServer implements the etcd.Server interface for testing.
  101. // It returns the given responsefrom any Do calls, and nil error
  102. type resServer struct {
  103. res etcdserver.Response
  104. }
  105. func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
  106. return rs.res, nil
  107. }
  108. func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
  109. func (rs *resServer) Start() {}
  110. func (rs *resServer) Stop() {}
  111. func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil }
  112. func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil }
  113. func boolp(b bool) *bool { return &b }
  114. type dummyRaftTimer struct{}
  115. func (drt dummyRaftTimer) Index() uint64 { return uint64(100) }
  116. func (drt dummyRaftTimer) Term() uint64 { return uint64(5) }
  117. type dummyWatcher struct {
  118. echan chan *store.Event
  119. sidx uint64
  120. }
  121. func (w *dummyWatcher) EventChan() chan *store.Event {
  122. return w.echan
  123. }
  124. func (w *dummyWatcher) StartIndex() uint64 { return w.sidx }
  125. func (w *dummyWatcher) Remove() {}
  126. func TestBadParseRequest(t *testing.T) {
  127. tests := []struct {
  128. in *http.Request
  129. wcode int
  130. }{
  131. {
  132. // parseForm failure
  133. &http.Request{
  134. Body: nil,
  135. Method: "PUT",
  136. },
  137. etcdErr.EcodeInvalidForm,
  138. },
  139. {
  140. // bad key prefix
  141. &http.Request{
  142. URL: mustNewURL(t, "/badprefix/"),
  143. },
  144. etcdErr.EcodeInvalidForm,
  145. },
  146. // bad values for prevIndex, waitIndex, ttl
  147. {
  148. mustNewForm(t, "foo", url.Values{"prevIndex": []string{"garbage"}}),
  149. etcdErr.EcodeIndexNaN,
  150. },
  151. {
  152. mustNewForm(t, "foo", url.Values{"prevIndex": []string{"1.5"}}),
  153. etcdErr.EcodeIndexNaN,
  154. },
  155. {
  156. mustNewForm(t, "foo", url.Values{"prevIndex": []string{"-1"}}),
  157. etcdErr.EcodeIndexNaN,
  158. },
  159. {
  160. mustNewForm(t, "foo", url.Values{"waitIndex": []string{"garbage"}}),
  161. etcdErr.EcodeIndexNaN,
  162. },
  163. {
  164. mustNewForm(t, "foo", url.Values{"waitIndex": []string{"??"}}),
  165. etcdErr.EcodeIndexNaN,
  166. },
  167. {
  168. mustNewForm(t, "foo", url.Values{"ttl": []string{"-1"}}),
  169. etcdErr.EcodeTTLNaN,
  170. },
  171. // bad values for recursive, sorted, wait, prevExist, dir, stream
  172. {
  173. mustNewForm(t, "foo", url.Values{"recursive": []string{"hahaha"}}),
  174. etcdErr.EcodeInvalidField,
  175. },
  176. {
  177. mustNewForm(t, "foo", url.Values{"recursive": []string{"1234"}}),
  178. etcdErr.EcodeInvalidField,
  179. },
  180. {
  181. mustNewForm(t, "foo", url.Values{"recursive": []string{"?"}}),
  182. etcdErr.EcodeInvalidField,
  183. },
  184. {
  185. mustNewForm(t, "foo", url.Values{"sorted": []string{"?"}}),
  186. etcdErr.EcodeInvalidField,
  187. },
  188. {
  189. mustNewForm(t, "foo", url.Values{"sorted": []string{"x"}}),
  190. etcdErr.EcodeInvalidField,
  191. },
  192. {
  193. mustNewForm(t, "foo", url.Values{"wait": []string{"?!"}}),
  194. etcdErr.EcodeInvalidField,
  195. },
  196. {
  197. mustNewForm(t, "foo", url.Values{"wait": []string{"yes"}}),
  198. etcdErr.EcodeInvalidField,
  199. },
  200. {
  201. mustNewForm(t, "foo", url.Values{"prevExist": []string{"yes"}}),
  202. etcdErr.EcodeInvalidField,
  203. },
  204. {
  205. mustNewForm(t, "foo", url.Values{"prevExist": []string{"#2"}}),
  206. etcdErr.EcodeInvalidField,
  207. },
  208. {
  209. mustNewForm(t, "foo", url.Values{"dir": []string{"no"}}),
  210. etcdErr.EcodeInvalidField,
  211. },
  212. {
  213. mustNewForm(t, "foo", url.Values{"dir": []string{"file"}}),
  214. etcdErr.EcodeInvalidField,
  215. },
  216. {
  217. mustNewForm(t, "foo", url.Values{"quorum": []string{"no"}}),
  218. etcdErr.EcodeInvalidField,
  219. },
  220. {
  221. mustNewForm(t, "foo", url.Values{"quorum": []string{"file"}}),
  222. etcdErr.EcodeInvalidField,
  223. },
  224. {
  225. mustNewForm(t, "foo", url.Values{"stream": []string{"zzz"}}),
  226. etcdErr.EcodeInvalidField,
  227. },
  228. {
  229. mustNewForm(t, "foo", url.Values{"stream": []string{"something"}}),
  230. etcdErr.EcodeInvalidField,
  231. },
  232. // prevValue cannot be empty
  233. {
  234. mustNewForm(t, "foo", url.Values{"prevValue": []string{""}}),
  235. etcdErr.EcodeInvalidField,
  236. },
  237. // wait is only valid with GET requests
  238. {
  239. mustNewMethodRequest(t, "HEAD", "foo?wait=true"),
  240. etcdErr.EcodeInvalidField,
  241. },
  242. // query values are considered
  243. {
  244. mustNewRequest(t, "foo?prevExist=wrong"),
  245. etcdErr.EcodeInvalidField,
  246. },
  247. {
  248. mustNewRequest(t, "foo?ttl=wrong"),
  249. etcdErr.EcodeTTLNaN,
  250. },
  251. // but body takes precedence if both are specified
  252. {
  253. mustNewForm(
  254. t,
  255. "foo?ttl=12",
  256. url.Values{"ttl": []string{"garbage"}},
  257. ),
  258. etcdErr.EcodeTTLNaN,
  259. },
  260. {
  261. mustNewForm(
  262. t,
  263. "foo?prevExist=false",
  264. url.Values{"prevExist": []string{"yes"}},
  265. ),
  266. etcdErr.EcodeInvalidField,
  267. },
  268. }
  269. for i, tt := range tests {
  270. got, err := parseKeyRequest(tt.in, 1234, clockwork.NewFakeClock())
  271. if err == nil {
  272. t.Errorf("#%d: unexpected nil error!", i)
  273. continue
  274. }
  275. ee, ok := err.(*etcdErr.Error)
  276. if !ok {
  277. t.Errorf("#%d: err is not etcd.Error!", i)
  278. continue
  279. }
  280. if ee.ErrorCode != tt.wcode {
  281. t.Errorf("#%d: code=%d, want %v", i, ee.ErrorCode, tt.wcode)
  282. t.Logf("cause: %#v", ee.Cause)
  283. }
  284. if !reflect.DeepEqual(got, etcdserverpb.Request{}) {
  285. t.Errorf("#%d: unexpected non-empty Request: %#v", i, got)
  286. }
  287. }
  288. }
  289. func TestGoodParseRequest(t *testing.T) {
  290. fc := clockwork.NewFakeClock()
  291. fc.Advance(1111)
  292. tests := []struct {
  293. in *http.Request
  294. w etcdserverpb.Request
  295. }{
  296. {
  297. // good prefix, all other values default
  298. mustNewRequest(t, "foo"),
  299. etcdserverpb.Request{
  300. ID: 1234,
  301. Method: "GET",
  302. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  303. },
  304. },
  305. {
  306. // value specified
  307. mustNewForm(
  308. t,
  309. "foo",
  310. url.Values{"value": []string{"some_value"}},
  311. ),
  312. etcdserverpb.Request{
  313. ID: 1234,
  314. Method: "PUT",
  315. Val: "some_value",
  316. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  317. },
  318. },
  319. {
  320. // prevIndex specified
  321. mustNewForm(
  322. t,
  323. "foo",
  324. url.Values{"prevIndex": []string{"98765"}},
  325. ),
  326. etcdserverpb.Request{
  327. ID: 1234,
  328. Method: "PUT",
  329. PrevIndex: 98765,
  330. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  331. },
  332. },
  333. {
  334. // recursive specified
  335. mustNewForm(
  336. t,
  337. "foo",
  338. url.Values{"recursive": []string{"true"}},
  339. ),
  340. etcdserverpb.Request{
  341. ID: 1234,
  342. Method: "PUT",
  343. Recursive: true,
  344. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  345. },
  346. },
  347. {
  348. // sorted specified
  349. mustNewForm(
  350. t,
  351. "foo",
  352. url.Values{"sorted": []string{"true"}},
  353. ),
  354. etcdserverpb.Request{
  355. ID: 1234,
  356. Method: "PUT",
  357. Sorted: true,
  358. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  359. },
  360. },
  361. {
  362. // quorum specified
  363. mustNewForm(
  364. t,
  365. "foo",
  366. url.Values{"quorum": []string{"true"}},
  367. ),
  368. etcdserverpb.Request{
  369. ID: 1234,
  370. Method: "PUT",
  371. Quorum: true,
  372. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  373. },
  374. },
  375. {
  376. // wait specified
  377. mustNewRequest(t, "foo?wait=true"),
  378. etcdserverpb.Request{
  379. ID: 1234,
  380. Method: "GET",
  381. Wait: true,
  382. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  383. },
  384. },
  385. {
  386. // empty TTL specified
  387. mustNewRequest(t, "foo?ttl="),
  388. etcdserverpb.Request{
  389. ID: 1234,
  390. Method: "GET",
  391. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  392. Expiration: 0,
  393. },
  394. },
  395. {
  396. // non-empty TTL specified
  397. mustNewRequest(t, "foo?ttl=5678"),
  398. etcdserverpb.Request{
  399. ID: 1234,
  400. Method: "GET",
  401. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  402. Expiration: fc.Now().Add(5678 * time.Second).UnixNano(),
  403. },
  404. },
  405. {
  406. // zero TTL specified
  407. mustNewRequest(t, "foo?ttl=0"),
  408. etcdserverpb.Request{
  409. ID: 1234,
  410. Method: "GET",
  411. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  412. Expiration: fc.Now().UnixNano(),
  413. },
  414. },
  415. {
  416. // dir specified
  417. mustNewRequest(t, "foo?dir=true"),
  418. etcdserverpb.Request{
  419. ID: 1234,
  420. Method: "GET",
  421. Dir: true,
  422. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  423. },
  424. },
  425. {
  426. // dir specified negatively
  427. mustNewRequest(t, "foo?dir=false"),
  428. etcdserverpb.Request{
  429. ID: 1234,
  430. Method: "GET",
  431. Dir: false,
  432. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  433. },
  434. },
  435. {
  436. // prevExist should be non-null if specified
  437. mustNewForm(
  438. t,
  439. "foo",
  440. url.Values{"prevExist": []string{"true"}},
  441. ),
  442. etcdserverpb.Request{
  443. ID: 1234,
  444. Method: "PUT",
  445. PrevExist: boolp(true),
  446. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  447. },
  448. },
  449. {
  450. // prevExist should be non-null if specified
  451. mustNewForm(
  452. t,
  453. "foo",
  454. url.Values{"prevExist": []string{"false"}},
  455. ),
  456. etcdserverpb.Request{
  457. ID: 1234,
  458. Method: "PUT",
  459. PrevExist: boolp(false),
  460. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  461. },
  462. },
  463. // mix various fields
  464. {
  465. mustNewForm(
  466. t,
  467. "foo",
  468. url.Values{
  469. "value": []string{"some value"},
  470. "prevExist": []string{"true"},
  471. "prevValue": []string{"previous value"},
  472. },
  473. ),
  474. etcdserverpb.Request{
  475. ID: 1234,
  476. Method: "PUT",
  477. PrevExist: boolp(true),
  478. PrevValue: "previous value",
  479. Val: "some value",
  480. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  481. },
  482. },
  483. // query parameters should be used if given
  484. {
  485. mustNewForm(
  486. t,
  487. "foo?prevValue=woof",
  488. url.Values{},
  489. ),
  490. etcdserverpb.Request{
  491. ID: 1234,
  492. Method: "PUT",
  493. PrevValue: "woof",
  494. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  495. },
  496. },
  497. // but form values should take precedence over query parameters
  498. {
  499. mustNewForm(
  500. t,
  501. "foo?prevValue=woof",
  502. url.Values{
  503. "prevValue": []string{"miaow"},
  504. },
  505. ),
  506. etcdserverpb.Request{
  507. ID: 1234,
  508. Method: "PUT",
  509. PrevValue: "miaow",
  510. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  511. },
  512. },
  513. }
  514. for i, tt := range tests {
  515. got, err := parseKeyRequest(tt.in, 1234, fc)
  516. if err != nil {
  517. t.Errorf("#%d: err = %v, want %v", i, err, nil)
  518. }
  519. if !reflect.DeepEqual(got, tt.w) {
  520. t.Errorf("#%d: request=%#v, want %#v", i, got, tt.w)
  521. }
  522. }
  523. }
  524. func TestServeAdminMembers(t *testing.T) {
  525. memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
  526. memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
  527. cluster := &fakeCluster{
  528. members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
  529. }
  530. h := &adminMembersHandler{
  531. server: &serverRecorder{},
  532. clock: clockwork.NewFakeClock(),
  533. clusterInfo: cluster,
  534. }
  535. msb, err := json.Marshal(
  536. struct {
  537. Members []etcdserver.Member `json:"members"`
  538. }{
  539. Members: []etcdserver.Member{memb1, memb2},
  540. },
  541. )
  542. if err != nil {
  543. t.Fatal(err)
  544. }
  545. wms := string(msb) + "\n"
  546. tests := []struct {
  547. path string
  548. wcode int
  549. wct string
  550. wbody string
  551. }{
  552. {adminMembersPrefix, http.StatusOK, "application/json", wms},
  553. {path.Join(adminMembersPrefix, "100"), http.StatusNotFound, "text/plain; charset=utf-8", "404 page not found\n"},
  554. {path.Join(adminMembersPrefix, "foobar"), http.StatusNotFound, "text/plain; charset=utf-8", "404 page not found\n"},
  555. }
  556. for i, tt := range tests {
  557. req, err := http.NewRequest("GET", mustNewURL(t, tt.path).String(), nil)
  558. if err != nil {
  559. t.Fatal(err)
  560. }
  561. rw := httptest.NewRecorder()
  562. h.ServeHTTP(rw, req)
  563. if rw.Code != tt.wcode {
  564. t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
  565. }
  566. if gct := rw.Header().Get("Content-Type"); gct != tt.wct {
  567. t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct)
  568. }
  569. if rw.Body.String() != tt.wbody {
  570. t.Errorf("#%d: body = %q, want %q", i, rw.Body.String(), tt.wbody)
  571. }
  572. }
  573. }
  574. func TestServeAdminMembersPut(t *testing.T) {
  575. u := mustNewURL(t, adminMembersPrefix)
  576. raftAttr := etcdserver.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}
  577. b, err := json.Marshal(raftAttr)
  578. if err != nil {
  579. t.Fatal(err)
  580. }
  581. body := bytes.NewReader(b)
  582. req, err := http.NewRequest("POST", u.String(), body)
  583. if err != nil {
  584. t.Fatal(err)
  585. }
  586. req.Header.Set("Content-Type", "application/json")
  587. s := &serverRecorder{}
  588. h := &adminMembersHandler{
  589. server: s,
  590. clock: clockwork.NewFakeClock(),
  591. }
  592. rw := httptest.NewRecorder()
  593. h.ServeHTTP(rw, req)
  594. wcode := http.StatusCreated
  595. if rw.Code != wcode {
  596. t.Errorf("code=%d, want %d", rw.Code, wcode)
  597. }
  598. wm := etcdserver.Member{
  599. ID: 3064321551348478165,
  600. RaftAttributes: raftAttr,
  601. }
  602. wb, err := json.Marshal(wm)
  603. if err != nil {
  604. t.Fatal(err)
  605. }
  606. wct := "application/json"
  607. if gct := rw.Header().Get("Content-Type"); gct != wct {
  608. t.Errorf("content-type = %s, want %s", gct, wct)
  609. }
  610. g := rw.Body.String()
  611. w := string(wb) + "\n"
  612. if g != w {
  613. t.Errorf("got body=%q, want %q", g, w)
  614. }
  615. wactions := []action{{name: "AddMember", params: []interface{}{wm}}}
  616. if !reflect.DeepEqual(s.actions, wactions) {
  617. t.Errorf("actions = %+v, want %+v", s.actions, wactions)
  618. }
  619. }
  620. func TestServeAdminMembersDelete(t *testing.T) {
  621. req := &http.Request{
  622. Method: "DELETE",
  623. URL: mustNewURL(t, path.Join(adminMembersPrefix, "BEEF")),
  624. }
  625. s := &serverRecorder{}
  626. h := &adminMembersHandler{
  627. server: s,
  628. }
  629. rw := httptest.NewRecorder()
  630. h.ServeHTTP(rw, req)
  631. wcode := http.StatusNoContent
  632. if rw.Code != wcode {
  633. t.Errorf("code=%d, want %d", rw.Code, wcode)
  634. }
  635. g := rw.Body.String()
  636. if g != "" {
  637. t.Errorf("got body=%q, want %q", g, "")
  638. }
  639. wactions := []action{{name: "RemoveMember", params: []interface{}{uint64(0xBEEF)}}}
  640. if !reflect.DeepEqual(s.actions, wactions) {
  641. t.Errorf("actions = %+v, want %+v", s.actions, wactions)
  642. }
  643. }
  644. func TestServeAdminMembersFail(t *testing.T) {
  645. tests := []struct {
  646. req *http.Request
  647. server etcdserver.Server
  648. wcode int
  649. }{
  650. {
  651. // bad method
  652. &http.Request{
  653. Method: "CONNECT",
  654. },
  655. &resServer{},
  656. http.StatusMethodNotAllowed,
  657. },
  658. {
  659. // bad method
  660. &http.Request{
  661. Method: "TRACE",
  662. },
  663. &resServer{},
  664. http.StatusMethodNotAllowed,
  665. },
  666. {
  667. // parse body error
  668. &http.Request{
  669. URL: mustNewURL(t, adminMembersPrefix),
  670. Method: "POST",
  671. Body: ioutil.NopCloser(strings.NewReader("bad json")),
  672. },
  673. &resServer{},
  674. http.StatusBadRequest,
  675. },
  676. {
  677. // bad content type
  678. &http.Request{
  679. URL: mustNewURL(t, adminMembersPrefix),
  680. Method: "POST",
  681. Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
  682. Header: map[string][]string{"Content-Type": []string{"application/bad"}},
  683. },
  684. &errServer{},
  685. http.StatusBadRequest,
  686. },
  687. {
  688. // bad url
  689. &http.Request{
  690. URL: mustNewURL(t, adminMembersPrefix),
  691. Method: "POST",
  692. Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)),
  693. Header: map[string][]string{"Content-Type": []string{"application/json"}},
  694. },
  695. &errServer{},
  696. http.StatusBadRequest,
  697. },
  698. {
  699. // etcdserver.AddMember error
  700. &http.Request{
  701. URL: mustNewURL(t, adminMembersPrefix),
  702. Method: "POST",
  703. Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
  704. Header: map[string][]string{"Content-Type": []string{"application/json"}},
  705. },
  706. &errServer{
  707. errors.New("blah"),
  708. },
  709. http.StatusInternalServerError,
  710. },
  711. {
  712. // etcdserver.RemoveMember error
  713. &http.Request{
  714. URL: mustNewURL(t, path.Join(adminMembersPrefix, "1")),
  715. Method: "DELETE",
  716. },
  717. &errServer{
  718. errors.New("blah"),
  719. },
  720. http.StatusInternalServerError,
  721. },
  722. }
  723. for i, tt := range tests {
  724. h := &adminMembersHandler{
  725. server: tt.server,
  726. clock: clockwork.NewFakeClock(),
  727. }
  728. rw := httptest.NewRecorder()
  729. h.ServeHTTP(rw, tt.req)
  730. if rw.Code != tt.wcode {
  731. t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
  732. }
  733. }
  734. }
  735. func TestWriteEvent(t *testing.T) {
  736. // nil event should not panic
  737. rw := httptest.NewRecorder()
  738. writeKeyEvent(rw, nil, dummyRaftTimer{})
  739. h := rw.Header()
  740. if len(h) > 0 {
  741. t.Fatalf("unexpected non-empty headers: %#v", h)
  742. }
  743. b := rw.Body.String()
  744. if len(b) > 0 {
  745. t.Fatalf("unexpected non-empty body: %q", b)
  746. }
  747. tests := []struct {
  748. ev *store.Event
  749. idx string
  750. // TODO(jonboulle): check body as well as just status code
  751. code int
  752. err error
  753. }{
  754. // standard case, standard 200 response
  755. {
  756. &store.Event{
  757. Action: store.Get,
  758. Node: &store.NodeExtern{},
  759. PrevNode: &store.NodeExtern{},
  760. },
  761. "0",
  762. http.StatusOK,
  763. nil,
  764. },
  765. // check new nodes return StatusCreated
  766. {
  767. &store.Event{
  768. Action: store.Create,
  769. Node: &store.NodeExtern{},
  770. PrevNode: &store.NodeExtern{},
  771. },
  772. "0",
  773. http.StatusCreated,
  774. nil,
  775. },
  776. }
  777. for i, tt := range tests {
  778. rw := httptest.NewRecorder()
  779. writeKeyEvent(rw, tt.ev, dummyRaftTimer{})
  780. if gct := rw.Header().Get("Content-Type"); gct != "application/json" {
  781. t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct)
  782. }
  783. if gri := rw.Header().Get("X-Raft-Index"); gri != "100" {
  784. t.Errorf("case %d: bad X-Raft-Index header: got %s, want %s", i, gri, "100")
  785. }
  786. if grt := rw.Header().Get("X-Raft-Term"); grt != "5" {
  787. t.Errorf("case %d: bad X-Raft-Term header: got %s, want %s", i, grt, "5")
  788. }
  789. if gei := rw.Header().Get("X-Etcd-Index"); gei != tt.idx {
  790. t.Errorf("case %d: bad X-Etcd-Index header: got %s, want %s", i, gei, tt.idx)
  791. }
  792. if rw.Code != tt.code {
  793. t.Errorf("case %d: bad response code: got %d, want %v", i, rw.Code, tt.code)
  794. }
  795. }
  796. }
  797. func TestV2DeprecatedMachinesEndpoint(t *testing.T) {
  798. tests := []struct {
  799. method string
  800. wcode int
  801. }{
  802. {"GET", http.StatusOK},
  803. {"HEAD", http.StatusOK},
  804. {"POST", http.StatusMethodNotAllowed},
  805. }
  806. m := NewClientHandler(&etcdserver.EtcdServer{Cluster: &etcdserver.Cluster{}})
  807. s := httptest.NewServer(m)
  808. defer s.Close()
  809. for _, tt := range tests {
  810. req, err := http.NewRequest(tt.method, s.URL+deprecatedMachinesPrefix, nil)
  811. if err != nil {
  812. t.Fatal(err)
  813. }
  814. resp, err := http.DefaultClient.Do(req)
  815. if err != nil {
  816. t.Fatal(err)
  817. }
  818. if resp.StatusCode != tt.wcode {
  819. t.Errorf("StatusCode = %d, expected %d", resp.StatusCode, tt.wcode)
  820. }
  821. }
  822. }
  823. func TestServeMachines(t *testing.T) {
  824. cluster := &fakeCluster{
  825. clientURLs: []string{"http://localhost:8080", "http://localhost:8081", "http://localhost:8082"},
  826. }
  827. writer := httptest.NewRecorder()
  828. req, err := http.NewRequest("GET", "", nil)
  829. if err != nil {
  830. t.Fatal(err)
  831. }
  832. h := &deprecatedMachinesHandler{clusterInfo: cluster}
  833. h.ServeHTTP(writer, req)
  834. w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
  835. if g := writer.Body.String(); g != w {
  836. t.Errorf("body = %s, want %s", g, w)
  837. }
  838. if writer.Code != http.StatusOK {
  839. t.Errorf("code = %d, want %d", writer.Code, http.StatusOK)
  840. }
  841. }
  842. type dummyStats struct {
  843. data []byte
  844. }
  845. func (ds *dummyStats) SelfStats() []byte { return ds.data }
  846. func (ds *dummyStats) LeaderStats() []byte { return ds.data }
  847. func (ds *dummyStats) StoreStats() []byte { return ds.data }
  848. func (ds *dummyStats) UpdateRecvApp(_ uint64, _ int64) {}
  849. func TestServeSelfStats(t *testing.T) {
  850. wb := []byte("some statistics")
  851. w := string(wb)
  852. sh := &statsHandler{
  853. stats: &dummyStats{data: wb},
  854. }
  855. rw := httptest.NewRecorder()
  856. sh.serveSelf(rw, &http.Request{Method: "GET"})
  857. if rw.Code != http.StatusOK {
  858. t.Errorf("code = %d, want %d", rw.Code, http.StatusOK)
  859. }
  860. wct := "application/json"
  861. if gct := rw.Header().Get("Content-Type"); gct != wct {
  862. t.Errorf("Content-Type = %q, want %q", gct, wct)
  863. }
  864. if g := rw.Body.String(); g != w {
  865. t.Errorf("body = %s, want %s", g, w)
  866. }
  867. }
  868. func TestSelfServeStatsBad(t *testing.T) {
  869. for _, m := range []string{"PUT", "POST", "DELETE"} {
  870. sh := &statsHandler{}
  871. rw := httptest.NewRecorder()
  872. sh.serveSelf(
  873. rw,
  874. &http.Request{
  875. Method: m,
  876. },
  877. )
  878. if rw.Code != http.StatusMethodNotAllowed {
  879. t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed)
  880. }
  881. }
  882. }
  883. func TestLeaderServeStatsBad(t *testing.T) {
  884. for _, m := range []string{"PUT", "POST", "DELETE"} {
  885. sh := &statsHandler{}
  886. rw := httptest.NewRecorder()
  887. sh.serveLeader(
  888. rw,
  889. &http.Request{
  890. Method: m,
  891. },
  892. )
  893. if rw.Code != http.StatusMethodNotAllowed {
  894. t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed)
  895. }
  896. }
  897. }
  898. func TestServeLeaderStats(t *testing.T) {
  899. wb := []byte("some statistics")
  900. w := string(wb)
  901. sh := &statsHandler{
  902. stats: &dummyStats{data: wb},
  903. }
  904. rw := httptest.NewRecorder()
  905. sh.serveLeader(rw, &http.Request{Method: "GET"})
  906. if rw.Code != http.StatusOK {
  907. t.Errorf("code = %d, want %d", rw.Code, http.StatusOK)
  908. }
  909. wct := "application/json"
  910. if gct := rw.Header().Get("Content-Type"); gct != wct {
  911. t.Errorf("Content-Type = %q, want %q", gct, wct)
  912. }
  913. if g := rw.Body.String(); g != w {
  914. t.Errorf("body = %s, want %s", g, w)
  915. }
  916. }
  917. func TestServeStoreStats(t *testing.T) {
  918. wb := []byte("some statistics")
  919. w := string(wb)
  920. sh := &statsHandler{
  921. stats: &dummyStats{data: wb},
  922. }
  923. rw := httptest.NewRecorder()
  924. sh.serveStore(rw, &http.Request{Method: "GET"})
  925. if rw.Code != http.StatusOK {
  926. t.Errorf("code = %d, want %d", rw.Code, http.StatusOK)
  927. }
  928. wct := "application/json"
  929. if gct := rw.Header().Get("Content-Type"); gct != wct {
  930. t.Errorf("Content-Type = %q, want %q", gct, wct)
  931. }
  932. if g := rw.Body.String(); g != w {
  933. t.Errorf("body = %s, want %s", g, w)
  934. }
  935. }
  936. func TestServeVersion(t *testing.T) {
  937. req, err := http.NewRequest("GET", "", nil)
  938. if err != nil {
  939. t.Fatalf("error creating request: %v", err)
  940. }
  941. rw := httptest.NewRecorder()
  942. serveVersion(rw, req)
  943. if rw.Code != http.StatusOK {
  944. t.Errorf("code=%d, want %d", rw.Code, http.StatusOK)
  945. }
  946. w := fmt.Sprintf("etcd %s", version.Version)
  947. if g := rw.Body.String(); g != w {
  948. t.Fatalf("body = %q, want %q", g, w)
  949. }
  950. }
  951. func TestServeVersionFails(t *testing.T) {
  952. for _, m := range []string{
  953. "CONNECT", "TRACE", "PUT", "POST", "HEAD",
  954. } {
  955. req, err := http.NewRequest(m, "", nil)
  956. if err != nil {
  957. t.Fatalf("error creating request: %v", err)
  958. }
  959. rw := httptest.NewRecorder()
  960. serveVersion(rw, req)
  961. if rw.Code != http.StatusMethodNotAllowed {
  962. t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed)
  963. }
  964. }
  965. }
  966. func TestBadServeKeys(t *testing.T) {
  967. testBadCases := []struct {
  968. req *http.Request
  969. server etcdserver.Server
  970. wcode int
  971. wbody string
  972. }{
  973. {
  974. // bad method
  975. &http.Request{
  976. Method: "CONNECT",
  977. },
  978. &resServer{},
  979. http.StatusMethodNotAllowed,
  980. "Method Not Allowed",
  981. },
  982. {
  983. // bad method
  984. &http.Request{
  985. Method: "TRACE",
  986. },
  987. &resServer{},
  988. http.StatusMethodNotAllowed,
  989. "Method Not Allowed",
  990. },
  991. {
  992. // parseRequest error
  993. &http.Request{
  994. Body: nil,
  995. Method: "PUT",
  996. },
  997. &resServer{},
  998. http.StatusBadRequest,
  999. `{"errorCode":210,"message":"Invalid POST form","cause":"missing form body","index":0}`,
  1000. },
  1001. {
  1002. // etcdserver.Server error
  1003. mustNewRequest(t, "foo"),
  1004. &errServer{
  1005. errors.New("blah"),
  1006. },
  1007. http.StatusInternalServerError,
  1008. "Internal Server Error",
  1009. },
  1010. {
  1011. // etcdserver.Server etcd error
  1012. mustNewRequest(t, "foo"),
  1013. &errServer{
  1014. etcdErr.NewError(etcdErr.EcodeKeyNotFound, "/1/pant", 0),
  1015. },
  1016. http.StatusNotFound,
  1017. `{"errorCode":100,"message":"Key not found","cause":"/pant","index":0}`,
  1018. },
  1019. {
  1020. // non-event/watcher response from etcdserver.Server
  1021. mustNewRequest(t, "foo"),
  1022. &resServer{
  1023. etcdserver.Response{},
  1024. },
  1025. http.StatusInternalServerError,
  1026. "Internal Server Error",
  1027. },
  1028. }
  1029. for i, tt := range testBadCases {
  1030. h := &keysHandler{
  1031. timeout: 0, // context times out immediately
  1032. server: tt.server,
  1033. }
  1034. rw := httptest.NewRecorder()
  1035. h.ServeHTTP(rw, tt.req)
  1036. if rw.Code != tt.wcode {
  1037. t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
  1038. }
  1039. if g := strings.TrimSuffix(rw.Body.String(), "\n"); g != tt.wbody {
  1040. t.Errorf("#%d: body = %s, want %s", i, g, tt.wbody)
  1041. }
  1042. }
  1043. }
  1044. func TestServeKeysEvent(t *testing.T) {
  1045. req := mustNewRequest(t, "foo")
  1046. server := &resServer{
  1047. etcdserver.Response{
  1048. Event: &store.Event{
  1049. Action: store.Get,
  1050. Node: &store.NodeExtern{},
  1051. },
  1052. },
  1053. }
  1054. h := &keysHandler{
  1055. timeout: time.Hour,
  1056. server: server,
  1057. timer: &dummyRaftTimer{},
  1058. }
  1059. rw := httptest.NewRecorder()
  1060. h.ServeHTTP(rw, req)
  1061. wcode := http.StatusOK
  1062. wbody := mustMarshalEvent(
  1063. t,
  1064. &store.Event{
  1065. Action: store.Get,
  1066. Node: &store.NodeExtern{},
  1067. },
  1068. )
  1069. if rw.Code != wcode {
  1070. t.Errorf("got code=%d, want %d", rw.Code, wcode)
  1071. }
  1072. g := rw.Body.String()
  1073. if g != wbody {
  1074. t.Errorf("got body=%#v, want %#v", g, wbody)
  1075. }
  1076. }
  1077. func TestServeKeysWatch(t *testing.T) {
  1078. req := mustNewRequest(t, "/foo/bar")
  1079. ec := make(chan *store.Event)
  1080. dw := &dummyWatcher{
  1081. echan: ec,
  1082. }
  1083. server := &resServer{
  1084. etcdserver.Response{
  1085. Watcher: dw,
  1086. },
  1087. }
  1088. h := &keysHandler{
  1089. timeout: time.Hour,
  1090. server: server,
  1091. timer: &dummyRaftTimer{},
  1092. }
  1093. go func() {
  1094. ec <- &store.Event{
  1095. Action: store.Get,
  1096. Node: &store.NodeExtern{},
  1097. }
  1098. }()
  1099. rw := httptest.NewRecorder()
  1100. h.ServeHTTP(rw, req)
  1101. wcode := http.StatusOK
  1102. wbody := mustMarshalEvent(
  1103. t,
  1104. &store.Event{
  1105. Action: store.Get,
  1106. Node: &store.NodeExtern{},
  1107. },
  1108. )
  1109. if rw.Code != wcode {
  1110. t.Errorf("got code=%d, want %d", rw.Code, wcode)
  1111. }
  1112. g := rw.Body.String()
  1113. if g != wbody {
  1114. t.Errorf("got body=%#v, want %#v", g, wbody)
  1115. }
  1116. }
  1117. type recordingCloseNotifier struct {
  1118. *httptest.ResponseRecorder
  1119. cn chan bool
  1120. }
  1121. func (rcn *recordingCloseNotifier) CloseNotify() <-chan bool {
  1122. return rcn.cn
  1123. }
  1124. func TestHandleWatch(t *testing.T) {
  1125. defaultRwRr := func() (http.ResponseWriter, *httptest.ResponseRecorder) {
  1126. r := httptest.NewRecorder()
  1127. return r, r
  1128. }
  1129. noopEv := func(chan *store.Event) {}
  1130. tests := []struct {
  1131. getCtx func() context.Context
  1132. getRwRr func() (http.ResponseWriter, *httptest.ResponseRecorder)
  1133. doToChan func(chan *store.Event)
  1134. wbody string
  1135. }{
  1136. {
  1137. // Normal case: one event
  1138. context.Background,
  1139. defaultRwRr,
  1140. func(ch chan *store.Event) {
  1141. ch <- &store.Event{
  1142. Action: store.Get,
  1143. Node: &store.NodeExtern{},
  1144. }
  1145. },
  1146. mustMarshalEvent(
  1147. t,
  1148. &store.Event{
  1149. Action: store.Get,
  1150. Node: &store.NodeExtern{},
  1151. },
  1152. ),
  1153. },
  1154. {
  1155. // Channel is closed, no event
  1156. context.Background,
  1157. defaultRwRr,
  1158. func(ch chan *store.Event) {
  1159. close(ch)
  1160. },
  1161. "",
  1162. },
  1163. {
  1164. // Simulate a timed-out context
  1165. func() context.Context {
  1166. ctx, cancel := context.WithCancel(context.Background())
  1167. cancel()
  1168. return ctx
  1169. },
  1170. defaultRwRr,
  1171. noopEv,
  1172. "",
  1173. },
  1174. {
  1175. // Close-notifying request
  1176. context.Background,
  1177. func() (http.ResponseWriter, *httptest.ResponseRecorder) {
  1178. rw := &recordingCloseNotifier{
  1179. ResponseRecorder: httptest.NewRecorder(),
  1180. cn: make(chan bool, 1),
  1181. }
  1182. rw.cn <- true
  1183. return rw, rw.ResponseRecorder
  1184. },
  1185. noopEv,
  1186. "",
  1187. },
  1188. }
  1189. for i, tt := range tests {
  1190. rw, rr := tt.getRwRr()
  1191. wa := &dummyWatcher{
  1192. echan: make(chan *store.Event, 1),
  1193. sidx: 10,
  1194. }
  1195. tt.doToChan(wa.echan)
  1196. handleKeyWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{})
  1197. wcode := http.StatusOK
  1198. wct := "application/json"
  1199. wei := "10"
  1200. wri := "100"
  1201. wrt := "5"
  1202. if rr.Code != wcode {
  1203. t.Errorf("#%d: got code=%d, want %d", i, rr.Code, wcode)
  1204. }
  1205. h := rr.Header()
  1206. if ct := h.Get("Content-Type"); ct != wct {
  1207. t.Errorf("#%d: Content-Type=%q, want %q", i, ct, wct)
  1208. }
  1209. if ei := h.Get("X-Etcd-Index"); ei != wei {
  1210. t.Errorf("#%d: X-Etcd-Index=%q, want %q", i, ei, wei)
  1211. }
  1212. if ri := h.Get("X-Raft-Index"); ri != wri {
  1213. t.Errorf("#%d: X-Raft-Index=%q, want %q", i, ri, wri)
  1214. }
  1215. if rt := h.Get("X-Raft-Term"); rt != wrt {
  1216. t.Errorf("#%d: X-Raft-Term=%q, want %q", i, rt, wrt)
  1217. }
  1218. g := rr.Body.String()
  1219. if g != tt.wbody {
  1220. t.Errorf("#%d: got body=%#v, want %#v", i, g, tt.wbody)
  1221. }
  1222. }
  1223. }
  1224. func TestHandleWatchStreaming(t *testing.T) {
  1225. rw := &flushingRecorder{
  1226. httptest.NewRecorder(),
  1227. make(chan struct{}, 1),
  1228. }
  1229. wa := &dummyWatcher{
  1230. echan: make(chan *store.Event),
  1231. }
  1232. // Launch the streaming handler in the background with a cancellable context
  1233. ctx, cancel := context.WithCancel(context.Background())
  1234. done := make(chan struct{})
  1235. go func() {
  1236. handleKeyWatch(ctx, rw, wa, true, dummyRaftTimer{})
  1237. close(done)
  1238. }()
  1239. // Expect one Flush for the headers etc.
  1240. select {
  1241. case <-rw.ch:
  1242. case <-time.After(time.Second):
  1243. t.Fatalf("timed out waiting for flush")
  1244. }
  1245. // Expect headers but no body
  1246. wcode := http.StatusOK
  1247. wct := "application/json"
  1248. wbody := ""
  1249. if rw.Code != wcode {
  1250. t.Errorf("got code=%d, want %d", rw.Code, wcode)
  1251. }
  1252. h := rw.Header()
  1253. if ct := h.Get("Content-Type"); ct != wct {
  1254. t.Errorf("Content-Type=%q, want %q", ct, wct)
  1255. }
  1256. g := rw.Body.String()
  1257. if g != wbody {
  1258. t.Errorf("got body=%#v, want %#v", g, wbody)
  1259. }
  1260. // Now send the first event
  1261. select {
  1262. case wa.echan <- &store.Event{
  1263. Action: store.Get,
  1264. Node: &store.NodeExtern{},
  1265. }:
  1266. case <-time.After(time.Second):
  1267. t.Fatal("timed out waiting for send")
  1268. }
  1269. // Wait for it to be flushed...
  1270. select {
  1271. case <-rw.ch:
  1272. case <-time.After(time.Second):
  1273. t.Fatalf("timed out waiting for flush")
  1274. }
  1275. // And check the body is as expected
  1276. wbody = mustMarshalEvent(
  1277. t,
  1278. &store.Event{
  1279. Action: store.Get,
  1280. Node: &store.NodeExtern{},
  1281. },
  1282. )
  1283. g = rw.Body.String()
  1284. if g != wbody {
  1285. t.Errorf("got body=%#v, want %#v", g, wbody)
  1286. }
  1287. // Rinse and repeat
  1288. select {
  1289. case wa.echan <- &store.Event{
  1290. Action: store.Get,
  1291. Node: &store.NodeExtern{},
  1292. }:
  1293. case <-time.After(time.Second):
  1294. t.Fatal("timed out waiting for send")
  1295. }
  1296. select {
  1297. case <-rw.ch:
  1298. case <-time.After(time.Second):
  1299. t.Fatalf("timed out waiting for flush")
  1300. }
  1301. // This time, we expect to see both events
  1302. wbody = wbody + wbody
  1303. g = rw.Body.String()
  1304. if g != wbody {
  1305. t.Errorf("got body=%#v, want %#v", g, wbody)
  1306. }
  1307. // Finally, time out the connection and ensure the serving goroutine returns
  1308. cancel()
  1309. select {
  1310. case <-done:
  1311. case <-time.After(time.Second):
  1312. t.Fatalf("timed out waiting for done")
  1313. }
  1314. }
  1315. func TestTrimEventPrefix(t *testing.T) {
  1316. pre := "/abc"
  1317. tests := []struct {
  1318. ev *store.Event
  1319. wev *store.Event
  1320. }{
  1321. {
  1322. nil,
  1323. nil,
  1324. },
  1325. {
  1326. &store.Event{},
  1327. &store.Event{},
  1328. },
  1329. {
  1330. &store.Event{Node: &store.NodeExtern{Key: "/abc/def"}},
  1331. &store.Event{Node: &store.NodeExtern{Key: "/def"}},
  1332. },
  1333. {
  1334. &store.Event{PrevNode: &store.NodeExtern{Key: "/abc/ghi"}},
  1335. &store.Event{PrevNode: &store.NodeExtern{Key: "/ghi"}},
  1336. },
  1337. {
  1338. &store.Event{
  1339. Node: &store.NodeExtern{Key: "/abc/def"},
  1340. PrevNode: &store.NodeExtern{Key: "/abc/ghi"},
  1341. },
  1342. &store.Event{
  1343. Node: &store.NodeExtern{Key: "/def"},
  1344. PrevNode: &store.NodeExtern{Key: "/ghi"},
  1345. },
  1346. },
  1347. }
  1348. for i, tt := range tests {
  1349. ev := trimEventPrefix(tt.ev, pre)
  1350. if !reflect.DeepEqual(ev, tt.wev) {
  1351. t.Errorf("#%d: event = %+v, want %+v", i, ev, tt.wev)
  1352. }
  1353. }
  1354. }
  1355. func TestTrimNodeExternPrefix(t *testing.T) {
  1356. pre := "/abc"
  1357. tests := []struct {
  1358. n *store.NodeExtern
  1359. wn *store.NodeExtern
  1360. }{
  1361. {
  1362. nil,
  1363. nil,
  1364. },
  1365. {
  1366. &store.NodeExtern{Key: "/abc/def"},
  1367. &store.NodeExtern{Key: "/def"},
  1368. },
  1369. {
  1370. &store.NodeExtern{
  1371. Key: "/abc/def",
  1372. Nodes: []*store.NodeExtern{
  1373. {Key: "/abc/def/1"},
  1374. {Key: "/abc/def/2"},
  1375. },
  1376. },
  1377. &store.NodeExtern{
  1378. Key: "/def",
  1379. Nodes: []*store.NodeExtern{
  1380. {Key: "/def/1"},
  1381. {Key: "/def/2"},
  1382. },
  1383. },
  1384. },
  1385. }
  1386. for i, tt := range tests {
  1387. n := trimNodeExternPrefix(tt.n, pre)
  1388. if !reflect.DeepEqual(n, tt.wn) {
  1389. t.Errorf("#%d: node = %+v, want %+v", i, n, tt.wn)
  1390. }
  1391. }
  1392. }