http.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "io/ioutil"
  16. "net/http"
  17. "net/url"
  18. "time"
  19. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  20. )
  21. var (
  22. ErrTimeout = context.DeadlineExceeded
  23. ErrCanceled = context.Canceled
  24. DefaultRequestTimeout = 5 * time.Second
  25. )
  26. type SyncableHTTPClient interface {
  27. HTTPClient
  28. Sync(context.Context) error
  29. }
  30. type HTTPClient interface {
  31. Do(context.Context, HTTPAction) (*http.Response, []byte, error)
  32. }
  33. type HTTPAction interface {
  34. HTTPRequest(url.URL) *http.Request
  35. }
  36. // CancelableTransport mimics http.Transport to provide an interface which can be
  37. // substituted for testing (since the RoundTripper interface alone does not
  38. // require the CancelRequest method)
  39. type CancelableTransport interface {
  40. http.RoundTripper
  41. CancelRequest(req *http.Request)
  42. }
  43. func NewHTTPClient(tr CancelableTransport, eps []string) (SyncableHTTPClient, error) {
  44. return newHTTPClusterClient(tr, eps)
  45. }
  46. func newHTTPClusterClient(tr CancelableTransport, eps []string) (*httpClusterClient, error) {
  47. c := httpClusterClient{
  48. transport: tr,
  49. endpoints: make([]HTTPClient, len(eps)),
  50. }
  51. for i, ep := range eps {
  52. u, err := url.Parse(ep)
  53. if err != nil {
  54. return nil, err
  55. }
  56. c.endpoints[i] = &httpClient{
  57. transport: tr,
  58. endpoint: *u,
  59. }
  60. }
  61. return &c, nil
  62. }
  63. type httpClusterClient struct {
  64. transport CancelableTransport
  65. endpoints []HTTPClient
  66. }
  67. func (c *httpClusterClient) Do(ctx context.Context, act HTTPAction) (resp *http.Response, body []byte, err error) {
  68. for _, hc := range c.endpoints {
  69. resp, body, err = hc.Do(ctx, act)
  70. if err != nil {
  71. if err == ErrTimeout || err == ErrCanceled {
  72. return nil, nil, err
  73. }
  74. continue
  75. }
  76. if resp.StatusCode/100 == 5 {
  77. continue
  78. }
  79. break
  80. }
  81. return
  82. }
  83. func (c *httpClusterClient) Sync(ctx context.Context) error {
  84. mAPI := NewMembersAPI(c)
  85. ms, err := mAPI.List(ctx)
  86. if err != nil {
  87. return err
  88. }
  89. eps := make([]string, 0)
  90. for _, m := range ms {
  91. eps = append(eps, m.ClientURLs...)
  92. }
  93. nc, err := newHTTPClusterClient(c.transport, eps)
  94. if err != nil {
  95. return err
  96. }
  97. *c = *nc
  98. return nil
  99. }
  100. type roundTripResponse struct {
  101. resp *http.Response
  102. err error
  103. }
  104. type httpClient struct {
  105. transport CancelableTransport
  106. endpoint url.URL
  107. }
  108. func (c *httpClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  109. req := act.HTTPRequest(c.endpoint)
  110. rtchan := make(chan roundTripResponse, 1)
  111. go func() {
  112. resp, err := c.transport.RoundTrip(req)
  113. rtchan <- roundTripResponse{resp: resp, err: err}
  114. close(rtchan)
  115. }()
  116. var resp *http.Response
  117. var err error
  118. select {
  119. case rtresp := <-rtchan:
  120. resp, err = rtresp.resp, rtresp.err
  121. case <-ctx.Done():
  122. c.transport.CancelRequest(req)
  123. // wait for request to actually exit before continuing
  124. <-rtchan
  125. err = ctx.Err()
  126. }
  127. // always check for resp nil-ness to deal with possible
  128. // race conditions between channels above
  129. defer func() {
  130. if resp != nil {
  131. resp.Body.Close()
  132. }
  133. }()
  134. if err != nil {
  135. return nil, nil, err
  136. }
  137. body, err := ioutil.ReadAll(resp.Body)
  138. return resp, body, err
  139. }