http.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package client
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/url"
  8. "path"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  13. )
  14. var (
  15. v2Prefix = "/v2/keys"
  16. )
  17. // transport mimics http.Transport to provide an interface which can be
  18. // substituted for testing (since the RoundTripper interface alone does not
  19. // require the CancelRequest method)
  20. type transport interface {
  21. http.RoundTripper
  22. CancelRequest(req *http.Request)
  23. }
  24. type httpClient struct {
  25. transport transport
  26. endpoint url.URL
  27. timeout time.Duration
  28. }
  29. func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpClient, error) {
  30. u, err := url.Parse(ep)
  31. if err != nil {
  32. return nil, err
  33. }
  34. c := &httpClient{
  35. transport: tr,
  36. endpoint: *u,
  37. timeout: timeout,
  38. }
  39. return c, nil
  40. }
  41. func (c *httpClient) SetPrefix(p string) {
  42. v2Prefix = p
  43. }
  44. func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) {
  45. create := &createAction{
  46. Key: key,
  47. Value: val,
  48. }
  49. if ttl >= 0 {
  50. uttl := uint64(ttl.Seconds())
  51. create.TTL = &uttl
  52. }
  53. ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  54. httpresp, body, err := c.do(ctx, create)
  55. cancel()
  56. if err != nil {
  57. return nil, err
  58. }
  59. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  60. }
  61. func (c *httpClient) Get(key string) (*Response, error) {
  62. get := &getAction{
  63. Key: key,
  64. Recursive: false,
  65. }
  66. ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  67. httpresp, body, err := c.do(ctx, get)
  68. cancel()
  69. if err != nil {
  70. return nil, err
  71. }
  72. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  73. }
  74. type roundTripResponse struct {
  75. resp *http.Response
  76. err error
  77. }
  78. func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
  79. req := act.httpRequest(c.endpoint)
  80. rtchan := make(chan roundTripResponse, 1)
  81. go func() {
  82. resp, err := c.transport.RoundTrip(req)
  83. rtchan <- roundTripResponse{resp: resp, err: err}
  84. close(rtchan)
  85. }()
  86. var resp *http.Response
  87. var err error
  88. select {
  89. case rtresp := <-rtchan:
  90. resp, err = rtresp.resp, rtresp.err
  91. case <-ctx.Done():
  92. c.transport.CancelRequest(req)
  93. // wait for request to actually exit before continuing
  94. <-rtchan
  95. err = ctx.Err()
  96. }
  97. // always check for resp nil-ness to deal with possible
  98. // race conditions between channels above
  99. defer func() {
  100. if resp != nil {
  101. resp.Body.Close()
  102. }
  103. }()
  104. if err != nil {
  105. return nil, nil, err
  106. }
  107. body, err := ioutil.ReadAll(resp.Body)
  108. return resp, body, err
  109. }
  110. func (c *httpClient) Watch(key string, idx uint64) Watcher {
  111. return &httpWatcher{
  112. httpClient: *c,
  113. nextWait: waitAction{
  114. Key: key,
  115. WaitIndex: idx,
  116. Recursive: false,
  117. },
  118. }
  119. }
  120. func (c *httpClient) RecursiveWatch(key string, idx uint64) Watcher {
  121. return &httpWatcher{
  122. httpClient: *c,
  123. nextWait: waitAction{
  124. Key: key,
  125. WaitIndex: idx,
  126. Recursive: true,
  127. },
  128. }
  129. }
  130. type httpWatcher struct {
  131. httpClient
  132. nextWait waitAction
  133. }
  134. func (hw *httpWatcher) Next() (*Response, error) {
  135. httpresp, body, err := hw.httpClient.do(context.Background(), &hw.nextWait)
  136. if err != nil {
  137. return nil, err
  138. }
  139. resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
  140. if err != nil {
  141. return nil, err
  142. }
  143. hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
  144. return resp, nil
  145. }
  146. func v2URL(ep url.URL, key string) *url.URL {
  147. ep.Path = path.Join(ep.Path, v2Prefix, key)
  148. return &ep
  149. }
  150. type httpAction interface {
  151. httpRequest(url.URL) *http.Request
  152. }
  153. type getAction struct {
  154. Key string
  155. Recursive bool
  156. }
  157. func (g *getAction) httpRequest(ep url.URL) *http.Request {
  158. u := v2URL(ep, g.Key)
  159. params := u.Query()
  160. params.Set("recursive", strconv.FormatBool(g.Recursive))
  161. u.RawQuery = params.Encode()
  162. req, _ := http.NewRequest("GET", u.String(), nil)
  163. return req
  164. }
  165. type waitAction struct {
  166. Key string
  167. WaitIndex uint64
  168. Recursive bool
  169. }
  170. func (w *waitAction) httpRequest(ep url.URL) *http.Request {
  171. u := v2URL(ep, w.Key)
  172. params := u.Query()
  173. params.Set("wait", "true")
  174. params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
  175. params.Set("recursive", strconv.FormatBool(w.Recursive))
  176. u.RawQuery = params.Encode()
  177. req, _ := http.NewRequest("GET", u.String(), nil)
  178. return req
  179. }
  180. type createAction struct {
  181. Key string
  182. Value string
  183. TTL *uint64
  184. }
  185. func (c *createAction) httpRequest(ep url.URL) *http.Request {
  186. u := v2URL(ep, c.Key)
  187. params := u.Query()
  188. params.Set("prevExist", "false")
  189. u.RawQuery = params.Encode()
  190. form := url.Values{}
  191. form.Add("value", c.Value)
  192. if c.TTL != nil {
  193. form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
  194. }
  195. body := strings.NewReader(form.Encode())
  196. req, _ := http.NewRequest("PUT", u.String(), body)
  197. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  198. return req
  199. }
  200. func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) {
  201. switch code {
  202. case http.StatusOK, http.StatusCreated:
  203. res, err = unmarshalSuccessfulResponse(body)
  204. default:
  205. err = unmarshalErrorResponse(code)
  206. }
  207. return
  208. }
  209. func unmarshalSuccessfulResponse(body []byte) (*Response, error) {
  210. var res Response
  211. err := json.Unmarshal(body, &res)
  212. if err != nil {
  213. return nil, err
  214. }
  215. return &res, nil
  216. }
  217. func unmarshalErrorResponse(code int) error {
  218. switch code {
  219. case http.StatusNotFound:
  220. return ErrKeyNoExist
  221. case http.StatusPreconditionFailed:
  222. return ErrKeyExists
  223. case http.StatusInternalServerError:
  224. // this isn't necessarily true
  225. return ErrNoLeader
  226. default:
  227. }
  228. return fmt.Errorf("unrecognized HTTP status code %d", code)
  229. }