http.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "errors"
  16. "fmt"
  17. "io/ioutil"
  18. "net/http"
  19. "net/url"
  20. "time"
  21. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  22. )
var (
	// ErrTimeout is the same error value as context.DeadlineExceeded.
	// httpClusterClient.Do stops trying further endpoints when it sees it.
	ErrTimeout = context.DeadlineExceeded
	// ErrCanceled is the same error value as context.Canceled.
	// httpClusterClient.Do stops trying further endpoints when it sees it.
	ErrCanceled = context.Canceled
	// ErrTooManyRedirects is returned by redirectFollowingHTTPClient.Do when
	// every permitted attempt was answered with a 3xx response.
	ErrTooManyRedirects = errors.New("too many redirects")
	// DefaultRequestTimeout is five seconds.
	// NOTE(review): not referenced in this file; presumably applied by
	// callers when building a context - confirm.
	DefaultRequestTimeout = 5 * time.Second
	// DefaultMaxRedirects is the redirect limit assigned to every
	// per-endpoint client built by newHTTPClusterClient.
	DefaultMaxRedirects = 10
)
// SyncableHTTPClient is an HTTPClient that can additionally be asked to
// Sync. It is the type returned by NewHTTPClient.
type SyncableHTTPClient interface {
	HTTPClient
	// Sync refreshes the client's internal state. The implementation in
	// this file (httpClusterClient) rebuilds its endpoint list from the
	// cluster's member list.
	Sync(context.Context) error
}
// HTTPClient executes an HTTPAction under the given context.
type HTTPClient interface {
	// Do performs the action and returns the HTTP response, a byte slice,
	// and an error. In this file's httpClient implementation the byte
	// slice is the fully read response body.
	Do(context.Context, HTTPAction) (*http.Response, []byte, error)
}
// HTTPAction describes a request that can be built for a given endpoint.
type HTTPAction interface {
	// HTTPRequest returns the request to send. httpClient.Do passes its
	// configured endpoint URL as the argument.
	HTTPRequest(url.URL) *http.Request
}
// CancelableTransport mimics http.Transport to provide an interface which
// can be substituted for testing (since the RoundTripper interface alone
// does not require the CancelRequest method).
type CancelableTransport interface {
	http.RoundTripper
	// CancelRequest is called by httpClient.Do with the in-flight request
	// when the context is done before the round trip has returned.
	CancelRequest(req *http.Request)
}
  47. func NewHTTPClient(tr CancelableTransport, eps []string) (SyncableHTTPClient, error) {
  48. return newHTTPClusterClient(tr, eps)
  49. }
  50. func newHTTPClusterClient(tr CancelableTransport, eps []string) (*httpClusterClient, error) {
  51. c := httpClusterClient{
  52. transport: tr,
  53. endpoints: make([]HTTPClient, len(eps)),
  54. }
  55. for i, ep := range eps {
  56. u, err := url.Parse(ep)
  57. if err != nil {
  58. return nil, err
  59. }
  60. c.endpoints[i] = &redirectFollowingHTTPClient{
  61. max: DefaultMaxRedirects,
  62. client: &httpClient{
  63. transport: tr,
  64. endpoint: *u,
  65. },
  66. }
  67. }
  68. return &c, nil
  69. }
// httpClusterClient is an HTTPClient backed by an ordered list of
// per-endpoint clients.
type httpClusterClient struct {
	// transport is retained so Sync can build replacement endpoint clients.
	transport CancelableTransport
	// endpoints holds one client per endpoint; Do tries them in order.
	endpoints []HTTPClient
}
  74. func (c *httpClusterClient) Do(ctx context.Context, act HTTPAction) (resp *http.Response, body []byte, err error) {
  75. for _, hc := range c.endpoints {
  76. resp, body, err = hc.Do(ctx, act)
  77. if err != nil {
  78. if err == ErrTimeout || err == ErrCanceled {
  79. return nil, nil, err
  80. }
  81. continue
  82. }
  83. if resp.StatusCode/100 == 5 {
  84. continue
  85. }
  86. break
  87. }
  88. return
  89. }
  90. func (c *httpClusterClient) Sync(ctx context.Context) error {
  91. mAPI := NewMembersAPI(c)
  92. ms, err := mAPI.List(ctx)
  93. if err != nil {
  94. return err
  95. }
  96. eps := make([]string, 0)
  97. for _, m := range ms {
  98. eps = append(eps, m.ClientURLs...)
  99. }
  100. nc, err := newHTTPClusterClient(c.transport, eps)
  101. if err != nil {
  102. return err
  103. }
  104. *c = *nc
  105. return nil
  106. }
// roundTripResponse carries the two results of a RoundTrip call across the
// channel used by httpClient.Do.
type roundTripResponse struct {
	resp *http.Response
	err  error
}
// httpClient is an HTTPClient bound to a single endpoint URL.
type httpClient struct {
	// transport performs the round trip and is asked to cancel the request
	// when the context finishes first.
	transport CancelableTransport
	// endpoint is the URL handed to HTTPAction.HTTPRequest to build each
	// request.
	endpoint url.URL
}
  115. func (c *httpClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  116. req := act.HTTPRequest(c.endpoint)
  117. rtchan := make(chan roundTripResponse, 1)
  118. go func() {
  119. resp, err := c.transport.RoundTrip(req)
  120. rtchan <- roundTripResponse{resp: resp, err: err}
  121. close(rtchan)
  122. }()
  123. var resp *http.Response
  124. var err error
  125. select {
  126. case rtresp := <-rtchan:
  127. resp, err = rtresp.resp, rtresp.err
  128. case <-ctx.Done():
  129. c.transport.CancelRequest(req)
  130. // wait for request to actually exit before continuing
  131. <-rtchan
  132. err = ctx.Err()
  133. }
  134. // always check for resp nil-ness to deal with possible
  135. // race conditions between channels above
  136. defer func() {
  137. if resp != nil {
  138. resp.Body.Close()
  139. }
  140. }()
  141. if err != nil {
  142. return nil, nil, err
  143. }
  144. body, err := ioutil.ReadAll(resp.Body)
  145. return resp, body, err
  146. }
// redirectFollowingHTTPClient wraps another HTTPClient and re-issues the
// action at the Location of any 3xx response it receives.
type redirectFollowingHTTPClient struct {
	// client performs each individual request.
	client HTTPClient
	// max is the number of redirects that may be followed; Do makes at
	// most max+1 requests.
	max int
}
  151. func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  152. for i := 0; i <= r.max; i++ {
  153. resp, body, err := r.client.Do(ctx, act)
  154. if err != nil {
  155. return nil, nil, err
  156. }
  157. if resp.StatusCode/100 == 3 {
  158. hdr := resp.Header.Get("Location")
  159. if hdr == "" {
  160. return nil, nil, fmt.Errorf("Location header not set")
  161. }
  162. loc, err := url.Parse(hdr)
  163. if err != nil {
  164. return nil, nil, fmt.Errorf("Location header not valid URL: %s", hdr)
  165. }
  166. act = &redirectedHTTPAction{
  167. action: act,
  168. location: *loc,
  169. }
  170. continue
  171. }
  172. return resp, body, nil
  173. }
  174. return nil, nil, ErrTooManyRedirects
  175. }
// redirectedHTTPAction decorates an HTTPAction so that the request it
// builds is sent to a fixed location instead.
type redirectedHTTPAction struct {
	// action builds the underlying request.
	action HTTPAction
	// location replaces the URL of the request built by action.
	location url.URL
}
  180. func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
  181. orig := r.action.HTTPRequest(ep)
  182. orig.URL = &r.location
  183. return orig
  184. }