http.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "errors"
  16. "fmt"
  17. "io/ioutil"
  18. "net/http"
  19. "net/url"
  20. "time"
  21. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  22. )
  23. var (
  24. ErrTimeout = context.DeadlineExceeded
  25. ErrCanceled = context.Canceled
  26. ErrNoEndpoints = errors.New("no endpoints available")
  27. ErrTooManyRedirects = errors.New("too many redirects")
  28. DefaultRequestTimeout = 5 * time.Second
  29. DefaultMaxRedirects = 10
  30. )
  31. type SyncableHTTPClient interface {
  32. HTTPClient
  33. Sync(context.Context) error
  34. Endpoints() []string
  35. }
  36. type HTTPClient interface {
  37. Do(context.Context, HTTPAction) (*http.Response, []byte, error)
  38. }
  39. type HTTPAction interface {
  40. HTTPRequest(url.URL) *http.Request
  41. }
  42. // CancelableTransport mimics http.Transport to provide an interface which can be
  43. // substituted for testing (since the RoundTripper interface alone does not
  44. // require the CancelRequest method)
  45. type CancelableTransport interface {
  46. http.RoundTripper
  47. CancelRequest(req *http.Request)
  48. }
  49. func NewHTTPClient(tr CancelableTransport, eps []string) (SyncableHTTPClient, error) {
  50. return newHTTPClusterClient(tr, eps)
  51. }
  52. func newHTTPClusterClient(tr CancelableTransport, eps []string) (*httpClusterClient, error) {
  53. c := httpClusterClient{
  54. transport: tr,
  55. endpoints: eps,
  56. clients: make([]HTTPClient, len(eps)),
  57. }
  58. for i, ep := range eps {
  59. u, err := url.Parse(ep)
  60. if err != nil {
  61. return nil, err
  62. }
  63. c.clients[i] = &redirectFollowingHTTPClient{
  64. max: DefaultMaxRedirects,
  65. client: &httpClient{
  66. transport: tr,
  67. endpoint: *u,
  68. },
  69. }
  70. }
  71. return &c, nil
  72. }
  73. type httpClusterClient struct {
  74. transport CancelableTransport
  75. endpoints []string
  76. clients []HTTPClient
  77. }
  78. func (c *httpClusterClient) Do(ctx context.Context, act HTTPAction) (resp *http.Response, body []byte, err error) {
  79. if len(c.clients) == 0 {
  80. return nil, nil, ErrNoEndpoints
  81. }
  82. for _, hc := range c.clients {
  83. resp, body, err = hc.Do(ctx, act)
  84. if err != nil {
  85. if err == ErrTimeout || err == ErrCanceled {
  86. return nil, nil, err
  87. }
  88. continue
  89. }
  90. if resp.StatusCode/100 == 5 {
  91. continue
  92. }
  93. break
  94. }
  95. return
  96. }
  97. func (c *httpClusterClient) Endpoints() []string {
  98. return c.endpoints
  99. }
  100. func (c *httpClusterClient) Sync(ctx context.Context) error {
  101. mAPI := NewMembersAPI(c)
  102. ms, err := mAPI.List(ctx)
  103. if err != nil {
  104. return err
  105. }
  106. eps := make([]string, 0)
  107. for _, m := range ms {
  108. eps = append(eps, m.ClientURLs...)
  109. }
  110. if len(eps) == 0 {
  111. return ErrNoEndpoints
  112. }
  113. nc, err := newHTTPClusterClient(c.transport, eps)
  114. if err != nil {
  115. return err
  116. }
  117. *c = *nc
  118. return nil
  119. }
  120. type roundTripResponse struct {
  121. resp *http.Response
  122. err error
  123. }
  124. type httpClient struct {
  125. transport CancelableTransport
  126. endpoint url.URL
  127. }
  128. func (c *httpClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  129. req := act.HTTPRequest(c.endpoint)
  130. rtchan := make(chan roundTripResponse, 1)
  131. go func() {
  132. resp, err := c.transport.RoundTrip(req)
  133. rtchan <- roundTripResponse{resp: resp, err: err}
  134. close(rtchan)
  135. }()
  136. var resp *http.Response
  137. var err error
  138. select {
  139. case rtresp := <-rtchan:
  140. resp, err = rtresp.resp, rtresp.err
  141. case <-ctx.Done():
  142. c.transport.CancelRequest(req)
  143. // wait for request to actually exit before continuing
  144. <-rtchan
  145. err = ctx.Err()
  146. }
  147. // always check for resp nil-ness to deal with possible
  148. // race conditions between channels above
  149. defer func() {
  150. if resp != nil {
  151. resp.Body.Close()
  152. }
  153. }()
  154. if err != nil {
  155. return nil, nil, err
  156. }
  157. body, err := ioutil.ReadAll(resp.Body)
  158. return resp, body, err
  159. }
  160. type redirectFollowingHTTPClient struct {
  161. client HTTPClient
  162. max int
  163. }
  164. func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  165. for i := 0; i <= r.max; i++ {
  166. resp, body, err := r.client.Do(ctx, act)
  167. if err != nil {
  168. return nil, nil, err
  169. }
  170. if resp.StatusCode/100 == 3 {
  171. hdr := resp.Header.Get("Location")
  172. if hdr == "" {
  173. return nil, nil, fmt.Errorf("Location header not set")
  174. }
  175. loc, err := url.Parse(hdr)
  176. if err != nil {
  177. return nil, nil, fmt.Errorf("Location header not valid URL: %s", hdr)
  178. }
  179. act = &redirectedHTTPAction{
  180. action: act,
  181. location: *loc,
  182. }
  183. continue
  184. }
  185. return resp, body, nil
  186. }
  187. return nil, nil, ErrTooManyRedirects
  188. }
  189. type redirectedHTTPAction struct {
  190. action HTTPAction
  191. location url.URL
  192. }
  193. func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
  194. orig := r.action.HTTPRequest(ep)
  195. orig.URL = &r.location
  196. return orig
  197. }