http.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package client
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/url"
  8. "path"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  13. )
const (
	// v2Prefix is the path segment that v2URL joins between the endpoint's
	// own path and the key when building request URLs.
	v2Prefix = "/v2/keys"
)
  17. // transport mimics http.Transport to provide an interface which can be
  18. // substituted for testing (since the RoundTripper interface alone does not
  19. // require the CancelRequest method)
  20. type transport interface {
  21. http.RoundTripper
  22. CancelRequest(req *http.Request)
  23. }
  24. type httpClient struct {
  25. transport transport
  26. endpoint url.URL
  27. timeout time.Duration
  28. }
  29. func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpClient, error) {
  30. u, err := url.Parse(ep)
  31. if err != nil {
  32. return nil, err
  33. }
  34. c := &httpClient{
  35. transport: tr,
  36. endpoint: *u,
  37. timeout: timeout,
  38. }
  39. return c, nil
  40. }
  41. func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) {
  42. uintTTL := uint64(ttl.Seconds())
  43. create := &createAction{
  44. Key: key,
  45. Value: val,
  46. TTL: &uintTTL,
  47. }
  48. ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  49. httpresp, body, err := c.do(ctx, create)
  50. cancel()
  51. if err != nil {
  52. return nil, err
  53. }
  54. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  55. }
  56. func (c *httpClient) Get(key string) (*Response, error) {
  57. get := &getAction{
  58. Key: key,
  59. Recursive: false,
  60. }
  61. ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  62. httpresp, body, err := c.do(ctx, get)
  63. cancel()
  64. if err != nil {
  65. return nil, err
  66. }
  67. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  68. }
  69. type roundTripResponse struct {
  70. resp *http.Response
  71. err error
  72. }
  73. func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
  74. req := act.httpRequest(c.endpoint)
  75. rtchan := make(chan roundTripResponse, 1)
  76. go func() {
  77. resp, err := c.transport.RoundTrip(req)
  78. rtchan <- roundTripResponse{resp: resp, err: err}
  79. close(rtchan)
  80. }()
  81. var resp *http.Response
  82. var err error
  83. select {
  84. case rtresp := <-rtchan:
  85. resp, err = rtresp.resp, rtresp.err
  86. case <-ctx.Done():
  87. c.transport.CancelRequest(req)
  88. // wait for request to actually exit before continuing
  89. <-rtchan
  90. err = ctx.Err()
  91. }
  92. // always check for resp nil-ness to deal with possible
  93. // race conditions between channels above
  94. defer func() {
  95. if resp != nil {
  96. resp.Body.Close()
  97. }
  98. }()
  99. if err != nil {
  100. return nil, nil, err
  101. }
  102. body, err := ioutil.ReadAll(resp.Body)
  103. return resp, body, err
  104. }
  105. func (c *httpClient) Watch(key string, idx uint64) Watcher {
  106. return &httpWatcher{
  107. httpClient: *c,
  108. nextWait: waitAction{
  109. Key: key,
  110. WaitIndex: idx,
  111. Recursive: false,
  112. },
  113. }
  114. }
  115. func (c *httpClient) RecursiveWatch(key string, idx uint64) Watcher {
  116. return &httpWatcher{
  117. httpClient: *c,
  118. nextWait: waitAction{
  119. Key: key,
  120. WaitIndex: idx,
  121. Recursive: true,
  122. },
  123. }
  124. }
  125. type httpWatcher struct {
  126. httpClient
  127. nextWait waitAction
  128. }
  129. func (hw *httpWatcher) Next() (*Response, error) {
  130. httpresp, body, err := hw.httpClient.do(context.Background(), &hw.nextWait)
  131. if err != nil {
  132. return nil, err
  133. }
  134. resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
  135. if err != nil {
  136. return nil, err
  137. }
  138. hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
  139. return resp, nil
  140. }
  141. func v2URL(ep url.URL, key string) *url.URL {
  142. ep.Path = path.Join(ep.Path, v2Prefix, key)
  143. return &ep
  144. }
// httpAction describes a single request that httpClient.do can send. It is
// implemented in this file by getAction, waitAction and createAction.
type httpAction interface {
	// httpRequest builds the HTTP request for the action relative to the
	// given endpoint URL. The implementations in this file discard the error
	// from http.NewRequest, so the result may be nil if construction fails.
	httpRequest(url.URL) *http.Request
}
  148. type getAction struct {
  149. Key string
  150. Recursive bool
  151. }
  152. func (g *getAction) httpRequest(ep url.URL) *http.Request {
  153. u := v2URL(ep, g.Key)
  154. params := u.Query()
  155. params.Set("recursive", strconv.FormatBool(g.Recursive))
  156. u.RawQuery = params.Encode()
  157. req, _ := http.NewRequest("GET", u.String(), nil)
  158. return req
  159. }
  160. type waitAction struct {
  161. Key string
  162. WaitIndex uint64
  163. Recursive bool
  164. }
  165. func (w *waitAction) httpRequest(ep url.URL) *http.Request {
  166. u := v2URL(ep, w.Key)
  167. params := u.Query()
  168. params.Set("wait", "true")
  169. params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
  170. params.Set("recursive", strconv.FormatBool(w.Recursive))
  171. u.RawQuery = params.Encode()
  172. req, _ := http.NewRequest("GET", u.String(), nil)
  173. return req
  174. }
  175. type createAction struct {
  176. Key string
  177. Value string
  178. TTL *uint64
  179. }
  180. func (c *createAction) httpRequest(ep url.URL) *http.Request {
  181. u := v2URL(ep, c.Key)
  182. params := u.Query()
  183. params.Set("prevExist", "false")
  184. u.RawQuery = params.Encode()
  185. form := url.Values{}
  186. form.Add("value", c.Value)
  187. if c.TTL != nil {
  188. form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
  189. }
  190. body := strings.NewReader(form.Encode())
  191. req, _ := http.NewRequest("PUT", u.String(), body)
  192. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  193. return req
  194. }
  195. func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) {
  196. switch code {
  197. case http.StatusOK, http.StatusCreated:
  198. res, err = unmarshalSuccessfulResponse(body)
  199. default:
  200. err = unmarshalErrorResponse(code)
  201. }
  202. return
  203. }
  204. func unmarshalSuccessfulResponse(body []byte) (*Response, error) {
  205. var res Response
  206. err := json.Unmarshal(body, &res)
  207. if err != nil {
  208. return nil, err
  209. }
  210. return &res, nil
  211. }
  212. func unmarshalErrorResponse(code int) error {
  213. switch code {
  214. case http.StatusNotFound:
  215. return ErrKeyNoExist
  216. case http.StatusPreconditionFailed:
  217. return ErrKeyExists
  218. case http.StatusInternalServerError:
  219. // this isn't necessarily true
  220. return ErrNoLeader
  221. default:
  222. }
  223. return fmt.Errorf("unrecognized HTTP status code %d", code)
  224. }