http.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package client
  15. import (
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "net/http"
  20. "net/url"
  21. "time"
  22. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  23. )
  24. var (
  25. ErrTimeout = context.DeadlineExceeded
  26. ErrCanceled = context.Canceled
  27. ErrNoEndpoints = errors.New("no endpoints available")
  28. ErrTooManyRedirects = errors.New("too many redirects")
  29. DefaultRequestTimeout = 5 * time.Second
  30. DefaultMaxRedirects = 10
  31. )
  32. type ClientConfig struct {
  33. Endpoints []string
  34. Transport CancelableTransport
  35. }
  36. func New(cfg ClientConfig) (SyncableHTTPClient, error) {
  37. return newHTTPClusterClient(cfg.Transport, cfg.Endpoints)
  38. }
  39. type SyncableHTTPClient interface {
  40. HTTPClient
  41. Sync(context.Context) error
  42. Endpoints() []string
  43. }
  44. type HTTPClient interface {
  45. Do(context.Context, HTTPAction) (*http.Response, []byte, error)
  46. }
  47. type HTTPAction interface {
  48. HTTPRequest(url.URL) *http.Request
  49. }
  50. // CancelableTransport mimics http.Transport to provide an interface which can be
  51. // substituted for testing (since the RoundTripper interface alone does not
  52. // require the CancelRequest method)
  53. type CancelableTransport interface {
  54. http.RoundTripper
  55. CancelRequest(req *http.Request)
  56. }
  57. func newHTTPClusterClient(tr CancelableTransport, eps []string) (*httpClusterClient, error) {
  58. c := httpClusterClient{
  59. transport: tr,
  60. endpoints: eps,
  61. clients: make([]HTTPClient, len(eps)),
  62. }
  63. for i, ep := range eps {
  64. u, err := url.Parse(ep)
  65. if err != nil {
  66. return nil, err
  67. }
  68. c.clients[i] = &redirectFollowingHTTPClient{
  69. max: DefaultMaxRedirects,
  70. client: &httpClient{
  71. transport: tr,
  72. endpoint: *u,
  73. },
  74. }
  75. }
  76. return &c, nil
  77. }
  78. type httpClusterClient struct {
  79. transport CancelableTransport
  80. endpoints []string
  81. clients []HTTPClient
  82. }
  83. func (c *httpClusterClient) Do(ctx context.Context, act HTTPAction) (resp *http.Response, body []byte, err error) {
  84. if len(c.clients) == 0 {
  85. return nil, nil, ErrNoEndpoints
  86. }
  87. for _, hc := range c.clients {
  88. resp, body, err = hc.Do(ctx, act)
  89. if err != nil {
  90. if err == ErrTimeout || err == ErrCanceled {
  91. return nil, nil, err
  92. }
  93. continue
  94. }
  95. if resp.StatusCode/100 == 5 {
  96. continue
  97. }
  98. break
  99. }
  100. return
  101. }
  102. func (c *httpClusterClient) Endpoints() []string {
  103. return c.endpoints
  104. }
  105. func (c *httpClusterClient) Sync(ctx context.Context) error {
  106. mAPI := NewMembersAPI(c)
  107. ms, err := mAPI.List(ctx)
  108. if err != nil {
  109. return err
  110. }
  111. eps := make([]string, 0)
  112. for _, m := range ms {
  113. eps = append(eps, m.ClientURLs...)
  114. }
  115. if len(eps) == 0 {
  116. return ErrNoEndpoints
  117. }
  118. nc, err := newHTTPClusterClient(c.transport, eps)
  119. if err != nil {
  120. return err
  121. }
  122. *c = *nc
  123. return nil
  124. }
  125. type roundTripResponse struct {
  126. resp *http.Response
  127. err error
  128. }
  129. type httpClient struct {
  130. transport CancelableTransport
  131. endpoint url.URL
  132. }
  133. func (c *httpClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  134. req := act.HTTPRequest(c.endpoint)
  135. rtchan := make(chan roundTripResponse, 1)
  136. go func() {
  137. resp, err := c.transport.RoundTrip(req)
  138. rtchan <- roundTripResponse{resp: resp, err: err}
  139. close(rtchan)
  140. }()
  141. var resp *http.Response
  142. var err error
  143. select {
  144. case rtresp := <-rtchan:
  145. resp, err = rtresp.resp, rtresp.err
  146. case <-ctx.Done():
  147. c.transport.CancelRequest(req)
  148. // wait for request to actually exit before continuing
  149. <-rtchan
  150. err = ctx.Err()
  151. }
  152. // always check for resp nil-ness to deal with possible
  153. // race conditions between channels above
  154. defer func() {
  155. if resp != nil {
  156. resp.Body.Close()
  157. }
  158. }()
  159. if err != nil {
  160. return nil, nil, err
  161. }
  162. body, err := ioutil.ReadAll(resp.Body)
  163. return resp, body, err
  164. }
  165. type redirectFollowingHTTPClient struct {
  166. client HTTPClient
  167. max int
  168. }
  169. func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
  170. for i := 0; i <= r.max; i++ {
  171. resp, body, err := r.client.Do(ctx, act)
  172. if err != nil {
  173. return nil, nil, err
  174. }
  175. if resp.StatusCode/100 == 3 {
  176. hdr := resp.Header.Get("Location")
  177. if hdr == "" {
  178. return nil, nil, fmt.Errorf("Location header not set")
  179. }
  180. loc, err := url.Parse(hdr)
  181. if err != nil {
  182. return nil, nil, fmt.Errorf("Location header not valid URL: %s", hdr)
  183. }
  184. act = &redirectedHTTPAction{
  185. action: act,
  186. location: *loc,
  187. }
  188. continue
  189. }
  190. return resp, body, nil
  191. }
  192. return nil, nil, ErrTooManyRedirects
  193. }
  194. type redirectedHTTPAction struct {
  195. action HTTPAction
  196. location url.URL
  197. }
  198. func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
  199. orig := r.action.HTTPRequest(ep)
  200. orig.URL = &r.location
  201. return orig
  202. }