http.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "io/ioutil"
  18. "net/http"
  19. "net/url"
  20. "path"
  21. "strconv"
  22. "strings"
  23. "time"
  24. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  25. )
var (
	// DefaultV2KeysPrefix is the keys-API path prefix that NewHTTPClient
	// assigns to every client it builds.
	DefaultV2KeysPrefix = "/v2/keys"

	// ErrTimeout is an alias for context.DeadlineExceeded.
	ErrTimeout = context.DeadlineExceeded
)
// transport mimics http.Transport to provide an interface which can be
// substituted for testing (since the RoundTripper interface alone does not
// require the CancelRequest method).
type transport interface {
	http.RoundTripper

	// CancelRequest is called by httpClient.do to abort req when its
	// context is done before the round trip has returned.
	CancelRequest(req *http.Request)
}
// httpClient issues keys-API requests against a single endpoint URL.
type httpClient struct {
	// transport performs the HTTP round trips and cancels them on timeout.
	transport transport
	// endpoint is the base URL as parsed by NewHTTPClient.
	endpoint url.URL
	// timeout bounds each Create and Get call.
	timeout time.Duration
	// v2KeysPrefix is joined onto endpoint's path by Endpoint.
	v2KeysPrefix string
}
  43. func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpClient, error) {
  44. u, err := url.Parse(ep)
  45. if err != nil {
  46. return nil, err
  47. }
  48. c := &httpClient{
  49. transport: tr,
  50. endpoint: *u,
  51. timeout: timeout,
  52. v2KeysPrefix: DefaultV2KeysPrefix,
  53. }
  54. return c, nil
  55. }
  56. func (c *httpClient) SetPrefix(p string) {
  57. c.v2KeysPrefix = p
  58. }
  59. func (c *httpClient) Endpoint() url.URL {
  60. ep := c.endpoint
  61. ep.Path = path.Join(ep.Path, c.v2KeysPrefix)
  62. return ep
  63. }
  64. func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) {
  65. create := &createAction{
  66. Key: key,
  67. Value: val,
  68. }
  69. if ttl >= 0 {
  70. uttl := uint64(ttl.Seconds())
  71. create.TTL = &uttl
  72. }
  73. ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  74. httpresp, body, err := c.do(ctx, create)
  75. cancel()
  76. if err != nil {
  77. return nil, err
  78. }
  79. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  80. }
  81. func (c *httpClient) Get(key string) (*Response, error) {
  82. get := &getAction{
  83. Key: key,
  84. Recursive: false,
  85. }
  86. ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  87. httpresp, body, err := c.do(ctx, get)
  88. cancel()
  89. if err != nil {
  90. return nil, err
  91. }
  92. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  93. }
// roundTripResponse carries the result of a transport.RoundTrip call from
// the goroutine started in httpClient.do back to the waiting caller.
type roundTripResponse struct {
	resp *http.Response
	err  error
}
  98. func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
  99. req := act.httpRequest(c.Endpoint())
  100. rtchan := make(chan roundTripResponse, 1)
  101. go func() {
  102. resp, err := c.transport.RoundTrip(req)
  103. rtchan <- roundTripResponse{resp: resp, err: err}
  104. close(rtchan)
  105. }()
  106. var resp *http.Response
  107. var err error
  108. select {
  109. case rtresp := <-rtchan:
  110. resp, err = rtresp.resp, rtresp.err
  111. case <-ctx.Done():
  112. c.transport.CancelRequest(req)
  113. // wait for request to actually exit before continuing
  114. <-rtchan
  115. err = ctx.Err()
  116. }
  117. // always check for resp nil-ness to deal with possible
  118. // race conditions between channels above
  119. defer func() {
  120. if resp != nil {
  121. resp.Body.Close()
  122. }
  123. }()
  124. if err != nil {
  125. return nil, nil, err
  126. }
  127. body, err := ioutil.ReadAll(resp.Body)
  128. return resp, body, err
  129. }
  130. func (c *httpClient) Watch(key string, idx uint64) Watcher {
  131. return &httpWatcher{
  132. httpClient: *c,
  133. nextWait: waitAction{
  134. Key: key,
  135. WaitIndex: idx,
  136. Recursive: false,
  137. },
  138. }
  139. }
  140. func (c *httpClient) RecursiveWatch(key string, idx uint64) Watcher {
  141. return &httpWatcher{
  142. httpClient: *c,
  143. nextWait: waitAction{
  144. Key: key,
  145. WaitIndex: idx,
  146. Recursive: true,
  147. },
  148. }
  149. }
// httpWatcher is the Watcher returned by Watch and RecursiveWatch. It embeds
// a copy of the httpClient it was created from.
type httpWatcher struct {
	httpClient
	// nextWait is the request issued by the next call to Next; Next advances
	// its WaitIndex after every successful response.
	nextWait waitAction
}
  154. func (hw *httpWatcher) Next() (*Response, error) {
  155. httpresp, body, err := hw.httpClient.do(context.Background(), &hw.nextWait)
  156. if err != nil {
  157. return nil, err
  158. }
  159. resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
  160. if err != nil {
  161. return nil, err
  162. }
  163. hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
  164. return resp, nil
  165. }
  166. // v2KeysURL forms a URL representing the location of a key. The provided
  167. // endpoint must be the root of the etcd keys API. For example, a valid
  168. // endpoint probably has the path "/v2/keys".
  169. func v2KeysURL(ep url.URL, key string) *url.URL {
  170. ep.Path = path.Join(ep.Path, key)
  171. return &ep
  172. }
// httpAction is an operation that can be rendered as an HTTP request.
type httpAction interface {
	// httpRequest builds the request for this action; httpClient.do passes
	// it the URL returned by Endpoint.
	httpRequest(url.URL) *http.Request
}
// getAction describes a GET request for a key.
type getAction struct {
	// Key is joined onto the endpoint path to form the request URL.
	Key string
	// Recursive is sent as the "recursive" query parameter.
	Recursive bool
}
  180. func (g *getAction) httpRequest(ep url.URL) *http.Request {
  181. u := v2KeysURL(ep, g.Key)
  182. params := u.Query()
  183. params.Set("recursive", strconv.FormatBool(g.Recursive))
  184. u.RawQuery = params.Encode()
  185. req, _ := http.NewRequest("GET", u.String(), nil)
  186. return req
  187. }
// waitAction describes a GET request that waits for a change to a key.
type waitAction struct {
	// Key is joined onto the endpoint path to form the request URL.
	Key string
	// WaitIndex is sent as the "waitIndex" query parameter.
	WaitIndex uint64
	// Recursive is sent as the "recursive" query parameter.
	Recursive bool
}
  193. func (w *waitAction) httpRequest(ep url.URL) *http.Request {
  194. u := v2KeysURL(ep, w.Key)
  195. params := u.Query()
  196. params.Set("wait", "true")
  197. params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
  198. params.Set("recursive", strconv.FormatBool(w.Recursive))
  199. u.RawQuery = params.Encode()
  200. req, _ := http.NewRequest("GET", u.String(), nil)
  201. return req
  202. }
// createAction describes a PUT request that creates a key only if it does
// not already exist (it is sent with prevExist=false).
type createAction struct {
	// Key is joined onto the endpoint path to form the request URL.
	Key string
	// Value is sent as the "value" form field.
	Value string
	// TTL, when non-nil, is sent as the "ttl" form field; when nil the
	// field is omitted.
	TTL *uint64
}
  208. func (c *createAction) httpRequest(ep url.URL) *http.Request {
  209. u := v2KeysURL(ep, c.Key)
  210. params := u.Query()
  211. params.Set("prevExist", "false")
  212. u.RawQuery = params.Encode()
  213. form := url.Values{}
  214. form.Add("value", c.Value)
  215. if c.TTL != nil {
  216. form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
  217. }
  218. body := strings.NewReader(form.Encode())
  219. req, _ := http.NewRequest("PUT", u.String(), body)
  220. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  221. return req
  222. }
  223. func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) {
  224. switch code {
  225. case http.StatusOK, http.StatusCreated:
  226. res, err = unmarshalSuccessfulResponse(body)
  227. default:
  228. err = unmarshalErrorResponse(code)
  229. }
  230. return
  231. }
  232. func unmarshalSuccessfulResponse(body []byte) (*Response, error) {
  233. var res Response
  234. err := json.Unmarshal(body, &res)
  235. if err != nil {
  236. return nil, err
  237. }
  238. return &res, nil
  239. }
  240. func unmarshalErrorResponse(code int) error {
  241. switch code {
  242. case http.StatusNotFound:
  243. return ErrKeyNoExist
  244. case http.StatusPreconditionFailed:
  245. return ErrKeyExists
  246. case http.StatusInternalServerError:
  247. // this isn't necessarily true
  248. return ErrNoLeader
  249. default:
  250. }
  251. return fmt.Errorf("unrecognized HTTP status code %d", code)
  252. }