http.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package client
  15. import (
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "net/http"
  20. "net/url"
  21. "time"
  22. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  23. )
  24. var (
  25. ErrTimeout = context.DeadlineExceeded
  26. ErrCanceled = context.Canceled
  27. ErrNoEndpoints = errors.New("no endpoints available")
  28. ErrTooManyRedirects = errors.New("too many redirects")
  29. DefaultRequestTimeout = 5 * time.Second
  30. DefaultMaxRedirects = 10
  31. )
  32. type SyncableHTTPClient interface {
  33. HTTPClient
  34. Sync(context.Context) error
  35. Endpoints() []string
  36. }
  37. type HTTPClient interface {
  38. Do(context.Context, HTTPAction) (*http.Response, []byte, error)
  39. }
  40. type HTTPAction interface {
  41. HTTPRequest(url.URL) *http.Request
  42. }
  43. // CancelableTransport mimics http.Transport to provide an interface which can be
  44. // substituted for testing (since the RoundTripper interface alone does not
  45. // require the CancelRequest method)
  46. type CancelableTransport interface {
  47. http.RoundTripper
  48. CancelRequest(req *http.Request)
  49. }
  50. func NewHTTPClient(tr CancelableTransport, eps []string) (SyncableHTTPClient, error) {
  51. return newHTTPClusterClient(tr, eps)
  52. }
  53. func newHTTPClusterClient(tr CancelableTransport, eps []string) (*httpClusterClient, error) {
  54. c := httpClusterClient{
  55. transport: tr,
  56. endpoints: eps,
  57. clients: make([]HTTPClient, len(eps)),
  58. }
  59. for i, ep := range eps {
  60. u, err := url.Parse(ep)
  61. if err != nil {
  62. return nil, err
  63. }
  64. c.clients[i] = &redirectFollowingHTTPClient{
  65. max: DefaultMaxRedirects,
  66. client: &httpClient{
  67. transport: tr,
  68. endpoint: *u,
  69. },
  70. }
  71. }
  72. return &c, nil
  73. }
  74. type httpClusterClient struct {
  75. transport CancelableTransport
  76. endpoints []string
  77. clients []HTTPClient
  78. }
  79. func (c *httpClusterClient) Do(ctx context.Context, act HTTPAction) (resp *http.Response, body []byte, err error) {
  80. if len(c.clients) == 0 {
  81. return nil, nil, ErrNoEndpoints
  82. }
  83. for _, hc := range c.clients {
  84. resp, body, err = hc.Do(ctx, act)
  85. if err != nil {
  86. if err == ErrTimeout || err == ErrCanceled {
  87. return nil, nil, err
  88. }
  89. continue
  90. }
  91. if resp.StatusCode/100 == 5 {
  92. continue
  93. }
  94. break
  95. }
  96. return
  97. }
  98. func (c *httpClusterClient) Endpoints() []string {
  99. return c.endpoints
  100. }
  101. func (c *httpClusterClient) Sync(ctx context.Context) error {
  102. mAPI := NewMembersAPI(c)
  103. ms, err := mAPI.List(ctx)
  104. if err != nil {
  105. return err
  106. }
  107. eps := make([]string, 0)
  108. for _, m := range ms {
  109. eps = append(eps, m.ClientURLs...)
  110. }
  111. if len(eps) == 0 {
  112. return ErrNoEndpoints
  113. }
  114. nc, err := newHTTPClusterClient(c.transport, eps)
  115. if err != nil {
  116. return err
  117. }
  118. *c = *nc
  119. return nil
  120. }
  121. type roundTripResponse struct {
  122. resp *http.Response
  123. err error
  124. }
  125. type httpClient struct {
  126. transport CancelableTransport
  127. endpoint url.URL
  128. }
  129. func (c *httpClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  130. req := act.HTTPRequest(c.endpoint)
  131. rtchan := make(chan roundTripResponse, 1)
  132. go func() {
  133. resp, err := c.transport.RoundTrip(req)
  134. rtchan <- roundTripResponse{resp: resp, err: err}
  135. close(rtchan)
  136. }()
  137. var resp *http.Response
  138. var err error
  139. select {
  140. case rtresp := <-rtchan:
  141. resp, err = rtresp.resp, rtresp.err
  142. case <-ctx.Done():
  143. c.transport.CancelRequest(req)
  144. // wait for request to actually exit before continuing
  145. <-rtchan
  146. err = ctx.Err()
  147. }
  148. // always check for resp nil-ness to deal with possible
  149. // race conditions between channels above
  150. defer func() {
  151. if resp != nil {
  152. resp.Body.Close()
  153. }
  154. }()
  155. if err != nil {
  156. return nil, nil, err
  157. }
  158. body, err := ioutil.ReadAll(resp.Body)
  159. return resp, body, err
  160. }
  161. type redirectFollowingHTTPClient struct {
  162. client HTTPClient
  163. max int
  164. }
  165. func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  166. for i := 0; i <= r.max; i++ {
  167. resp, body, err := r.client.Do(ctx, act)
  168. if err != nil {
  169. return nil, nil, err
  170. }
  171. if resp.StatusCode/100 == 3 {
  172. hdr := resp.Header.Get("Location")
  173. if hdr == "" {
  174. return nil, nil, fmt.Errorf("Location header not set")
  175. }
  176. loc, err := url.Parse(hdr)
  177. if err != nil {
  178. return nil, nil, fmt.Errorf("Location header not valid URL: %s", hdr)
  179. }
  180. act = &redirectedHTTPAction{
  181. action: act,
  182. location: *loc,
  183. }
  184. continue
  185. }
  186. return resp, body, nil
  187. }
  188. return nil, nil, ErrTooManyRedirects
  189. }
  190. type redirectedHTTPAction struct {
  191. action HTTPAction
  192. location url.URL
  193. }
  194. func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
  195. orig := r.action.HTTPRequest(ep)
  196. orig.URL = &r.location
  197. return orig
  198. }