client_test.go 41 KB


  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdhttp
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "net/http"
  21. "net/http/httptest"
  22. "net/url"
  23. "path"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. "time"
  28. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  29. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  30. etcdErr "github.com/coreos/etcd/error"
  31. "github.com/coreos/etcd/etcdserver"
  32. "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
  33. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  34. "github.com/coreos/etcd/pkg/types"
  35. "github.com/coreos/etcd/raft/raftpb"
  36. "github.com/coreos/etcd/store"
  37. "github.com/coreos/etcd/version"
  38. )
  39. func mustMarshalEvent(t *testing.T, ev *store.Event) string {
  40. b := new(bytes.Buffer)
  41. if err := json.NewEncoder(b).Encode(ev); err != nil {
  42. t.Fatalf("error marshalling event %#v: %v", ev, err)
  43. }
  44. return b.String()
  45. }
  46. // mustNewForm takes a set of Values and constructs a PUT *http.Request,
  47. // with a URL constructed from appending the given path to the standard keysPrefix
  48. func mustNewForm(t *testing.T, p string, vals url.Values) *http.Request {
  49. u := mustNewURL(t, path.Join(keysPrefix, p))
  50. req, err := http.NewRequest("PUT", u.String(), strings.NewReader(vals.Encode()))
  51. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  52. if err != nil {
  53. t.Fatalf("error creating new request: %v", err)
  54. }
  55. return req
  56. }
  57. // mustNewPostForm takes a set of Values and constructs a POST *http.Request,
  58. // with a URL constructed from appending the given path to the standard keysPrefix
  59. func mustNewPostForm(t *testing.T, p string, vals url.Values) *http.Request {
  60. u := mustNewURL(t, path.Join(keysPrefix, p))
  61. req, err := http.NewRequest("POST", u.String(), strings.NewReader(vals.Encode()))
  62. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  63. if err != nil {
  64. t.Fatalf("error creating new request: %v", err)
  65. }
  66. return req
  67. }
  68. // mustNewRequest takes a path, appends it to the standard keysPrefix, and constructs
  69. // a GET *http.Request referencing the resulting URL
  70. func mustNewRequest(t *testing.T, p string) *http.Request {
  71. return mustNewMethodRequest(t, "GET", p)
  72. }
  73. func mustNewMethodRequest(t *testing.T, m, p string) *http.Request {
  74. return &http.Request{
  75. Method: m,
  76. URL: mustNewURL(t, path.Join(keysPrefix, p)),
  77. }
  78. }
  79. type serverRecorder struct {
  80. actions []action
  81. }
  82. func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
  83. s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}})
  84. return etcdserver.Response{}, nil
  85. }
  86. func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error {
  87. s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}})
  88. return nil
  89. }
  90. func (s *serverRecorder) Start() {}
  91. func (s *serverRecorder) Stop() {}
  92. func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error {
  93. s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}})
  94. return nil
  95. }
  96. func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error {
  97. s.actions = append(s.actions, action{name: "RemoveMember", params: []interface{}{id}})
  98. return nil
  99. }
  100. type action struct {
  101. name string
  102. params []interface{}
  103. }
  // flushingRecorder wraps an httptest.ResponseRecorder and exposes a channel
  // that receives a value each time Flush is called, so callers can block until
  // the recorder has been flushed.
  type flushingRecorder struct {
  	*httptest.ResponseRecorder
  	ch chan struct{}
  }

  // Flush flushes the embedded recorder and then sends a signal on ch. The send
  // blocks until it is received unless ch has spare buffer capacity.
  func (fr *flushingRecorder) Flush() {
  	rec := fr.ResponseRecorder
  	rec.Flush()
  	var signal struct{}
  	fr.ch <- signal
  }
  113. // resServer implements the etcd.Server interface for testing.
  114. // It returns the given responsefrom any Do calls, and nil error
  115. type resServer struct {
  116. res etcdserver.Response
  117. }
  118. func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
  119. return rs.res, nil
  120. }
  121. func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
  122. func (rs *resServer) Start() {}
  123. func (rs *resServer) Stop() {}
  124. func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil }
  125. func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil }
  // boolp returns a pointer to a fresh bool holding the value b.
  func boolp(b bool) *bool {
  	p := new(bool)
  	*p = b
  	return p
  }
  // dummyRaftTimer is a stub that reports a fixed raft index and term.
  type dummyRaftTimer struct{}

  // Index always returns 100.
  func (drt dummyRaftTimer) Index() uint64 {
  	const index uint64 = 100
  	return index
  }

  // Term always returns 5.
  func (drt dummyRaftTimer) Term() uint64 {
  	const term uint64 = 5
  	return term
  }
  130. type dummyWatcher struct {
  131. echan chan *store.Event
  132. sidx uint64
  133. }
  134. func (w *dummyWatcher) EventChan() chan *store.Event {
  135. return w.echan
  136. }
  137. func (w *dummyWatcher) StartIndex() uint64 { return w.sidx }
  138. func (w *dummyWatcher) Remove() {}
  139. func TestBadParseRequest(t *testing.T) {
  140. tests := []struct {
  141. in *http.Request
  142. wcode int
  143. }{
  144. {
  145. // parseForm failure
  146. &http.Request{
  147. Body: nil,
  148. Method: "PUT",
  149. },
  150. etcdErr.EcodeInvalidForm,
  151. },
  152. {
  153. // bad key prefix
  154. &http.Request{
  155. URL: mustNewURL(t, "/badprefix/"),
  156. },
  157. etcdErr.EcodeInvalidForm,
  158. },
  159. // bad values for prevIndex, waitIndex, ttl
  160. {
  161. mustNewForm(t, "foo", url.Values{"prevIndex": []string{"garbage"}}),
  162. etcdErr.EcodeIndexNaN,
  163. },
  164. {
  165. mustNewForm(t, "foo", url.Values{"prevIndex": []string{"1.5"}}),
  166. etcdErr.EcodeIndexNaN,
  167. },
  168. {
  169. mustNewForm(t, "foo", url.Values{"prevIndex": []string{"-1"}}),
  170. etcdErr.EcodeIndexNaN,
  171. },
  172. {
  173. mustNewForm(t, "foo", url.Values{"waitIndex": []string{"garbage"}}),
  174. etcdErr.EcodeIndexNaN,
  175. },
  176. {
  177. mustNewForm(t, "foo", url.Values{"waitIndex": []string{"??"}}),
  178. etcdErr.EcodeIndexNaN,
  179. },
  180. {
  181. mustNewForm(t, "foo", url.Values{"ttl": []string{"-1"}}),
  182. etcdErr.EcodeTTLNaN,
  183. },
  184. // bad values for recursive, sorted, wait, prevExist, dir, stream
  185. {
  186. mustNewForm(t, "foo", url.Values{"recursive": []string{"hahaha"}}),
  187. etcdErr.EcodeInvalidField,
  188. },
  189. {
  190. mustNewForm(t, "foo", url.Values{"recursive": []string{"1234"}}),
  191. etcdErr.EcodeInvalidField,
  192. },
  193. {
  194. mustNewForm(t, "foo", url.Values{"recursive": []string{"?"}}),
  195. etcdErr.EcodeInvalidField,
  196. },
  197. {
  198. mustNewForm(t, "foo", url.Values{"sorted": []string{"?"}}),
  199. etcdErr.EcodeInvalidField,
  200. },
  201. {
  202. mustNewForm(t, "foo", url.Values{"sorted": []string{"x"}}),
  203. etcdErr.EcodeInvalidField,
  204. },
  205. {
  206. mustNewForm(t, "foo", url.Values{"wait": []string{"?!"}}),
  207. etcdErr.EcodeInvalidField,
  208. },
  209. {
  210. mustNewForm(t, "foo", url.Values{"wait": []string{"yes"}}),
  211. etcdErr.EcodeInvalidField,
  212. },
  213. {
  214. mustNewForm(t, "foo", url.Values{"prevExist": []string{"yes"}}),
  215. etcdErr.EcodeInvalidField,
  216. },
  217. {
  218. mustNewForm(t, "foo", url.Values{"prevExist": []string{"#2"}}),
  219. etcdErr.EcodeInvalidField,
  220. },
  221. {
  222. mustNewForm(t, "foo", url.Values{"dir": []string{"no"}}),
  223. etcdErr.EcodeInvalidField,
  224. },
  225. {
  226. mustNewForm(t, "foo", url.Values{"dir": []string{"file"}}),
  227. etcdErr.EcodeInvalidField,
  228. },
  229. {
  230. mustNewForm(t, "foo", url.Values{"quorum": []string{"no"}}),
  231. etcdErr.EcodeInvalidField,
  232. },
  233. {
  234. mustNewForm(t, "foo", url.Values{"quorum": []string{"file"}}),
  235. etcdErr.EcodeInvalidField,
  236. },
  237. {
  238. mustNewForm(t, "foo", url.Values{"stream": []string{"zzz"}}),
  239. etcdErr.EcodeInvalidField,
  240. },
  241. {
  242. mustNewForm(t, "foo", url.Values{"stream": []string{"something"}}),
  243. etcdErr.EcodeInvalidField,
  244. },
  245. // prevValue cannot be empty
  246. {
  247. mustNewForm(t, "foo", url.Values{"prevValue": []string{""}}),
  248. etcdErr.EcodePrevValueRequired,
  249. },
  250. // wait is only valid with GET requests
  251. {
  252. mustNewMethodRequest(t, "HEAD", "foo?wait=true"),
  253. etcdErr.EcodeInvalidField,
  254. },
  255. // query values are considered
  256. {
  257. mustNewRequest(t, "foo?prevExist=wrong"),
  258. etcdErr.EcodeInvalidField,
  259. },
  260. {
  261. mustNewRequest(t, "foo?ttl=wrong"),
  262. etcdErr.EcodeTTLNaN,
  263. },
  264. // but body takes precedence if both are specified
  265. {
  266. mustNewForm(
  267. t,
  268. "foo?ttl=12",
  269. url.Values{"ttl": []string{"garbage"}},
  270. ),
  271. etcdErr.EcodeTTLNaN,
  272. },
  273. {
  274. mustNewForm(
  275. t,
  276. "foo?prevExist=false",
  277. url.Values{"prevExist": []string{"yes"}},
  278. ),
  279. etcdErr.EcodeInvalidField,
  280. },
  281. }
  282. for i, tt := range tests {
  283. got, err := parseKeyRequest(tt.in, 1234, clockwork.NewFakeClock())
  284. if err == nil {
  285. t.Errorf("#%d: unexpected nil error!", i)
  286. continue
  287. }
  288. ee, ok := err.(*etcdErr.Error)
  289. if !ok {
  290. t.Errorf("#%d: err is not etcd.Error!", i)
  291. continue
  292. }
  293. if ee.ErrorCode != tt.wcode {
  294. t.Errorf("#%d: code=%d, want %v", i, ee.ErrorCode, tt.wcode)
  295. t.Logf("cause: %#v", ee.Cause)
  296. }
  297. if !reflect.DeepEqual(got, etcdserverpb.Request{}) {
  298. t.Errorf("#%d: unexpected non-empty Request: %#v", i, got)
  299. }
  300. }
  301. }
  302. func TestGoodParseRequest(t *testing.T) {
  303. fc := clockwork.NewFakeClock()
  304. fc.Advance(1111)
  305. tests := []struct {
  306. in *http.Request
  307. w etcdserverpb.Request
  308. }{
  309. {
  310. // good prefix, all other values default
  311. mustNewRequest(t, "foo"),
  312. etcdserverpb.Request{
  313. ID: 1234,
  314. Method: "GET",
  315. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  316. },
  317. },
  318. {
  319. // value specified
  320. mustNewForm(
  321. t,
  322. "foo",
  323. url.Values{"value": []string{"some_value"}},
  324. ),
  325. etcdserverpb.Request{
  326. ID: 1234,
  327. Method: "PUT",
  328. Val: "some_value",
  329. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  330. },
  331. },
  332. {
  333. // prevIndex specified
  334. mustNewForm(
  335. t,
  336. "foo",
  337. url.Values{"prevIndex": []string{"98765"}},
  338. ),
  339. etcdserverpb.Request{
  340. ID: 1234,
  341. Method: "PUT",
  342. PrevIndex: 98765,
  343. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  344. },
  345. },
  346. {
  347. // recursive specified
  348. mustNewForm(
  349. t,
  350. "foo",
  351. url.Values{"recursive": []string{"true"}},
  352. ),
  353. etcdserverpb.Request{
  354. ID: 1234,
  355. Method: "PUT",
  356. Recursive: true,
  357. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  358. },
  359. },
  360. {
  361. // sorted specified
  362. mustNewForm(
  363. t,
  364. "foo",
  365. url.Values{"sorted": []string{"true"}},
  366. ),
  367. etcdserverpb.Request{
  368. ID: 1234,
  369. Method: "PUT",
  370. Sorted: true,
  371. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  372. },
  373. },
  374. {
  375. // quorum specified
  376. mustNewForm(
  377. t,
  378. "foo",
  379. url.Values{"quorum": []string{"true"}},
  380. ),
  381. etcdserverpb.Request{
  382. ID: 1234,
  383. Method: "PUT",
  384. Quorum: true,
  385. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  386. },
  387. },
  388. {
  389. // wait specified
  390. mustNewRequest(t, "foo?wait=true"),
  391. etcdserverpb.Request{
  392. ID: 1234,
  393. Method: "GET",
  394. Wait: true,
  395. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  396. },
  397. },
  398. {
  399. // empty TTL specified
  400. mustNewRequest(t, "foo?ttl="),
  401. etcdserverpb.Request{
  402. ID: 1234,
  403. Method: "GET",
  404. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  405. Expiration: 0,
  406. },
  407. },
  408. {
  409. // non-empty TTL specified
  410. mustNewRequest(t, "foo?ttl=5678"),
  411. etcdserverpb.Request{
  412. ID: 1234,
  413. Method: "GET",
  414. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  415. Expiration: fc.Now().Add(5678 * time.Second).UnixNano(),
  416. },
  417. },
  418. {
  419. // zero TTL specified
  420. mustNewRequest(t, "foo?ttl=0"),
  421. etcdserverpb.Request{
  422. ID: 1234,
  423. Method: "GET",
  424. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  425. Expiration: fc.Now().UnixNano(),
  426. },
  427. },
  428. {
  429. // dir specified
  430. mustNewRequest(t, "foo?dir=true"),
  431. etcdserverpb.Request{
  432. ID: 1234,
  433. Method: "GET",
  434. Dir: true,
  435. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  436. },
  437. },
  438. {
  439. // dir specified negatively
  440. mustNewRequest(t, "foo?dir=false"),
  441. etcdserverpb.Request{
  442. ID: 1234,
  443. Method: "GET",
  444. Dir: false,
  445. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  446. },
  447. },
  448. {
  449. // prevExist should be non-null if specified
  450. mustNewForm(
  451. t,
  452. "foo",
  453. url.Values{"prevExist": []string{"true"}},
  454. ),
  455. etcdserverpb.Request{
  456. ID: 1234,
  457. Method: "PUT",
  458. PrevExist: boolp(true),
  459. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  460. },
  461. },
  462. {
  463. // prevExist should be non-null if specified
  464. mustNewForm(
  465. t,
  466. "foo",
  467. url.Values{"prevExist": []string{"false"}},
  468. ),
  469. etcdserverpb.Request{
  470. ID: 1234,
  471. Method: "PUT",
  472. PrevExist: boolp(false),
  473. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  474. },
  475. },
  476. // mix various fields
  477. {
  478. mustNewForm(
  479. t,
  480. "foo",
  481. url.Values{
  482. "value": []string{"some value"},
  483. "prevExist": []string{"true"},
  484. "prevValue": []string{"previous value"},
  485. },
  486. ),
  487. etcdserverpb.Request{
  488. ID: 1234,
  489. Method: "PUT",
  490. PrevExist: boolp(true),
  491. PrevValue: "previous value",
  492. Val: "some value",
  493. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  494. },
  495. },
  496. // query parameters should be used if given
  497. {
  498. mustNewForm(
  499. t,
  500. "foo?prevValue=woof",
  501. url.Values{},
  502. ),
  503. etcdserverpb.Request{
  504. ID: 1234,
  505. Method: "PUT",
  506. PrevValue: "woof",
  507. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  508. },
  509. },
  510. // but form values should take precedence over query parameters
  511. {
  512. mustNewForm(
  513. t,
  514. "foo?prevValue=woof",
  515. url.Values{
  516. "prevValue": []string{"miaow"},
  517. },
  518. ),
  519. etcdserverpb.Request{
  520. ID: 1234,
  521. Method: "PUT",
  522. PrevValue: "miaow",
  523. Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
  524. },
  525. },
  526. }
  527. for i, tt := range tests {
  528. got, err := parseKeyRequest(tt.in, 1234, fc)
  529. if err != nil {
  530. t.Errorf("#%d: err = %v, want %v", i, err, nil)
  531. }
  532. if !reflect.DeepEqual(got, tt.w) {
  533. t.Errorf("#%d: request=%#v, want %#v", i, got, tt.w)
  534. }
  535. }
  536. }
  537. func TestServeMembers(t *testing.T) {
  538. memb1 := etcdserver.Member{ID: 12, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
  539. memb2 := etcdserver.Member{ID: 13, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
  540. cluster := &fakeCluster{
  541. id: 1,
  542. members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
  543. }
  544. h := &membersHandler{
  545. server: &serverRecorder{},
  546. clock: clockwork.NewFakeClock(),
  547. clusterInfo: cluster,
  548. }
  549. wmc := string(`{"members":[{"id":"c","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]},{"id":"d","name":"","peerURLs":[],"clientURLs":["http://localhost:8081"]}]}`)
  550. tests := []struct {
  551. path string
  552. wcode int
  553. wct string
  554. wbody string
  555. }{
  556. {membersPrefix, http.StatusOK, "application/json", wmc + "\n"},
  557. {membersPrefix + "/", http.StatusOK, "application/json", wmc + "\n"},
  558. {path.Join(membersPrefix, "100"), http.StatusNotFound, "application/json", `{"message":"Not found"}`},
  559. {path.Join(membersPrefix, "foobar"), http.StatusNotFound, "application/json", `{"message":"Not found"}`},
  560. }
  561. for i, tt := range tests {
  562. req, err := http.NewRequest("GET", mustNewURL(t, tt.path).String(), nil)
  563. if err != nil {
  564. t.Fatal(err)
  565. }
  566. rw := httptest.NewRecorder()
  567. h.ServeHTTP(rw, req)
  568. if rw.Code != tt.wcode {
  569. t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
  570. }
  571. if gct := rw.Header().Get("Content-Type"); gct != tt.wct {
  572. t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct)
  573. }
  574. gcid := rw.Header().Get("X-Etcd-Cluster-ID")
  575. wcid := cluster.ID().String()
  576. if gcid != wcid {
  577. t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
  578. }
  579. if rw.Body.String() != tt.wbody {
  580. t.Errorf("#%d: body = %q, want %q", i, rw.Body.String(), tt.wbody)
  581. }
  582. }
  583. }
  584. func TestServeMembersCreate(t *testing.T) {
  585. u := mustNewURL(t, membersPrefix)
  586. b := []byte(`{"peerURLs":["http://127.0.0.1:1"]}`)
  587. req, err := http.NewRequest("POST", u.String(), bytes.NewReader(b))
  588. if err != nil {
  589. t.Fatal(err)
  590. }
  591. req.Header.Set("Content-Type", "application/json")
  592. s := &serverRecorder{}
  593. h := &membersHandler{
  594. server: s,
  595. clock: clockwork.NewFakeClock(),
  596. clusterInfo: &fakeCluster{id: 1},
  597. }
  598. rw := httptest.NewRecorder()
  599. h.ServeHTTP(rw, req)
  600. wcode := http.StatusCreated
  601. if rw.Code != wcode {
  602. t.Errorf("code=%d, want %d", rw.Code, wcode)
  603. }
  604. wct := "application/json"
  605. if gct := rw.Header().Get("Content-Type"); gct != wct {
  606. t.Errorf("content-type = %s, want %s", gct, wct)
  607. }
  608. gcid := rw.Header().Get("X-Etcd-Cluster-ID")
  609. wcid := h.clusterInfo.ID().String()
  610. if gcid != wcid {
  611. t.Errorf("cid = %s, want %s", gcid, wcid)
  612. }
  613. wb := `{"id":"2a86a83729b330d5","name":"","peerURLs":["http://127.0.0.1:1"],"clientURLs":[]}` + "\n"
  614. g := rw.Body.String()
  615. if g != wb {
  616. t.Errorf("got body=%q, want %q", g, wb)
  617. }
  618. wm := etcdserver.Member{
  619. ID: 3064321551348478165,
  620. RaftAttributes: etcdserver.RaftAttributes{
  621. PeerURLs: []string{"http://127.0.0.1:1"},
  622. },
  623. }
  624. wactions := []action{{name: "AddMember", params: []interface{}{wm}}}
  625. if !reflect.DeepEqual(s.actions, wactions) {
  626. t.Errorf("actions = %+v, want %+v", s.actions, wactions)
  627. }
  628. }
  629. func TestServeMembersDelete(t *testing.T) {
  630. req := &http.Request{
  631. Method: "DELETE",
  632. URL: mustNewURL(t, path.Join(membersPrefix, "BEEF")),
  633. }
  634. s := &serverRecorder{}
  635. h := &membersHandler{
  636. server: s,
  637. clusterInfo: &fakeCluster{id: 1},
  638. }
  639. rw := httptest.NewRecorder()
  640. h.ServeHTTP(rw, req)
  641. wcode := http.StatusNoContent
  642. if rw.Code != wcode {
  643. t.Errorf("code=%d, want %d", rw.Code, wcode)
  644. }
  645. gcid := rw.Header().Get("X-Etcd-Cluster-ID")
  646. wcid := h.clusterInfo.ID().String()
  647. if gcid != wcid {
  648. t.Errorf("cid = %s, want %s", gcid, wcid)
  649. }
  650. g := rw.Body.String()
  651. if g != "" {
  652. t.Errorf("got body=%q, want %q", g, "")
  653. }
  654. wactions := []action{{name: "RemoveMember", params: []interface{}{uint64(0xBEEF)}}}
  655. if !reflect.DeepEqual(s.actions, wactions) {
  656. t.Errorf("actions = %+v, want %+v", s.actions, wactions)
  657. }
  658. }
  659. func TestServeMembersFail(t *testing.T) {
  660. tests := []struct {
  661. req *http.Request
  662. server etcdserver.Server
  663. wcode int
  664. }{
  665. {
  666. // bad method
  667. &http.Request{
  668. Method: "CONNECT",
  669. },
  670. &resServer{},
  671. http.StatusMethodNotAllowed,
  672. },
  673. {
  674. // bad method
  675. &http.Request{
  676. Method: "TRACE",
  677. },
  678. &resServer{},
  679. http.StatusMethodNotAllowed,
  680. },
  681. {
  682. // parse body error
  683. &http.Request{
  684. URL: mustNewURL(t, membersPrefix),
  685. Method: "POST",
  686. Body: ioutil.NopCloser(strings.NewReader("bad json")),
  687. Header: map[string][]string{"Content-Type": []string{"application/json"}},
  688. },
  689. &resServer{},
  690. http.StatusBadRequest,
  691. },
  692. {
  693. // bad content type
  694. &http.Request{
  695. URL: mustNewURL(t, membersPrefix),
  696. Method: "POST",
  697. Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
  698. Header: map[string][]string{"Content-Type": []string{"application/bad"}},
  699. },
  700. &errServer{},
  701. http.StatusUnsupportedMediaType,
  702. },
  703. {
  704. // bad url
  705. &http.Request{
  706. URL: mustNewURL(t, membersPrefix),
  707. Method: "POST",
  708. Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)),
  709. Header: map[string][]string{"Content-Type": []string{"application/json"}},
  710. },
  711. &errServer{},
  712. http.StatusBadRequest,
  713. },
  714. {
  715. // etcdserver.AddMember error
  716. &http.Request{
  717. URL: mustNewURL(t, membersPrefix),
  718. Method: "POST",
  719. Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
  720. Header: map[string][]string{"Content-Type": []string{"application/json"}},
  721. },
  722. &errServer{
  723. errors.New("blah"),
  724. },
  725. http.StatusInternalServerError,
  726. },
  727. {
  728. // etcdserver.AddMember error
  729. &http.Request{
  730. URL: mustNewURL(t, membersPrefix),
  731. Method: "POST",
  732. Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
  733. Header: map[string][]string{"Content-Type": []string{"application/json"}},
  734. },
  735. &errServer{
  736. etcdserver.ErrIDExists,
  737. },
  738. http.StatusConflict,
  739. },
  740. {
  741. // etcdserver.AddMember error
  742. &http.Request{
  743. URL: mustNewURL(t, membersPrefix),
  744. Method: "POST",
  745. Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)),
  746. Header: map[string][]string{"Content-Type": []string{"application/json"}},
  747. },
  748. &errServer{
  749. etcdserver.ErrPeerURLexists,
  750. },
  751. http.StatusConflict,
  752. },
  753. {
  754. // etcdserver.RemoveMember error with arbitrary server error
  755. &http.Request{
  756. URL: mustNewURL(t, path.Join(membersPrefix, "1")),
  757. Method: "DELETE",
  758. },
  759. &errServer{
  760. errors.New("blah"),
  761. },
  762. http.StatusInternalServerError,
  763. },
  764. {
  765. // etcdserver.RemoveMember error with previously removed ID
  766. &http.Request{
  767. URL: mustNewURL(t, path.Join(membersPrefix, "0")),
  768. Method: "DELETE",
  769. },
  770. &errServer{
  771. etcdserver.ErrIDRemoved,
  772. },
  773. http.StatusGone,
  774. },
  775. {
  776. // etcdserver.RemoveMember error with nonexistent ID
  777. &http.Request{
  778. URL: mustNewURL(t, path.Join(membersPrefix, "0")),
  779. Method: "DELETE",
  780. },
  781. &errServer{
  782. etcdserver.ErrIDNotFound,
  783. },
  784. http.StatusNotFound,
  785. },
  786. {
  787. // etcdserver.RemoveMember error with badly formed ID
  788. &http.Request{
  789. URL: mustNewURL(t, path.Join(membersPrefix, "bad_id")),
  790. Method: "DELETE",
  791. },
  792. nil,
  793. http.StatusNotFound,
  794. },
  795. {
  796. // etcdserver.RemoveMember with no ID
  797. &http.Request{
  798. URL: mustNewURL(t, membersPrefix),
  799. Method: "DELETE",
  800. },
  801. nil,
  802. http.StatusMethodNotAllowed,
  803. },
  804. }
  805. for i, tt := range tests {
  806. h := &membersHandler{
  807. server: tt.server,
  808. clusterInfo: &fakeCluster{id: 1},
  809. clock: clockwork.NewFakeClock(),
  810. }
  811. rw := httptest.NewRecorder()
  812. h.ServeHTTP(rw, tt.req)
  813. if rw.Code != tt.wcode {
  814. t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
  815. }
  816. if rw.Code != http.StatusMethodNotAllowed {
  817. gcid := rw.Header().Get("X-Etcd-Cluster-ID")
  818. wcid := h.clusterInfo.ID().String()
  819. if gcid != wcid {
  820. t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
  821. }
  822. }
  823. }
  824. }
  825. func TestWriteEvent(t *testing.T) {
  826. // nil event should not panic
  827. rw := httptest.NewRecorder()
  828. writeKeyEvent(rw, nil, dummyRaftTimer{})
  829. h := rw.Header()
  830. if len(h) > 0 {
  831. t.Fatalf("unexpected non-empty headers: %#v", h)
  832. }
  833. b := rw.Body.String()
  834. if len(b) > 0 {
  835. t.Fatalf("unexpected non-empty body: %q", b)
  836. }
  837. tests := []struct {
  838. ev *store.Event
  839. idx string
  840. // TODO(jonboulle): check body as well as just status code
  841. code int
  842. err error
  843. }{
  844. // standard case, standard 200 response
  845. {
  846. &store.Event{
  847. Action: store.Get,
  848. Node: &store.NodeExtern{},
  849. PrevNode: &store.NodeExtern{},
  850. },
  851. "0",
  852. http.StatusOK,
  853. nil,
  854. },
  855. // check new nodes return StatusCreated
  856. {
  857. &store.Event{
  858. Action: store.Create,
  859. Node: &store.NodeExtern{},
  860. PrevNode: &store.NodeExtern{},
  861. },
  862. "0",
  863. http.StatusCreated,
  864. nil,
  865. },
  866. }
  867. for i, tt := range tests {
  868. rw := httptest.NewRecorder()
  869. writeKeyEvent(rw, tt.ev, dummyRaftTimer{})
  870. if gct := rw.Header().Get("Content-Type"); gct != "application/json" {
  871. t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct)
  872. }
  873. if gri := rw.Header().Get("X-Raft-Index"); gri != "100" {
  874. t.Errorf("case %d: bad X-Raft-Index header: got %s, want %s", i, gri, "100")
  875. }
  876. if grt := rw.Header().Get("X-Raft-Term"); grt != "5" {
  877. t.Errorf("case %d: bad X-Raft-Term header: got %s, want %s", i, grt, "5")
  878. }
  879. if gei := rw.Header().Get("X-Etcd-Index"); gei != tt.idx {
  880. t.Errorf("case %d: bad X-Etcd-Index header: got %s, want %s", i, gei, tt.idx)
  881. }
  882. if rw.Code != tt.code {
  883. t.Errorf("case %d: bad response code: got %d, want %v", i, rw.Code, tt.code)
  884. }
  885. }
  886. }
  887. func TestV2DeprecatedMachinesEndpoint(t *testing.T) {
  888. tests := []struct {
  889. method string
  890. wcode int
  891. }{
  892. {"GET", http.StatusOK},
  893. {"HEAD", http.StatusOK},
  894. {"POST", http.StatusMethodNotAllowed},
  895. }
  896. m := NewClientHandler(&etcdserver.EtcdServer{Cluster: &etcdserver.Cluster{}})
  897. s := httptest.NewServer(m)
  898. defer s.Close()
  899. for _, tt := range tests {
  900. req, err := http.NewRequest(tt.method, s.URL+deprecatedMachinesPrefix, nil)
  901. if err != nil {
  902. t.Fatal(err)
  903. }
  904. resp, err := http.DefaultClient.Do(req)
  905. if err != nil {
  906. t.Fatal(err)
  907. }
  908. if resp.StatusCode != tt.wcode {
  909. t.Errorf("StatusCode = %d, expected %d", resp.StatusCode, tt.wcode)
  910. }
  911. }
  912. }
  913. func TestServeMachines(t *testing.T) {
  914. cluster := &fakeCluster{
  915. clientURLs: []string{"http://localhost:8080", "http://localhost:8081", "http://localhost:8082"},
  916. }
  917. writer := httptest.NewRecorder()
  918. req, err := http.NewRequest("GET", "", nil)
  919. if err != nil {
  920. t.Fatal(err)
  921. }
  922. h := &deprecatedMachinesHandler{clusterInfo: cluster}
  923. h.ServeHTTP(writer, req)
  924. w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
  925. if g := writer.Body.String(); g != w {
  926. t.Errorf("body = %s, want %s", g, w)
  927. }
  928. if writer.Code != http.StatusOK {
  929. t.Errorf("code = %d, want %d", writer.Code, http.StatusOK)
  930. }
  931. }
  932. type dummyStats struct {
  933. data []byte
  934. }
  935. func (ds *dummyStats) SelfStats() []byte { return ds.data }
  936. func (ds *dummyStats) LeaderStats() []byte { return ds.data }
  937. func (ds *dummyStats) StoreStats() []byte { return ds.data }
  938. func (ds *dummyStats) UpdateRecvApp(_ types.ID, _ int64) {}
  939. func TestServeSelfStats(t *testing.T) {
  940. wb := []byte("some statistics")
  941. w := string(wb)
  942. sh := &statsHandler{
  943. stats: &dummyStats{data: wb},
  944. }
  945. rw := httptest.NewRecorder()
  946. sh.serveSelf(rw, &http.Request{Method: "GET"})
  947. if rw.Code != http.StatusOK {
  948. t.Errorf("code = %d, want %d", rw.Code, http.StatusOK)
  949. }
  950. wct := "application/json"
  951. if gct := rw.Header().Get("Content-Type"); gct != wct {
  952. t.Errorf("Content-Type = %q, want %q", gct, wct)
  953. }
  954. if g := rw.Body.String(); g != w {
  955. t.Errorf("body = %s, want %s", g, w)
  956. }
  957. }
  958. func TestSelfServeStatsBad(t *testing.T) {
  959. for _, m := range []string{"PUT", "POST", "DELETE"} {
  960. sh := &statsHandler{}
  961. rw := httptest.NewRecorder()
  962. sh.serveSelf(
  963. rw,
  964. &http.Request{
  965. Method: m,
  966. },
  967. )
  968. if rw.Code != http.StatusMethodNotAllowed {
  969. t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed)
  970. }
  971. }
  972. }
  973. func TestLeaderServeStatsBad(t *testing.T) {
  974. for _, m := range []string{"PUT", "POST", "DELETE"} {
  975. sh := &statsHandler{}
  976. rw := httptest.NewRecorder()
  977. sh.serveLeader(
  978. rw,
  979. &http.Request{
  980. Method: m,
  981. },
  982. )
  983. if rw.Code != http.StatusMethodNotAllowed {
  984. t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed)
  985. }
  986. }
  987. }
  988. func TestServeLeaderStats(t *testing.T) {
  989. wb := []byte("some statistics")
  990. w := string(wb)
  991. sh := &statsHandler{
  992. stats: &dummyStats{data: wb},
  993. }
  994. rw := httptest.NewRecorder()
  995. sh.serveLeader(rw, &http.Request{Method: "GET"})
  996. if rw.Code != http.StatusOK {
  997. t.Errorf("code = %d, want %d", rw.Code, http.StatusOK)
  998. }
  999. wct := "application/json"
  1000. if gct := rw.Header().Get("Content-Type"); gct != wct {
  1001. t.Errorf("Content-Type = %q, want %q", gct, wct)
  1002. }
  1003. if g := rw.Body.String(); g != w {
  1004. t.Errorf("body = %s, want %s", g, w)
  1005. }
  1006. }
  1007. func TestServeStoreStats(t *testing.T) {
  1008. wb := []byte("some statistics")
  1009. w := string(wb)
  1010. sh := &statsHandler{
  1011. stats: &dummyStats{data: wb},
  1012. }
  1013. rw := httptest.NewRecorder()
  1014. sh.serveStore(rw, &http.Request{Method: "GET"})
  1015. if rw.Code != http.StatusOK {
  1016. t.Errorf("code = %d, want %d", rw.Code, http.StatusOK)
  1017. }
  1018. wct := "application/json"
  1019. if gct := rw.Header().Get("Content-Type"); gct != wct {
  1020. t.Errorf("Content-Type = %q, want %q", gct, wct)
  1021. }
  1022. if g := rw.Body.String(); g != w {
  1023. t.Errorf("body = %s, want %s", g, w)
  1024. }
  1025. }
  1026. func TestServeVersion(t *testing.T) {
  1027. req, err := http.NewRequest("GET", "", nil)
  1028. if err != nil {
  1029. t.Fatalf("error creating request: %v", err)
  1030. }
  1031. rw := httptest.NewRecorder()
  1032. serveVersion(rw, req)
  1033. if rw.Code != http.StatusOK {
  1034. t.Errorf("code=%d, want %d", rw.Code, http.StatusOK)
  1035. }
  1036. w := fmt.Sprintf("etcd %s", version.Version)
  1037. if g := rw.Body.String(); g != w {
  1038. t.Fatalf("body = %q, want %q", g, w)
  1039. }
  1040. }
  1041. func TestServeVersionFails(t *testing.T) {
  1042. for _, m := range []string{
  1043. "CONNECT", "TRACE", "PUT", "POST", "HEAD",
  1044. } {
  1045. req, err := http.NewRequest(m, "", nil)
  1046. if err != nil {
  1047. t.Fatalf("error creating request: %v", err)
  1048. }
  1049. rw := httptest.NewRecorder()
  1050. serveVersion(rw, req)
  1051. if rw.Code != http.StatusMethodNotAllowed {
  1052. t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed)
  1053. }
  1054. }
  1055. }
  1056. func TestBadServeKeys(t *testing.T) {
  1057. testBadCases := []struct {
  1058. req *http.Request
  1059. server etcdserver.Server
  1060. wcode int
  1061. wbody string
  1062. }{
  1063. {
  1064. // bad method
  1065. &http.Request{
  1066. Method: "CONNECT",
  1067. },
  1068. &resServer{},
  1069. http.StatusMethodNotAllowed,
  1070. "Method Not Allowed",
  1071. },
  1072. {
  1073. // bad method
  1074. &http.Request{
  1075. Method: "TRACE",
  1076. },
  1077. &resServer{},
  1078. http.StatusMethodNotAllowed,
  1079. "Method Not Allowed",
  1080. },
  1081. {
  1082. // parseRequest error
  1083. &http.Request{
  1084. Body: nil,
  1085. Method: "PUT",
  1086. },
  1087. &resServer{},
  1088. http.StatusBadRequest,
  1089. `{"errorCode":210,"message":"Invalid POST form","cause":"missing form body","index":0}`,
  1090. },
  1091. {
  1092. // etcdserver.Server error
  1093. mustNewRequest(t, "foo"),
  1094. &errServer{
  1095. errors.New("blah"),
  1096. },
  1097. http.StatusInternalServerError,
  1098. `{"message":"Internal Server Error"}`,
  1099. },
  1100. {
  1101. // etcdserver.Server etcd error
  1102. mustNewRequest(t, "foo"),
  1103. &errServer{
  1104. etcdErr.NewError(etcdErr.EcodeKeyNotFound, "/1/pant", 0),
  1105. },
  1106. http.StatusNotFound,
  1107. `{"errorCode":100,"message":"Key not found","cause":"/pant","index":0}`,
  1108. },
  1109. {
  1110. // non-event/watcher response from etcdserver.Server
  1111. mustNewRequest(t, "foo"),
  1112. &resServer{
  1113. etcdserver.Response{},
  1114. },
  1115. http.StatusInternalServerError,
  1116. `{"message":"Internal Server Error"}`,
  1117. },
  1118. }
  1119. for i, tt := range testBadCases {
  1120. h := &keysHandler{
  1121. timeout: 0, // context times out immediately
  1122. server: tt.server,
  1123. clusterInfo: &fakeCluster{id: 1},
  1124. }
  1125. rw := httptest.NewRecorder()
  1126. h.ServeHTTP(rw, tt.req)
  1127. if rw.Code != tt.wcode {
  1128. t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
  1129. }
  1130. if rw.Code != http.StatusMethodNotAllowed {
  1131. gcid := rw.Header().Get("X-Etcd-Cluster-ID")
  1132. wcid := h.clusterInfo.ID().String()
  1133. if gcid != wcid {
  1134. t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
  1135. }
  1136. }
  1137. if g := strings.TrimSuffix(rw.Body.String(), "\n"); g != tt.wbody {
  1138. t.Errorf("#%d: body = %s, want %s", i, g, tt.wbody)
  1139. }
  1140. }
  1141. }
  1142. func TestServeKeysGood(t *testing.T) {
  1143. tests := []struct {
  1144. req *http.Request
  1145. wcode int
  1146. }{
  1147. {
  1148. mustNewMethodRequest(t, "HEAD", "foo"),
  1149. http.StatusOK,
  1150. },
  1151. {
  1152. mustNewMethodRequest(t, "GET", "foo"),
  1153. http.StatusOK,
  1154. },
  1155. {
  1156. mustNewForm(t, "foo", url.Values{"value": []string{"bar"}}),
  1157. http.StatusOK,
  1158. },
  1159. {
  1160. mustNewMethodRequest(t, "DELETE", "foo"),
  1161. http.StatusOK,
  1162. },
  1163. {
  1164. mustNewPostForm(t, "foo", url.Values{"value": []string{"bar"}}),
  1165. http.StatusOK,
  1166. },
  1167. }
  1168. server := &resServer{
  1169. etcdserver.Response{
  1170. Event: &store.Event{
  1171. Action: store.Get,
  1172. Node: &store.NodeExtern{},
  1173. },
  1174. },
  1175. }
  1176. for i, tt := range tests {
  1177. h := &keysHandler{
  1178. timeout: time.Hour,
  1179. server: server,
  1180. timer: &dummyRaftTimer{},
  1181. clusterInfo: &fakeCluster{id: 1},
  1182. }
  1183. rw := httptest.NewRecorder()
  1184. h.ServeHTTP(rw, tt.req)
  1185. if rw.Code != tt.wcode {
  1186. t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
  1187. }
  1188. }
  1189. }
  1190. func TestServeKeysEvent(t *testing.T) {
  1191. req := mustNewRequest(t, "foo")
  1192. server := &resServer{
  1193. etcdserver.Response{
  1194. Event: &store.Event{
  1195. Action: store.Get,
  1196. Node: &store.NodeExtern{},
  1197. },
  1198. },
  1199. }
  1200. h := &keysHandler{
  1201. timeout: time.Hour,
  1202. server: server,
  1203. clusterInfo: &fakeCluster{id: 1},
  1204. timer: &dummyRaftTimer{},
  1205. }
  1206. rw := httptest.NewRecorder()
  1207. h.ServeHTTP(rw, req)
  1208. wcode := http.StatusOK
  1209. wbody := mustMarshalEvent(
  1210. t,
  1211. &store.Event{
  1212. Action: store.Get,
  1213. Node: &store.NodeExtern{},
  1214. },
  1215. )
  1216. if rw.Code != wcode {
  1217. t.Errorf("got code=%d, want %d", rw.Code, wcode)
  1218. }
  1219. gcid := rw.Header().Get("X-Etcd-Cluster-ID")
  1220. wcid := h.clusterInfo.ID().String()
  1221. if gcid != wcid {
  1222. t.Errorf("cid = %s, want %s", gcid, wcid)
  1223. }
  1224. g := rw.Body.String()
  1225. if g != wbody {
  1226. t.Errorf("got body=%#v, want %#v", g, wbody)
  1227. }
  1228. }
  1229. func TestServeKeysWatch(t *testing.T) {
  1230. req := mustNewRequest(t, "/foo/bar")
  1231. ec := make(chan *store.Event)
  1232. dw := &dummyWatcher{
  1233. echan: ec,
  1234. }
  1235. server := &resServer{
  1236. etcdserver.Response{
  1237. Watcher: dw,
  1238. },
  1239. }
  1240. h := &keysHandler{
  1241. timeout: time.Hour,
  1242. server: server,
  1243. clusterInfo: &fakeCluster{id: 1},
  1244. timer: &dummyRaftTimer{},
  1245. }
  1246. go func() {
  1247. ec <- &store.Event{
  1248. Action: store.Get,
  1249. Node: &store.NodeExtern{},
  1250. }
  1251. }()
  1252. rw := httptest.NewRecorder()
  1253. h.ServeHTTP(rw, req)
  1254. wcode := http.StatusOK
  1255. wbody := mustMarshalEvent(
  1256. t,
  1257. &store.Event{
  1258. Action: store.Get,
  1259. Node: &store.NodeExtern{},
  1260. },
  1261. )
  1262. if rw.Code != wcode {
  1263. t.Errorf("got code=%d, want %d", rw.Code, wcode)
  1264. }
  1265. gcid := rw.Header().Get("X-Etcd-Cluster-ID")
  1266. wcid := h.clusterInfo.ID().String()
  1267. if gcid != wcid {
  1268. t.Errorf("cid = %s, want %s", gcid, wcid)
  1269. }
  1270. g := rw.Body.String()
  1271. if g != wbody {
  1272. t.Errorf("got body=%#v, want %#v", g, wbody)
  1273. }
  1274. }
  1275. type recordingCloseNotifier struct {
  1276. *httptest.ResponseRecorder
  1277. cn chan bool
  1278. }
  1279. func (rcn *recordingCloseNotifier) CloseNotify() <-chan bool {
  1280. return rcn.cn
  1281. }
  1282. func TestHandleWatch(t *testing.T) {
  1283. defaultRwRr := func() (http.ResponseWriter, *httptest.ResponseRecorder) {
  1284. r := httptest.NewRecorder()
  1285. return r, r
  1286. }
  1287. noopEv := func(chan *store.Event) {}
  1288. tests := []struct {
  1289. getCtx func() context.Context
  1290. getRwRr func() (http.ResponseWriter, *httptest.ResponseRecorder)
  1291. doToChan func(chan *store.Event)
  1292. wbody string
  1293. }{
  1294. {
  1295. // Normal case: one event
  1296. context.Background,
  1297. defaultRwRr,
  1298. func(ch chan *store.Event) {
  1299. ch <- &store.Event{
  1300. Action: store.Get,
  1301. Node: &store.NodeExtern{},
  1302. }
  1303. },
  1304. mustMarshalEvent(
  1305. t,
  1306. &store.Event{
  1307. Action: store.Get,
  1308. Node: &store.NodeExtern{},
  1309. },
  1310. ),
  1311. },
  1312. {
  1313. // Channel is closed, no event
  1314. context.Background,
  1315. defaultRwRr,
  1316. func(ch chan *store.Event) {
  1317. close(ch)
  1318. },
  1319. "",
  1320. },
  1321. {
  1322. // Simulate a timed-out context
  1323. func() context.Context {
  1324. ctx, cancel := context.WithCancel(context.Background())
  1325. cancel()
  1326. return ctx
  1327. },
  1328. defaultRwRr,
  1329. noopEv,
  1330. "",
  1331. },
  1332. {
  1333. // Close-notifying request
  1334. context.Background,
  1335. func() (http.ResponseWriter, *httptest.ResponseRecorder) {
  1336. rw := &recordingCloseNotifier{
  1337. ResponseRecorder: httptest.NewRecorder(),
  1338. cn: make(chan bool, 1),
  1339. }
  1340. rw.cn <- true
  1341. return rw, rw.ResponseRecorder
  1342. },
  1343. noopEv,
  1344. "",
  1345. },
  1346. }
  1347. for i, tt := range tests {
  1348. rw, rr := tt.getRwRr()
  1349. wa := &dummyWatcher{
  1350. echan: make(chan *store.Event, 1),
  1351. sidx: 10,
  1352. }
  1353. tt.doToChan(wa.echan)
  1354. handleKeyWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{})
  1355. wcode := http.StatusOK
  1356. wct := "application/json"
  1357. wei := "10"
  1358. wri := "100"
  1359. wrt := "5"
  1360. if rr.Code != wcode {
  1361. t.Errorf("#%d: got code=%d, want %d", i, rr.Code, wcode)
  1362. }
  1363. h := rr.Header()
  1364. if ct := h.Get("Content-Type"); ct != wct {
  1365. t.Errorf("#%d: Content-Type=%q, want %q", i, ct, wct)
  1366. }
  1367. if ei := h.Get("X-Etcd-Index"); ei != wei {
  1368. t.Errorf("#%d: X-Etcd-Index=%q, want %q", i, ei, wei)
  1369. }
  1370. if ri := h.Get("X-Raft-Index"); ri != wri {
  1371. t.Errorf("#%d: X-Raft-Index=%q, want %q", i, ri, wri)
  1372. }
  1373. if rt := h.Get("X-Raft-Term"); rt != wrt {
  1374. t.Errorf("#%d: X-Raft-Term=%q, want %q", i, rt, wrt)
  1375. }
  1376. g := rr.Body.String()
  1377. if g != tt.wbody {
  1378. t.Errorf("#%d: got body=%#v, want %#v", i, g, tt.wbody)
  1379. }
  1380. }
  1381. }
  1382. func TestHandleWatchStreaming(t *testing.T) {
  1383. rw := &flushingRecorder{
  1384. httptest.NewRecorder(),
  1385. make(chan struct{}, 1),
  1386. }
  1387. wa := &dummyWatcher{
  1388. echan: make(chan *store.Event),
  1389. }
  1390. // Launch the streaming handler in the background with a cancellable context
  1391. ctx, cancel := context.WithCancel(context.Background())
  1392. done := make(chan struct{})
  1393. go func() {
  1394. handleKeyWatch(ctx, rw, wa, true, dummyRaftTimer{})
  1395. close(done)
  1396. }()
  1397. // Expect one Flush for the headers etc.
  1398. select {
  1399. case <-rw.ch:
  1400. case <-time.After(time.Second):
  1401. t.Fatalf("timed out waiting for flush")
  1402. }
  1403. // Expect headers but no body
  1404. wcode := http.StatusOK
  1405. wct := "application/json"
  1406. wbody := ""
  1407. if rw.Code != wcode {
  1408. t.Errorf("got code=%d, want %d", rw.Code, wcode)
  1409. }
  1410. h := rw.Header()
  1411. if ct := h.Get("Content-Type"); ct != wct {
  1412. t.Errorf("Content-Type=%q, want %q", ct, wct)
  1413. }
  1414. g := rw.Body.String()
  1415. if g != wbody {
  1416. t.Errorf("got body=%#v, want %#v", g, wbody)
  1417. }
  1418. // Now send the first event
  1419. select {
  1420. case wa.echan <- &store.Event{
  1421. Action: store.Get,
  1422. Node: &store.NodeExtern{},
  1423. }:
  1424. case <-time.After(time.Second):
  1425. t.Fatal("timed out waiting for send")
  1426. }
  1427. // Wait for it to be flushed...
  1428. select {
  1429. case <-rw.ch:
  1430. case <-time.After(time.Second):
  1431. t.Fatalf("timed out waiting for flush")
  1432. }
  1433. // And check the body is as expected
  1434. wbody = mustMarshalEvent(
  1435. t,
  1436. &store.Event{
  1437. Action: store.Get,
  1438. Node: &store.NodeExtern{},
  1439. },
  1440. )
  1441. g = rw.Body.String()
  1442. if g != wbody {
  1443. t.Errorf("got body=%#v, want %#v", g, wbody)
  1444. }
  1445. // Rinse and repeat
  1446. select {
  1447. case wa.echan <- &store.Event{
  1448. Action: store.Get,
  1449. Node: &store.NodeExtern{},
  1450. }:
  1451. case <-time.After(time.Second):
  1452. t.Fatal("timed out waiting for send")
  1453. }
  1454. select {
  1455. case <-rw.ch:
  1456. case <-time.After(time.Second):
  1457. t.Fatalf("timed out waiting for flush")
  1458. }
  1459. // This time, we expect to see both events
  1460. wbody = wbody + wbody
  1461. g = rw.Body.String()
  1462. if g != wbody {
  1463. t.Errorf("got body=%#v, want %#v", g, wbody)
  1464. }
  1465. // Finally, time out the connection and ensure the serving goroutine returns
  1466. cancel()
  1467. select {
  1468. case <-done:
  1469. case <-time.After(time.Second):
  1470. t.Fatalf("timed out waiting for done")
  1471. }
  1472. }
  1473. func TestTrimEventPrefix(t *testing.T) {
  1474. pre := "/abc"
  1475. tests := []struct {
  1476. ev *store.Event
  1477. wev *store.Event
  1478. }{
  1479. {
  1480. nil,
  1481. nil,
  1482. },
  1483. {
  1484. &store.Event{},
  1485. &store.Event{},
  1486. },
  1487. {
  1488. &store.Event{Node: &store.NodeExtern{Key: "/abc/def"}},
  1489. &store.Event{Node: &store.NodeExtern{Key: "/def"}},
  1490. },
  1491. {
  1492. &store.Event{PrevNode: &store.NodeExtern{Key: "/abc/ghi"}},
  1493. &store.Event{PrevNode: &store.NodeExtern{Key: "/ghi"}},
  1494. },
  1495. {
  1496. &store.Event{
  1497. Node: &store.NodeExtern{Key: "/abc/def"},
  1498. PrevNode: &store.NodeExtern{Key: "/abc/ghi"},
  1499. },
  1500. &store.Event{
  1501. Node: &store.NodeExtern{Key: "/def"},
  1502. PrevNode: &store.NodeExtern{Key: "/ghi"},
  1503. },
  1504. },
  1505. }
  1506. for i, tt := range tests {
  1507. ev := trimEventPrefix(tt.ev, pre)
  1508. if !reflect.DeepEqual(ev, tt.wev) {
  1509. t.Errorf("#%d: event = %+v, want %+v", i, ev, tt.wev)
  1510. }
  1511. }
  1512. }
  1513. func TestTrimNodeExternPrefix(t *testing.T) {
  1514. pre := "/abc"
  1515. tests := []struct {
  1516. n *store.NodeExtern
  1517. wn *store.NodeExtern
  1518. }{
  1519. {
  1520. nil,
  1521. nil,
  1522. },
  1523. {
  1524. &store.NodeExtern{Key: "/abc/def"},
  1525. &store.NodeExtern{Key: "/def"},
  1526. },
  1527. {
  1528. &store.NodeExtern{
  1529. Key: "/abc/def",
  1530. Nodes: []*store.NodeExtern{
  1531. {Key: "/abc/def/1"},
  1532. {Key: "/abc/def/2"},
  1533. },
  1534. },
  1535. &store.NodeExtern{
  1536. Key: "/def",
  1537. Nodes: []*store.NodeExtern{
  1538. {Key: "/def/1"},
  1539. {Key: "/def/2"},
  1540. },
  1541. },
  1542. },
  1543. }
  1544. for i, tt := range tests {
  1545. n := trimNodeExternPrefix(tt.n, pre)
  1546. if !reflect.DeepEqual(n, tt.wn) {
  1547. t.Errorf("#%d: node = %+v, want %+v", i, n, tt.wn)
  1548. }
  1549. }
  1550. }
  1551. func TestTrimPrefix(t *testing.T) {
  1552. tests := []struct {
  1553. in string
  1554. prefix string
  1555. w string
  1556. }{
  1557. {"/v2/members", "/v2/members", ""},
  1558. {"/v2/members/", "/v2/members", ""},
  1559. {"/v2/members/foo", "/v2/members", "foo"},
  1560. }
  1561. for i, tt := range tests {
  1562. if g := trimPrefix(tt.in, tt.prefix); g != tt.w {
  1563. t.Errorf("#%d: trimPrefix = %q, want %q", i, g, tt.w)
  1564. }
  1565. }
  1566. }
  1567. func TestNewMemberCollection(t *testing.T) {
  1568. fixture := []*etcdserver.Member{
  1569. &etcdserver.Member{
  1570. ID: 12,
  1571. Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}},
  1572. RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}},
  1573. },
  1574. &etcdserver.Member{
  1575. ID: 13,
  1576. Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:9090", "http://localhost:9091"}},
  1577. RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:9092", "http://localhost:9093"}},
  1578. },
  1579. }
  1580. got := newMemberCollection(fixture)
  1581. want := httptypes.MemberCollection([]httptypes.Member{
  1582. httptypes.Member{
  1583. ID: "c",
  1584. ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"},
  1585. PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"},
  1586. },
  1587. httptypes.Member{
  1588. ID: "d",
  1589. ClientURLs: []string{"http://localhost:9090", "http://localhost:9091"},
  1590. PeerURLs: []string{"http://localhost:9092", "http://localhost:9093"},
  1591. },
  1592. })
  1593. if !reflect.DeepEqual(&want, got) {
  1594. t.Fatalf("newMemberCollection failure: want=%#v, got=%#v", &want, got)
  1595. }
  1596. }
  1597. func TestNewMember(t *testing.T) {
  1598. fixture := &etcdserver.Member{
  1599. ID: 12,
  1600. Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}},
  1601. RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}},
  1602. }
  1603. got := newMember(fixture)
  1604. want := httptypes.Member{
  1605. ID: "c",
  1606. ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"},
  1607. PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"},
  1608. }
  1609. if !reflect.DeepEqual(want, got) {
  1610. t.Fatalf("newMember failure: want=%#v, got=%#v", want, got)
  1611. }
  1612. }