http.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "errors"
  16. "fmt"
  17. "io/ioutil"
  18. "net/http"
  19. "net/url"
  20. "time"
  21. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  22. )
  23. var (
  24. ErrTimeout = context.DeadlineExceeded
  25. ErrCanceled = context.Canceled
  26. ErrNoEndpoints = errors.New("no endpoints available")
  27. ErrTooManyRedirects = errors.New("too many redirects")
  28. DefaultRequestTimeout = 5 * time.Second
  29. DefaultMaxRedirects = 10
  30. )
  31. type SyncableHTTPClient interface {
  32. HTTPClient
  33. Sync(context.Context) error
  34. }
  35. type HTTPClient interface {
  36. Do(context.Context, HTTPAction) (*http.Response, []byte, error)
  37. }
  38. type HTTPAction interface {
  39. HTTPRequest(url.URL) *http.Request
  40. }
  41. // CancelableTransport mimics http.Transport to provide an interface which can be
  42. // substituted for testing (since the RoundTripper interface alone does not
  43. // require the CancelRequest method)
  44. type CancelableTransport interface {
  45. http.RoundTripper
  46. CancelRequest(req *http.Request)
  47. }
  48. func NewHTTPClient(tr CancelableTransport, eps []string) (SyncableHTTPClient, error) {
  49. return newHTTPClusterClient(tr, eps)
  50. }
  51. func newHTTPClusterClient(tr CancelableTransport, eps []string) (*httpClusterClient, error) {
  52. c := httpClusterClient{
  53. transport: tr,
  54. endpoints: make([]HTTPClient, len(eps)),
  55. }
  56. for i, ep := range eps {
  57. u, err := url.Parse(ep)
  58. if err != nil {
  59. return nil, err
  60. }
  61. c.endpoints[i] = &redirectFollowingHTTPClient{
  62. max: DefaultMaxRedirects,
  63. client: &httpClient{
  64. transport: tr,
  65. endpoint: *u,
  66. },
  67. }
  68. }
  69. return &c, nil
  70. }
  71. type httpClusterClient struct {
  72. transport CancelableTransport
  73. endpoints []HTTPClient
  74. }
  75. func (c *httpClusterClient) Do(ctx context.Context, act HTTPAction) (resp *http.Response, body []byte, err error) {
  76. if len(c.endpoints) == 0 {
  77. return nil, nil, ErrNoEndpoints
  78. }
  79. for _, hc := range c.endpoints {
  80. resp, body, err = hc.Do(ctx, act)
  81. if err != nil {
  82. if err == ErrTimeout || err == ErrCanceled {
  83. return nil, nil, err
  84. }
  85. continue
  86. }
  87. if resp.StatusCode/100 == 5 {
  88. continue
  89. }
  90. break
  91. }
  92. return
  93. }
  94. func (c *httpClusterClient) Sync(ctx context.Context) error {
  95. mAPI := NewMembersAPI(c)
  96. ms, err := mAPI.List(ctx)
  97. if err != nil {
  98. return err
  99. }
  100. eps := make([]string, 0)
  101. for _, m := range ms {
  102. eps = append(eps, m.ClientURLs...)
  103. }
  104. if len(eps) == 0 {
  105. return ErrNoEndpoints
  106. }
  107. nc, err := newHTTPClusterClient(c.transport, eps)
  108. if err != nil {
  109. return err
  110. }
  111. *c = *nc
  112. return nil
  113. }
  114. type roundTripResponse struct {
  115. resp *http.Response
  116. err error
  117. }
  118. type httpClient struct {
  119. transport CancelableTransport
  120. endpoint url.URL
  121. }
  122. func (c *httpClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  123. req := act.HTTPRequest(c.endpoint)
  124. rtchan := make(chan roundTripResponse, 1)
  125. go func() {
  126. resp, err := c.transport.RoundTrip(req)
  127. rtchan <- roundTripResponse{resp: resp, err: err}
  128. close(rtchan)
  129. }()
  130. var resp *http.Response
  131. var err error
  132. select {
  133. case rtresp := <-rtchan:
  134. resp, err = rtresp.resp, rtresp.err
  135. case <-ctx.Done():
  136. c.transport.CancelRequest(req)
  137. // wait for request to actually exit before continuing
  138. <-rtchan
  139. err = ctx.Err()
  140. }
  141. // always check for resp nil-ness to deal with possible
  142. // race conditions between channels above
  143. defer func() {
  144. if resp != nil {
  145. resp.Body.Close()
  146. }
  147. }()
  148. if err != nil {
  149. return nil, nil, err
  150. }
  151. body, err := ioutil.ReadAll(resp.Body)
  152. return resp, body, err
  153. }
  154. type redirectFollowingHTTPClient struct {
  155. client HTTPClient
  156. max int
  157. }
  158. func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  159. for i := 0; i <= r.max; i++ {
  160. resp, body, err := r.client.Do(ctx, act)
  161. if err != nil {
  162. return nil, nil, err
  163. }
  164. if resp.StatusCode/100 == 3 {
  165. hdr := resp.Header.Get("Location")
  166. if hdr == "" {
  167. return nil, nil, fmt.Errorf("Location header not set")
  168. }
  169. loc, err := url.Parse(hdr)
  170. if err != nil {
  171. return nil, nil, fmt.Errorf("Location header not valid URL: %s", hdr)
  172. }
  173. act = &redirectedHTTPAction{
  174. action: act,
  175. location: *loc,
  176. }
  177. continue
  178. }
  179. return resp, body, nil
  180. }
  181. return nil, nil, ErrTooManyRedirects
  182. }
  183. type redirectedHTTPAction struct {
  184. action HTTPAction
  185. location url.URL
  186. }
  187. func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
  188. orig := r.action.HTTPRequest(ep)
  189. orig.URL = &r.location
  190. return orig
  191. }