client.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package client
  15. import (
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "net"
  20. "net/http"
  21. "net/url"
  22. "sync"
  23. "time"
  24. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  25. )
  26. var (
  27. ErrNoEndpoints = errors.New("client: no endpoints available")
  28. ErrTooManyRedirects = errors.New("client: too many redirects")
  29. )
  30. var DefaultRequestTimeout = 5 * time.Second
  31. var DefaultTransport CancelableTransport = &http.Transport{
  32. Proxy: http.ProxyFromEnvironment,
  33. Dial: (&net.Dialer{
  34. Timeout: 30 * time.Second,
  35. KeepAlive: 30 * time.Second,
  36. }).Dial,
  37. TLSHandshakeTimeout: 10 * time.Second,
  38. }
  39. type Config struct {
  40. // Endpoints defines a set of URLs (schemes, hosts and ports only)
  41. // that can be used to communicate with a logical etcd cluster. For
  42. // example, a three-node cluster could be provided like so:
  43. //
  44. // Endpoints: []string{
  45. // "http://node1.example.com:4001",
  46. // "http://node2.example.com:2379",
  47. // "http://node3.example.com:4001",
  48. // }
  49. //
  50. // If multiple endpoints are provided, the Client will attempt to
  51. // use them all in the event that one or more of them are unusable.
  52. //
  53. // If Client.Sync is ever called, the Client may cache an alternate
  54. // set of endpoints to continue operation.
  55. Endpoints []string
  56. // Transport is used by the Client to drive HTTP requests. If not
  57. // provided, DefaultTransport will be used.
  58. Transport CancelableTransport
  59. // CheckRedirect specifies the policy for handling HTTP redirects.
  60. // If CheckRedirect is not nil, the Client calls it before
  61. // following an HTTP redirect. The sole argument is the number of
  62. // requests that have alrady been made. If CheckRedirect returns
  63. // an error, Client.Do will not make any further requests and return
  64. // the error back it to the caller.
  65. //
  66. // If CheckRedirect is nil, the Client uses its default policy,
  67. // which is to stop after 10 consecutive requests.
  68. CheckRedirect CheckRedirectFunc
  69. }
  70. func (cfg *Config) transport() CancelableTransport {
  71. if cfg.Transport == nil {
  72. return DefaultTransport
  73. }
  74. return cfg.Transport
  75. }
  76. func (cfg *Config) checkRedirect() CheckRedirectFunc {
  77. if cfg.CheckRedirect == nil {
  78. return DefaultCheckRedirect
  79. }
  80. return cfg.CheckRedirect
  81. }
  82. // CancelableTransport mimics net/http.Transport, but requires that
  83. // the object also support request cancellation.
  84. type CancelableTransport interface {
  85. http.RoundTripper
  86. CancelRequest(req *http.Request)
  87. }
  88. type CheckRedirectFunc func(via int) error
  89. // DefaultCheckRedirect follows up to 10 redirects, but no more.
  90. var DefaultCheckRedirect CheckRedirectFunc = func(via int) error {
  91. if via > 10 {
  92. return ErrTooManyRedirects
  93. }
  94. return nil
  95. }
  96. type Client interface {
  97. // Sync updates the internal cache of the etcd cluster's membership.
  98. Sync(context.Context) error
  99. // Endpoints returns a copy of the current set of API endpoints used
  100. // by Client to resolve HTTP requests. If Sync has ever been called,
  101. // this may differ from the initial Endpoints provided in the Config.
  102. Endpoints() []string
  103. httpClient
  104. }
  105. func New(cfg Config) (Client, error) {
  106. c := &httpClusterClient{clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect())}
  107. if err := c.reset(cfg.Endpoints); err != nil {
  108. return nil, err
  109. }
  110. return c, nil
  111. }
  112. type httpClient interface {
  113. Do(context.Context, httpAction) (*http.Response, []byte, error)
  114. }
  115. func newHTTPClientFactory(tr CancelableTransport, cr CheckRedirectFunc) httpClientFactory {
  116. return func(ep url.URL) httpClient {
  117. return &redirectFollowingHTTPClient{
  118. checkRedirect: cr,
  119. client: &simpleHTTPClient{
  120. transport: tr,
  121. endpoint: ep,
  122. },
  123. }
  124. }
  125. }
  126. type httpClientFactory func(url.URL) httpClient
  127. type httpAction interface {
  128. HTTPRequest(url.URL) *http.Request
  129. }
  130. type httpClusterClient struct {
  131. clientFactory httpClientFactory
  132. endpoints []url.URL
  133. sync.RWMutex
  134. }
  135. func (c *httpClusterClient) reset(eps []string) error {
  136. if len(eps) == 0 {
  137. return ErrNoEndpoints
  138. }
  139. neps := make([]url.URL, len(eps))
  140. for i, ep := range eps {
  141. u, err := url.Parse(ep)
  142. if err != nil {
  143. return err
  144. }
  145. neps[i] = *u
  146. }
  147. c.endpoints = neps
  148. return nil
  149. }
  150. func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (resp *http.Response, body []byte, err error) {
  151. c.RLock()
  152. leps := len(c.endpoints)
  153. eps := make([]url.URL, leps)
  154. n := copy(eps, c.endpoints)
  155. c.RUnlock()
  156. if leps == 0 {
  157. err = ErrNoEndpoints
  158. return
  159. }
  160. if leps != n {
  161. err = errors.New("unable to pick endpoint: copy failed")
  162. return
  163. }
  164. for _, ep := range eps {
  165. hc := c.clientFactory(ep)
  166. resp, body, err = hc.Do(ctx, act)
  167. if err != nil {
  168. if err == context.DeadlineExceeded || err == context.Canceled {
  169. return nil, nil, err
  170. }
  171. continue
  172. }
  173. if resp.StatusCode/100 == 5 {
  174. continue
  175. }
  176. break
  177. }
  178. return
  179. }
  180. func (c *httpClusterClient) Endpoints() []string {
  181. c.RLock()
  182. defer c.RUnlock()
  183. eps := make([]string, len(c.endpoints))
  184. for i, ep := range c.endpoints {
  185. eps[i] = ep.String()
  186. }
  187. return eps
  188. }
  189. func (c *httpClusterClient) Sync(ctx context.Context) error {
  190. mAPI := NewMembersAPI(c)
  191. ms, err := mAPI.List(ctx)
  192. if err != nil {
  193. return err
  194. }
  195. c.Lock()
  196. defer c.Unlock()
  197. eps := make([]string, 0)
  198. for _, m := range ms {
  199. eps = append(eps, m.ClientURLs...)
  200. }
  201. return c.reset(eps)
  202. }
  203. type roundTripResponse struct {
  204. resp *http.Response
  205. err error
  206. }
  207. type simpleHTTPClient struct {
  208. transport CancelableTransport
  209. endpoint url.URL
  210. }
  211. func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
  212. req := act.HTTPRequest(c.endpoint)
  213. rtchan := make(chan roundTripResponse, 1)
  214. go func() {
  215. resp, err := c.transport.RoundTrip(req)
  216. rtchan <- roundTripResponse{resp: resp, err: err}
  217. close(rtchan)
  218. }()
  219. var resp *http.Response
  220. var err error
  221. select {
  222. case rtresp := <-rtchan:
  223. resp, err = rtresp.resp, rtresp.err
  224. case <-ctx.Done():
  225. c.transport.CancelRequest(req)
  226. // wait for request to actually exit before continuing
  227. <-rtchan
  228. err = ctx.Err()
  229. }
  230. // always check for resp nil-ness to deal with possible
  231. // race conditions between channels above
  232. defer func() {
  233. if resp != nil {
  234. resp.Body.Close()
  235. }
  236. }()
  237. if err != nil {
  238. return nil, nil, err
  239. }
  240. body, err := ioutil.ReadAll(resp.Body)
  241. return resp, body, err
  242. }
  243. type redirectFollowingHTTPClient struct {
  244. client httpClient
  245. checkRedirect CheckRedirectFunc
  246. }
  247. func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
  248. for i := 0; ; i++ {
  249. if i > 0 {
  250. if err := r.checkRedirect(i); err != nil {
  251. return nil, nil, err
  252. }
  253. }
  254. resp, body, err := r.client.Do(ctx, act)
  255. if err != nil {
  256. return nil, nil, err
  257. }
  258. if resp.StatusCode/100 == 3 {
  259. hdr := resp.Header.Get("Location")
  260. if hdr == "" {
  261. return nil, nil, fmt.Errorf("Location header not set")
  262. }
  263. loc, err := url.Parse(hdr)
  264. if err != nil {
  265. return nil, nil, fmt.Errorf("Location header not valid URL: %s", hdr)
  266. }
  267. act = &redirectedHTTPAction{
  268. action: act,
  269. location: *loc,
  270. }
  271. continue
  272. }
  273. return resp, body, nil
  274. }
  275. }
  276. type redirectedHTTPAction struct {
  277. action httpAction
  278. location url.URL
  279. }
  280. func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
  281. orig := r.action.HTTPRequest(ep)
  282. orig.URL = &r.location
  283. return orig
  284. }