client_test.go 24 KB


  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package client
  15. import (
  16. "errors"
  17. "io"
  18. "io/ioutil"
  19. "math/rand"
  20. "net/http"
  21. "net/url"
  22. "reflect"
  23. "sort"
  24. "strings"
  25. "testing"
  26. "time"
  27. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  28. "github.com/coreos/etcd/pkg/testutil"
  29. )
  30. type actionAssertingHTTPClient struct {
  31. t *testing.T
  32. num int
  33. act httpAction
  34. resp http.Response
  35. body []byte
  36. err error
  37. }
  38. func (a *actionAssertingHTTPClient) Do(_ context.Context, act httpAction) (*http.Response, []byte, error) {
  39. if !reflect.DeepEqual(a.act, act) {
  40. a.t.Errorf("#%d: unexpected httpAction: want=%#v got=%#v", a.num, a.act, act)
  41. }
  42. return &a.resp, a.body, a.err
  43. }
  44. type staticHTTPClient struct {
  45. resp http.Response
  46. body []byte
  47. err error
  48. }
  49. func (s *staticHTTPClient) Do(context.Context, httpAction) (*http.Response, []byte, error) {
  50. return &s.resp, s.body, s.err
  51. }
  52. type staticHTTPAction struct {
  53. request http.Request
  54. }
  55. func (s *staticHTTPAction) HTTPRequest(url.URL) *http.Request {
  56. return &s.request
  57. }
  58. type staticHTTPResponse struct {
  59. resp http.Response
  60. body []byte
  61. err error
  62. }
  63. type multiStaticHTTPClient struct {
  64. responses []staticHTTPResponse
  65. cur int
  66. }
  67. func (s *multiStaticHTTPClient) Do(context.Context, httpAction) (*http.Response, []byte, error) {
  68. r := s.responses[s.cur]
  69. s.cur++
  70. return &r.resp, r.body, r.err
  71. }
  72. func newStaticHTTPClientFactory(responses []staticHTTPResponse) httpClientFactory {
  73. var cur int
  74. return func(url.URL) httpClient {
  75. r := responses[cur]
  76. cur++
  77. return &staticHTTPClient{resp: r.resp, body: r.body, err: r.err}
  78. }
  79. }
  80. type fakeTransport struct {
  81. respchan chan *http.Response
  82. errchan chan error
  83. startCancel chan struct{}
  84. finishCancel chan struct{}
  85. }
  86. func newFakeTransport() *fakeTransport {
  87. return &fakeTransport{
  88. respchan: make(chan *http.Response, 1),
  89. errchan: make(chan error, 1),
  90. startCancel: make(chan struct{}, 1),
  91. finishCancel: make(chan struct{}, 1),
  92. }
  93. }
  94. func (t *fakeTransport) RoundTrip(*http.Request) (*http.Response, error) {
  95. select {
  96. case resp := <-t.respchan:
  97. return resp, nil
  98. case err := <-t.errchan:
  99. return nil, err
  100. case <-t.startCancel:
  101. select {
  102. // this simulates that the request is finished before cancel effects
  103. case resp := <-t.respchan:
  104. return resp, nil
  105. // wait on finishCancel to simulate taking some amount of
  106. // time while calling CancelRequest
  107. case <-t.finishCancel:
  108. return nil, errors.New("cancelled")
  109. }
  110. }
  111. }
  112. func (t *fakeTransport) CancelRequest(*http.Request) {
  113. t.startCancel <- struct{}{}
  114. }
  115. type fakeAction struct{}
  116. func (a *fakeAction) HTTPRequest(url.URL) *http.Request {
  117. return &http.Request{}
  118. }
  119. func TestSimpleHTTPClientDoSuccess(t *testing.T) {
  120. tr := newFakeTransport()
  121. c := &simpleHTTPClient{transport: tr}
  122. tr.respchan <- &http.Response{
  123. StatusCode: http.StatusTeapot,
  124. Body: ioutil.NopCloser(strings.NewReader("foo")),
  125. }
  126. resp, body, err := c.Do(context.Background(), &fakeAction{})
  127. if err != nil {
  128. t.Fatalf("incorrect error value: want=nil got=%v", err)
  129. }
  130. wantCode := http.StatusTeapot
  131. if wantCode != resp.StatusCode {
  132. t.Fatalf("invalid response code: want=%d got=%d", wantCode, resp.StatusCode)
  133. }
  134. wantBody := []byte("foo")
  135. if !reflect.DeepEqual(wantBody, body) {
  136. t.Fatalf("invalid response body: want=%q got=%q", wantBody, body)
  137. }
  138. }
  139. func TestSimpleHTTPClientDoError(t *testing.T) {
  140. tr := newFakeTransport()
  141. c := &simpleHTTPClient{transport: tr}
  142. tr.errchan <- errors.New("fixture")
  143. _, _, err := c.Do(context.Background(), &fakeAction{})
  144. if err == nil {
  145. t.Fatalf("expected non-nil error, got nil")
  146. }
  147. }
  148. func TestSimpleHTTPClientDoCancelContext(t *testing.T) {
  149. tr := newFakeTransport()
  150. c := &simpleHTTPClient{transport: tr}
  151. tr.startCancel <- struct{}{}
  152. tr.finishCancel <- struct{}{}
  153. _, _, err := c.Do(context.Background(), &fakeAction{})
  154. if err == nil {
  155. t.Fatalf("expected non-nil error, got nil")
  156. }
  157. }
  158. type checkableReadCloser struct {
  159. io.ReadCloser
  160. closed bool
  161. }
  162. func (c *checkableReadCloser) Close() error {
  163. if !c.closed {
  164. c.closed = true
  165. return c.ReadCloser.Close()
  166. }
  167. return nil
  168. }
  169. func TestSimpleHTTPClientDoCancelContextResponseBodyClosed(t *testing.T) {
  170. tr := newFakeTransport()
  171. c := &simpleHTTPClient{transport: tr}
  172. // create an already-cancelled context
  173. ctx, cancel := context.WithCancel(context.Background())
  174. cancel()
  175. body := &checkableReadCloser{ReadCloser: ioutil.NopCloser(strings.NewReader("foo"))}
  176. go func() {
  177. // wait that simpleHTTPClient knows the context is already timed out,
  178. // and calls CancelRequest
  179. testutil.WaitSchedule()
  180. // response is returned before cancel effects
  181. tr.respchan <- &http.Response{Body: body}
  182. }()
  183. _, _, err := c.Do(ctx, &fakeAction{})
  184. if err == nil {
  185. t.Fatalf("expected non-nil error, got nil")
  186. }
  187. if !body.closed {
  188. t.Fatalf("expected closed body")
  189. }
  190. }
  191. type blockingBody struct {
  192. c chan struct{}
  193. }
  194. func (bb *blockingBody) Read(p []byte) (n int, err error) {
  195. <-bb.c
  196. return 0, errors.New("closed")
  197. }
  198. func (bb *blockingBody) Close() error {
  199. close(bb.c)
  200. return nil
  201. }
  202. func TestSimpleHTTPClientDoCancelContextResponseBodyClosedWithBlockingBody(t *testing.T) {
  203. tr := newFakeTransport()
  204. c := &simpleHTTPClient{transport: tr}
  205. ctx, cancel := context.WithCancel(context.Background())
  206. body := &checkableReadCloser{ReadCloser: &blockingBody{c: make(chan struct{})}}
  207. go func() {
  208. tr.respchan <- &http.Response{Body: body}
  209. time.Sleep(2 * time.Millisecond)
  210. // cancel after the body is received
  211. cancel()
  212. }()
  213. _, _, err := c.Do(ctx, &fakeAction{})
  214. if err != context.Canceled {
  215. t.Fatalf("expected %+v, got %+v", context.Canceled, err)
  216. }
  217. if !body.closed {
  218. t.Fatalf("expected closed body")
  219. }
  220. }
  221. func TestSimpleHTTPClientDoCancelContextWaitForRoundTrip(t *testing.T) {
  222. tr := newFakeTransport()
  223. c := &simpleHTTPClient{transport: tr}
  224. donechan := make(chan struct{})
  225. ctx, cancel := context.WithCancel(context.Background())
  226. go func() {
  227. c.Do(ctx, &fakeAction{})
  228. close(donechan)
  229. }()
  230. // This should call CancelRequest and begin the cancellation process
  231. cancel()
  232. select {
  233. case <-donechan:
  234. t.Fatalf("simpleHTTPClient.Do should not have exited yet")
  235. default:
  236. }
  237. tr.finishCancel <- struct{}{}
  238. select {
  239. case <-donechan:
  240. //expected behavior
  241. return
  242. case <-time.After(time.Second):
  243. t.Fatalf("simpleHTTPClient.Do did not exit within 1s")
  244. }
  245. }
  246. func TestSimpleHTTPClientDoHeaderTimeout(t *testing.T) {
  247. tr := newFakeTransport()
  248. tr.finishCancel <- struct{}{}
  249. c := &simpleHTTPClient{transport: tr, headerTimeout: time.Millisecond}
  250. errc := make(chan error)
  251. go func() {
  252. _, _, err := c.Do(context.Background(), &fakeAction{})
  253. errc <- err
  254. }()
  255. select {
  256. case err := <-errc:
  257. if err == nil {
  258. t.Fatalf("expected non-nil error, got nil")
  259. }
  260. case <-time.After(time.Second):
  261. t.Fatalf("unexpected timeout when waitting for the test to finish")
  262. }
  263. }
  264. func TestHTTPClusterClientDo(t *testing.T) {
  265. fakeErr := errors.New("fake!")
  266. fakeURL := url.URL{}
  267. tests := []struct {
  268. client *httpClusterClient
  269. wantCode int
  270. wantErr error
  271. wantPinned int
  272. }{
  273. // first good response short-circuits Do
  274. {
  275. client: &httpClusterClient{
  276. endpoints: []url.URL{fakeURL, fakeURL},
  277. clientFactory: newStaticHTTPClientFactory(
  278. []staticHTTPResponse{
  279. staticHTTPResponse{resp: http.Response{StatusCode: http.StatusTeapot}},
  280. staticHTTPResponse{err: fakeErr},
  281. },
  282. ),
  283. rand: rand.New(rand.NewSource(0)),
  284. },
  285. wantCode: http.StatusTeapot,
  286. },
  287. // fall through to good endpoint if err is arbitrary
  288. {
  289. client: &httpClusterClient{
  290. endpoints: []url.URL{fakeURL, fakeURL},
  291. clientFactory: newStaticHTTPClientFactory(
  292. []staticHTTPResponse{
  293. staticHTTPResponse{err: fakeErr},
  294. staticHTTPResponse{resp: http.Response{StatusCode: http.StatusTeapot}},
  295. },
  296. ),
  297. rand: rand.New(rand.NewSource(0)),
  298. },
  299. wantCode: http.StatusTeapot,
  300. wantPinned: 1,
  301. },
  302. // context.Canceled short-circuits Do
  303. {
  304. client: &httpClusterClient{
  305. endpoints: []url.URL{fakeURL, fakeURL},
  306. clientFactory: newStaticHTTPClientFactory(
  307. []staticHTTPResponse{
  308. staticHTTPResponse{err: context.Canceled},
  309. staticHTTPResponse{resp: http.Response{StatusCode: http.StatusTeapot}},
  310. },
  311. ),
  312. rand: rand.New(rand.NewSource(0)),
  313. },
  314. wantErr: context.Canceled,
  315. },
  316. // return err if there are no endpoints
  317. {
  318. client: &httpClusterClient{
  319. endpoints: []url.URL{},
  320. clientFactory: newHTTPClientFactory(nil, nil, 0),
  321. rand: rand.New(rand.NewSource(0)),
  322. },
  323. wantErr: ErrNoEndpoints,
  324. },
  325. // return err if all endpoints return arbitrary errors
  326. {
  327. client: &httpClusterClient{
  328. endpoints: []url.URL{fakeURL, fakeURL},
  329. clientFactory: newStaticHTTPClientFactory(
  330. []staticHTTPResponse{
  331. staticHTTPResponse{err: fakeErr},
  332. staticHTTPResponse{err: fakeErr},
  333. },
  334. ),
  335. rand: rand.New(rand.NewSource(0)),
  336. },
  337. wantErr: &ClusterError{Errors: []error{fakeErr, fakeErr}},
  338. },
  339. // 500-level errors cause Do to fallthrough to next endpoint
  340. {
  341. client: &httpClusterClient{
  342. endpoints: []url.URL{fakeURL, fakeURL},
  343. clientFactory: newStaticHTTPClientFactory(
  344. []staticHTTPResponse{
  345. staticHTTPResponse{resp: http.Response{StatusCode: http.StatusBadGateway}},
  346. staticHTTPResponse{resp: http.Response{StatusCode: http.StatusTeapot}},
  347. },
  348. ),
  349. rand: rand.New(rand.NewSource(0)),
  350. },
  351. wantCode: http.StatusTeapot,
  352. wantPinned: 1,
  353. },
  354. }
  355. for i, tt := range tests {
  356. resp, _, err := tt.client.Do(context.Background(), nil)
  357. if !reflect.DeepEqual(tt.wantErr, err) {
  358. t.Errorf("#%d: got err=%v, want=%v", i, err, tt.wantErr)
  359. continue
  360. }
  361. if resp == nil {
  362. if tt.wantCode != 0 {
  363. t.Errorf("#%d: resp is nil, want=%d", i, tt.wantCode)
  364. }
  365. continue
  366. }
  367. if resp.StatusCode != tt.wantCode {
  368. t.Errorf("#%d: resp code=%d, want=%d", i, resp.StatusCode, tt.wantCode)
  369. continue
  370. }
  371. if tt.client.pinned != tt.wantPinned {
  372. t.Errorf("#%d: pinned=%d, want=%d", i, tt.client.pinned, tt.wantPinned)
  373. }
  374. }
  375. }
  376. func TestHTTPClusterClientDoDeadlineExceedContext(t *testing.T) {
  377. fakeURL := url.URL{}
  378. tr := newFakeTransport()
  379. tr.finishCancel <- struct{}{}
  380. c := &httpClusterClient{
  381. clientFactory: newHTTPClientFactory(tr, DefaultCheckRedirect, 0),
  382. endpoints: []url.URL{fakeURL},
  383. }
  384. errc := make(chan error)
  385. go func() {
  386. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
  387. defer cancel()
  388. _, _, err := c.Do(ctx, &fakeAction{})
  389. errc <- err
  390. }()
  391. select {
  392. case err := <-errc:
  393. werr := &ClusterError{Errors: []error{context.DeadlineExceeded}}
  394. if !reflect.DeepEqual(err, werr) {
  395. t.Errorf("err = %+v, want %+v", err, werr)
  396. }
  397. case <-time.After(time.Second):
  398. t.Fatalf("unexpected timeout when waitting for request to deadline exceed")
  399. }
  400. }
  401. func TestRedirectedHTTPAction(t *testing.T) {
  402. act := &redirectedHTTPAction{
  403. action: &staticHTTPAction{
  404. request: http.Request{
  405. Method: "DELETE",
  406. URL: &url.URL{
  407. Scheme: "https",
  408. Host: "foo.example.com",
  409. Path: "/ping",
  410. },
  411. },
  412. },
  413. location: url.URL{
  414. Scheme: "https",
  415. Host: "bar.example.com",
  416. Path: "/pong",
  417. },
  418. }
  419. want := &http.Request{
  420. Method: "DELETE",
  421. URL: &url.URL{
  422. Scheme: "https",
  423. Host: "bar.example.com",
  424. Path: "/pong",
  425. },
  426. }
  427. got := act.HTTPRequest(url.URL{Scheme: "http", Host: "baz.example.com", Path: "/pang"})
  428. if !reflect.DeepEqual(want, got) {
  429. t.Fatalf("HTTPRequest is %#v, want %#v", want, got)
  430. }
  431. }
  432. func TestRedirectFollowingHTTPClient(t *testing.T) {
  433. tests := []struct {
  434. checkRedirect CheckRedirectFunc
  435. client httpClient
  436. wantCode int
  437. wantErr error
  438. }{
  439. // errors bubbled up
  440. {
  441. checkRedirect: func(int) error { return ErrTooManyRedirects },
  442. client: &multiStaticHTTPClient{
  443. responses: []staticHTTPResponse{
  444. staticHTTPResponse{
  445. err: errors.New("fail!"),
  446. },
  447. },
  448. },
  449. wantErr: errors.New("fail!"),
  450. },
  451. // no need to follow redirect if none given
  452. {
  453. checkRedirect: func(int) error { return ErrTooManyRedirects },
  454. client: &multiStaticHTTPClient{
  455. responses: []staticHTTPResponse{
  456. staticHTTPResponse{
  457. resp: http.Response{
  458. StatusCode: http.StatusTeapot,
  459. },
  460. },
  461. },
  462. },
  463. wantCode: http.StatusTeapot,
  464. },
  465. // redirects if less than max
  466. {
  467. checkRedirect: func(via int) error {
  468. if via >= 2 {
  469. return ErrTooManyRedirects
  470. }
  471. return nil
  472. },
  473. client: &multiStaticHTTPClient{
  474. responses: []staticHTTPResponse{
  475. staticHTTPResponse{
  476. resp: http.Response{
  477. StatusCode: http.StatusTemporaryRedirect,
  478. Header: http.Header{"Location": []string{"http://example.com"}},
  479. },
  480. },
  481. staticHTTPResponse{
  482. resp: http.Response{
  483. StatusCode: http.StatusTeapot,
  484. },
  485. },
  486. },
  487. },
  488. wantCode: http.StatusTeapot,
  489. },
  490. // succeed after reaching max redirects
  491. {
  492. checkRedirect: func(via int) error {
  493. if via >= 3 {
  494. return ErrTooManyRedirects
  495. }
  496. return nil
  497. },
  498. client: &multiStaticHTTPClient{
  499. responses: []staticHTTPResponse{
  500. staticHTTPResponse{
  501. resp: http.Response{
  502. StatusCode: http.StatusTemporaryRedirect,
  503. Header: http.Header{"Location": []string{"http://example.com"}},
  504. },
  505. },
  506. staticHTTPResponse{
  507. resp: http.Response{
  508. StatusCode: http.StatusTemporaryRedirect,
  509. Header: http.Header{"Location": []string{"http://example.com"}},
  510. },
  511. },
  512. staticHTTPResponse{
  513. resp: http.Response{
  514. StatusCode: http.StatusTeapot,
  515. },
  516. },
  517. },
  518. },
  519. wantCode: http.StatusTeapot,
  520. },
  521. // fail if too many redirects
  522. {
  523. checkRedirect: func(via int) error {
  524. if via >= 2 {
  525. return ErrTooManyRedirects
  526. }
  527. return nil
  528. },
  529. client: &multiStaticHTTPClient{
  530. responses: []staticHTTPResponse{
  531. staticHTTPResponse{
  532. resp: http.Response{
  533. StatusCode: http.StatusTemporaryRedirect,
  534. Header: http.Header{"Location": []string{"http://example.com"}},
  535. },
  536. },
  537. staticHTTPResponse{
  538. resp: http.Response{
  539. StatusCode: http.StatusTemporaryRedirect,
  540. Header: http.Header{"Location": []string{"http://example.com"}},
  541. },
  542. },
  543. staticHTTPResponse{
  544. resp: http.Response{
  545. StatusCode: http.StatusTeapot,
  546. },
  547. },
  548. },
  549. },
  550. wantErr: ErrTooManyRedirects,
  551. },
  552. // fail if Location header not set
  553. {
  554. checkRedirect: func(int) error { return ErrTooManyRedirects },
  555. client: &multiStaticHTTPClient{
  556. responses: []staticHTTPResponse{
  557. staticHTTPResponse{
  558. resp: http.Response{
  559. StatusCode: http.StatusTemporaryRedirect,
  560. },
  561. },
  562. },
  563. },
  564. wantErr: errors.New("Location header not set"),
  565. },
  566. // fail if Location header is invalid
  567. {
  568. checkRedirect: func(int) error { return ErrTooManyRedirects },
  569. client: &multiStaticHTTPClient{
  570. responses: []staticHTTPResponse{
  571. staticHTTPResponse{
  572. resp: http.Response{
  573. StatusCode: http.StatusTemporaryRedirect,
  574. Header: http.Header{"Location": []string{":"}},
  575. },
  576. },
  577. },
  578. },
  579. wantErr: errors.New("Location header not valid URL: :"),
  580. },
  581. // fail if redirects checked way too many times
  582. {
  583. checkRedirect: func(int) error { return nil },
  584. client: &staticHTTPClient{
  585. resp: http.Response{
  586. StatusCode: http.StatusTemporaryRedirect,
  587. Header: http.Header{"Location": []string{"http://example.com"}},
  588. },
  589. },
  590. wantErr: errTooManyRedirectChecks,
  591. },
  592. }
  593. for i, tt := range tests {
  594. client := &redirectFollowingHTTPClient{client: tt.client, checkRedirect: tt.checkRedirect}
  595. resp, _, err := client.Do(context.Background(), nil)
  596. if !reflect.DeepEqual(tt.wantErr, err) {
  597. t.Errorf("#%d: got err=%v, want=%v", i, err, tt.wantErr)
  598. continue
  599. }
  600. if resp == nil {
  601. if tt.wantCode != 0 {
  602. t.Errorf("#%d: resp is nil, want=%d", i, tt.wantCode)
  603. }
  604. continue
  605. }
  606. if resp.StatusCode != tt.wantCode {
  607. t.Errorf("#%d: resp code=%d, want=%d", i, resp.StatusCode, tt.wantCode)
  608. continue
  609. }
  610. }
  611. }
  612. func TestDefaultCheckRedirect(t *testing.T) {
  613. tests := []struct {
  614. num int
  615. err error
  616. }{
  617. {0, nil},
  618. {5, nil},
  619. {10, nil},
  620. {11, ErrTooManyRedirects},
  621. {29, ErrTooManyRedirects},
  622. }
  623. for i, tt := range tests {
  624. err := DefaultCheckRedirect(tt.num)
  625. if !reflect.DeepEqual(tt.err, err) {
  626. t.Errorf("#%d: want=%#v got=%#v", i, tt.err, err)
  627. }
  628. }
  629. }
  630. func TestHTTPClusterClientSync(t *testing.T) {
  631. cf := newStaticHTTPClientFactory([]staticHTTPResponse{
  632. staticHTTPResponse{
  633. resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
  634. body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
  635. },
  636. })
  637. hc := &httpClusterClient{
  638. clientFactory: cf,
  639. rand: rand.New(rand.NewSource(0)),
  640. }
  641. err := hc.reset([]string{"http://127.0.0.1:2379"})
  642. if err != nil {
  643. t.Fatalf("unexpected error during setup: %#v", err)
  644. }
  645. want := []string{"http://127.0.0.1:2379"}
  646. got := hc.Endpoints()
  647. if !reflect.DeepEqual(want, got) {
  648. t.Fatalf("incorrect endpoints: want=%#v got=%#v", want, got)
  649. }
  650. err = hc.Sync(context.Background())
  651. if err != nil {
  652. t.Fatalf("unexpected error during Sync: %#v", err)
  653. }
  654. want = []string{"http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"}
  655. got = hc.Endpoints()
  656. sort.Sort(sort.StringSlice(got))
  657. if !reflect.DeepEqual(want, got) {
  658. t.Fatalf("incorrect endpoints post-Sync: want=%#v got=%#v", want, got)
  659. }
  660. err = hc.reset([]string{"http://127.0.0.1:4009"})
  661. if err != nil {
  662. t.Fatalf("unexpected error during reset: %#v", err)
  663. }
  664. want = []string{"http://127.0.0.1:4009"}
  665. got = hc.Endpoints()
  666. if !reflect.DeepEqual(want, got) {
  667. t.Fatalf("incorrect endpoints post-reset: want=%#v got=%#v", want, got)
  668. }
  669. }
  670. func TestHTTPClusterClientSyncFail(t *testing.T) {
  671. cf := newStaticHTTPClientFactory([]staticHTTPResponse{
  672. staticHTTPResponse{err: errors.New("fail!")},
  673. })
  674. hc := &httpClusterClient{
  675. clientFactory: cf,
  676. rand: rand.New(rand.NewSource(0)),
  677. }
  678. err := hc.reset([]string{"http://127.0.0.1:2379"})
  679. if err != nil {
  680. t.Fatalf("unexpected error during setup: %#v", err)
  681. }
  682. want := []string{"http://127.0.0.1:2379"}
  683. got := hc.Endpoints()
  684. if !reflect.DeepEqual(want, got) {
  685. t.Fatalf("incorrect endpoints: want=%#v got=%#v", want, got)
  686. }
  687. err = hc.Sync(context.Background())
  688. if err == nil {
  689. t.Fatalf("got nil error during Sync")
  690. }
  691. got = hc.Endpoints()
  692. if !reflect.DeepEqual(want, got) {
  693. t.Fatalf("incorrect endpoints after failed Sync: want=%#v got=%#v", want, got)
  694. }
  695. }
  696. func TestHTTPClusterClientAutoSyncCancelContext(t *testing.T) {
  697. cf := newStaticHTTPClientFactory([]staticHTTPResponse{
  698. staticHTTPResponse{
  699. resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
  700. body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
  701. },
  702. })
  703. hc := &httpClusterClient{
  704. clientFactory: cf,
  705. rand: rand.New(rand.NewSource(0)),
  706. }
  707. err := hc.reset([]string{"http://127.0.0.1:2379"})
  708. if err != nil {
  709. t.Fatalf("unexpected error during setup: %#v", err)
  710. }
  711. ctx, cancel := context.WithCancel(context.Background())
  712. cancel()
  713. err = hc.AutoSync(ctx, time.Hour)
  714. if err != context.Canceled {
  715. t.Fatalf("incorrect error value: want=%v got=%v", context.Canceled, err)
  716. }
  717. }
  718. func TestHTTPClusterClientAutoSyncFail(t *testing.T) {
  719. cf := newStaticHTTPClientFactory([]staticHTTPResponse{
  720. staticHTTPResponse{err: errors.New("fail!")},
  721. })
  722. hc := &httpClusterClient{
  723. clientFactory: cf,
  724. rand: rand.New(rand.NewSource(0)),
  725. }
  726. err := hc.reset([]string{"http://127.0.0.1:2379"})
  727. if err != nil {
  728. t.Fatalf("unexpected error during setup: %#v", err)
  729. }
  730. err = hc.AutoSync(context.Background(), time.Hour)
  731. if err.Error() != ErrClusterUnavailable.Error() {
  732. t.Fatalf("incorrect error value: want=%v got=%v", ErrClusterUnavailable, err)
  733. }
  734. }
  735. // TestHTTPClusterClientSyncPinEndpoint tests that Sync() pins the endpoint when
  736. // it gets the exactly same member list as before.
  737. func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) {
  738. cf := newStaticHTTPClientFactory([]staticHTTPResponse{
  739. staticHTTPResponse{
  740. resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
  741. body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
  742. },
  743. staticHTTPResponse{
  744. resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
  745. body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
  746. },
  747. staticHTTPResponse{
  748. resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
  749. body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
  750. },
  751. })
  752. hc := &httpClusterClient{
  753. clientFactory: cf,
  754. rand: rand.New(rand.NewSource(0)),
  755. }
  756. err := hc.reset([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
  757. if err != nil {
  758. t.Fatalf("unexpected error during setup: %#v", err)
  759. }
  760. pinnedEndpoint := hc.endpoints[hc.pinned]
  761. for i := 0; i < 3; i++ {
  762. err = hc.Sync(context.Background())
  763. if err != nil {
  764. t.Fatalf("#%d: unexpected error during Sync: %#v", i, err)
  765. }
  766. if g := hc.endpoints[hc.pinned]; g != pinnedEndpoint {
  767. t.Errorf("#%d: pinned endpoint = %s, want %s", i, g, pinnedEndpoint)
  768. }
  769. }
  770. }
  771. func TestHTTPClusterClientResetFail(t *testing.T) {
  772. tests := [][]string{
  773. // need at least one endpoint
  774. []string{},
  775. // urls must be valid
  776. []string{":"},
  777. }
  778. for i, tt := range tests {
  779. hc := &httpClusterClient{rand: rand.New(rand.NewSource(0))}
  780. err := hc.reset(tt)
  781. if err == nil {
  782. t.Errorf("#%d: expected non-nil error", i)
  783. }
  784. }
  785. }
  786. func TestHTTPClusterClientResetPinRandom(t *testing.T) {
  787. round := 2000
  788. pinNum := 0
  789. for i := 0; i < round; i++ {
  790. hc := &httpClusterClient{rand: rand.New(rand.NewSource(int64(i)))}
  791. err := hc.reset([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
  792. if err != nil {
  793. t.Fatalf("#%d: reset error (%v)", i, err)
  794. }
  795. if hc.endpoints[hc.pinned].String() == "http://127.0.0.1:4001" {
  796. pinNum++
  797. }
  798. }
  799. min := 1.0/3.0 - 0.05
  800. max := 1.0/3.0 + 0.05
  801. if ratio := float64(pinNum) / float64(round); ratio > max || ratio < min {
  802. t.Errorf("pinned ratio = %v, want [%v, %v]", ratio, min, max)
  803. }
  804. }