http.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package client
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/url"
  8. "path"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  13. )
  14. var (
  15. v2Prefix = "/v2/keys"
  16. ErrTimeout = context.DeadlineExceeded
  17. )
  // transport is the slice of *http.Transport behaviour this package relies on.
  // http.RoundTripper by itself does not include CancelRequest, so this wider
  // interface exists to let tests substitute their own implementation.
  type transport interface {
  	http.RoundTripper

  	// CancelRequest is called by the client when the request's context ends
  	// before the round trip has returned.
  	CancelRequest(*http.Request)
  }
  25. type httpClient struct {
  26. transport transport
  27. endpoint url.URL
  28. timeout time.Duration
  29. }
  30. func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpClient, error) {
  31. u, err := url.Parse(ep)
  32. if err != nil {
  33. return nil, err
  34. }
  35. c := &httpClient{
  36. transport: tr,
  37. endpoint: *u,
  38. timeout: timeout,
  39. }
  40. return c, nil
  41. }
  42. func (c *httpClient) SetPrefix(p string) {
  43. v2Prefix = p
  44. }
  45. func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) {
  46. create := &createAction{
  47. Key: key,
  48. Value: val,
  49. }
  50. if ttl >= 0 {
  51. uttl := uint64(ttl.Seconds())
  52. create.TTL = &uttl
  53. }
  54. ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  55. httpresp, body, err := c.do(ctx, create)
  56. cancel()
  57. if err != nil {
  58. return nil, err
  59. }
  60. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  61. }
  62. func (c *httpClient) Get(key string) (*Response, error) {
  63. get := &getAction{
  64. Key: key,
  65. Recursive: false,
  66. }
  67. ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  68. httpresp, body, err := c.do(ctx, get)
  69. cancel()
  70. if err != nil {
  71. return nil, err
  72. }
  73. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  74. }
  // roundTripResponse bundles the two results of a RoundTrip call so that both
  // can be delivered over a single channel.
  type roundTripResponse struct {
  	resp *http.Response // response from the round trip, if any
  	err  error          // error from the round trip, if any
  }
  79. func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
  80. req := act.httpRequest(c.endpoint)
  81. rtchan := make(chan roundTripResponse, 1)
  82. go func() {
  83. resp, err := c.transport.RoundTrip(req)
  84. rtchan <- roundTripResponse{resp: resp, err: err}
  85. close(rtchan)
  86. }()
  87. var resp *http.Response
  88. var err error
  89. select {
  90. case rtresp := <-rtchan:
  91. resp, err = rtresp.resp, rtresp.err
  92. case <-ctx.Done():
  93. c.transport.CancelRequest(req)
  94. // wait for request to actually exit before continuing
  95. <-rtchan
  96. err = ctx.Err()
  97. }
  98. // always check for resp nil-ness to deal with possible
  99. // race conditions between channels above
  100. defer func() {
  101. if resp != nil {
  102. resp.Body.Close()
  103. }
  104. }()
  105. if err != nil {
  106. return nil, nil, err
  107. }
  108. body, err := ioutil.ReadAll(resp.Body)
  109. return resp, body, err
  110. }
  111. func (c *httpClient) Watch(key string, idx uint64) Watcher {
  112. return &httpWatcher{
  113. httpClient: *c,
  114. nextWait: waitAction{
  115. Key: key,
  116. WaitIndex: idx,
  117. Recursive: false,
  118. },
  119. }
  120. }
  121. func (c *httpClient) RecursiveWatch(key string, idx uint64) Watcher {
  122. return &httpWatcher{
  123. httpClient: *c,
  124. nextWait: waitAction{
  125. Key: key,
  126. WaitIndex: idx,
  127. Recursive: true,
  128. },
  129. }
  130. }
  131. type httpWatcher struct {
  132. httpClient
  133. nextWait waitAction
  134. }
  135. func (hw *httpWatcher) Next() (*Response, error) {
  136. httpresp, body, err := hw.httpClient.do(context.Background(), &hw.nextWait)
  137. if err != nil {
  138. return nil, err
  139. }
  140. resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
  141. if err != nil {
  142. return nil, err
  143. }
  144. hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
  145. return resp, nil
  146. }
  147. func v2URL(ep url.URL, key string) *url.URL {
  148. ep.Path = path.Join(ep.Path, v2Prefix, key)
  149. return &ep
  150. }
  // httpAction is an operation that can express itself as an HTTP request
  // against a given endpoint.
  type httpAction interface {
  	// httpRequest builds the request for this action relative to ep.
  	httpRequest(ep url.URL) *http.Request
  }
  // getAction describes a read of a single key.
  type getAction struct {
  	// Key is the key to read; it is appended to the endpoint and prefix path.
  	Key string
  	// Recursive is sent as the "recursive" query parameter.
  	Recursive bool
  }
  158. func (g *getAction) httpRequest(ep url.URL) *http.Request {
  159. u := v2URL(ep, g.Key)
  160. params := u.Query()
  161. params.Set("recursive", strconv.FormatBool(g.Recursive))
  162. u.RawQuery = params.Encode()
  163. req, _ := http.NewRequest("GET", u.String(), nil)
  164. return req
  165. }
  // waitAction describes a wait (watch) request on a key.
  type waitAction struct {
  	// Key is the key to wait on; it is appended to the endpoint and prefix path.
  	Key string
  	// WaitIndex is sent as the "waitIndex" query parameter.
  	WaitIndex uint64
  	// Recursive is sent as the "recursive" query parameter.
  	Recursive bool
  }
  171. func (w *waitAction) httpRequest(ep url.URL) *http.Request {
  172. u := v2URL(ep, w.Key)
  173. params := u.Query()
  174. params.Set("wait", "true")
  175. params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
  176. params.Set("recursive", strconv.FormatBool(w.Recursive))
  177. u.RawQuery = params.Encode()
  178. req, _ := http.NewRequest("GET", u.String(), nil)
  179. return req
  180. }
  // createAction describes the creation of a key with a value and an optional
  // TTL.
  type createAction struct {
  	// Key is the key to create; it is appended to the endpoint and prefix path.
  	Key string
  	// Value is sent as the "value" form field.
  	Value string
  	// TTL, when non-nil, is sent as the "ttl" form field; nil omits the field.
  	TTL *uint64
  }
  186. func (c *createAction) httpRequest(ep url.URL) *http.Request {
  187. u := v2URL(ep, c.Key)
  188. params := u.Query()
  189. params.Set("prevExist", "false")
  190. u.RawQuery = params.Encode()
  191. form := url.Values{}
  192. form.Add("value", c.Value)
  193. if c.TTL != nil {
  194. form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
  195. }
  196. body := strings.NewReader(form.Encode())
  197. req, _ := http.NewRequest("PUT", u.String(), body)
  198. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  199. return req
  200. }
  201. func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) {
  202. switch code {
  203. case http.StatusOK, http.StatusCreated:
  204. res, err = unmarshalSuccessfulResponse(body)
  205. default:
  206. err = unmarshalErrorResponse(code)
  207. }
  208. return
  209. }
  210. func unmarshalSuccessfulResponse(body []byte) (*Response, error) {
  211. var res Response
  212. err := json.Unmarshal(body, &res)
  213. if err != nil {
  214. return nil, err
  215. }
  216. return &res, nil
  217. }
  218. func unmarshalErrorResponse(code int) error {
  219. switch code {
  220. case http.StatusNotFound:
  221. return ErrKeyNoExist
  222. case http.StatusPreconditionFailed:
  223. return ErrKeyExists
  224. case http.StatusInternalServerError:
  225. // this isn't necessarily true
  226. return ErrNoLeader
  227. default:
  228. }
  229. return fmt.Errorf("unrecognized HTTP status code %d", code)
  230. }