keys.go 6.0 KB


  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "net/http"
  19. "net/url"
  20. "path"
  21. "strconv"
  22. "strings"
  23. "time"
  24. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  25. )
  // DefaultV2KeysPrefix is the default URL path prefix of the v2 keys API.
  var DefaultV2KeysPrefix = "/v2/keys"
  // Sentinel errors exposed by the keys API client.
  var (
  	// ErrUnavailable indicates that no etcd endpoints are available.
  	ErrUnavailable = errors.New("client: no available etcd endpoints")

  	// ErrNoLeader is reported for HTTP 500 responses.
  	ErrNoLeader = errors.New("client: no leader")

  	// ErrKeyNoExist is reported for HTTP 404 responses.
  	ErrKeyNoExist = errors.New("client: key does not exist")

  	// ErrKeyExists is reported for HTTP 412 responses.
  	ErrKeyExists = errors.New("client: key already exists")
  )
  35. func NewKeysAPI(tr *http.Transport, ep string, to time.Duration) (*HTTPKeysAPI, error) {
  36. c, err := newHTTPClient(tr, ep, to)
  37. if err != nil {
  38. return nil, err
  39. }
  40. kAPI := HTTPKeysAPI{
  41. client: c,
  42. }
  43. return &kAPI, nil
  44. }
  45. type KeysAPI interface {
  46. Create(key, value string, ttl time.Duration) (*Response, error)
  47. Get(key string) (*Response, error)
  48. Watch(key string, idx uint64) Watcher
  49. RecursiveWatch(key string, idx uint64) Watcher
  50. }
  51. type Watcher interface {
  52. Next() (*Response, error)
  53. }
  54. type Response struct {
  55. Action string `json:"action"`
  56. Node *Node `json:"node"`
  57. PrevNode *Node `json:"prevNode"`
  58. }
  // Nodes is a list of Node pointers.
  type Nodes []*Node

  // Node is decoded from the "node" and "prevNode" objects of a keys API
  // response. Nodes is decoded from the nested "nodes" JSON field.
  type Node struct {
  	Key           string `json:"key"`
  	Value         string `json:"value"`
  	Nodes         Nodes  `json:"nodes"`
  	ModifiedIndex uint64 `json:"modifiedIndex"`
  	CreatedIndex  uint64 `json:"createdIndex"`
  }

  // String returns a short description of n containing its key and its created
  // and modified indices; Value and Nodes are not included. A nil receiver
  // yields "<nil>" instead of panicking, because Response.Node and
  // Response.PrevNode may legitimately be nil.
  func (n *Node) String() string {
  	if n == nil {
  		return "<nil>"
  	}
  	return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex)
  }
  70. type HTTPKeysAPI struct {
  71. client *httpClient
  72. endpoint url.URL
  73. }
  74. func (k *HTTPKeysAPI) SetAPIPrefix(p string) {
  75. ep := k.endpoint
  76. ep.Path = path.Join(ep.Path, p)
  77. k.client.endpoint = ep
  78. }
  79. func (k *HTTPKeysAPI) Create(key, val string, ttl time.Duration) (*Response, error) {
  80. create := &createAction{
  81. Key: key,
  82. Value: val,
  83. }
  84. if ttl >= 0 {
  85. uttl := uint64(ttl.Seconds())
  86. create.TTL = &uttl
  87. }
  88. httpresp, body, err := k.client.doWithTimeout(create)
  89. if err != nil {
  90. return nil, err
  91. }
  92. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  93. }
  94. func (k *HTTPKeysAPI) Get(key string) (*Response, error) {
  95. get := &getAction{
  96. Key: key,
  97. Recursive: false,
  98. }
  99. httpresp, body, err := k.client.doWithTimeout(get)
  100. if err != nil {
  101. return nil, err
  102. }
  103. return unmarshalHTTPResponse(httpresp.StatusCode, body)
  104. }
  105. func (k *HTTPKeysAPI) Watch(key string, idx uint64) Watcher {
  106. return &httpWatcher{
  107. client: k.client,
  108. nextWait: waitAction{
  109. Key: key,
  110. WaitIndex: idx,
  111. Recursive: false,
  112. },
  113. }
  114. }
  115. func (k *HTTPKeysAPI) RecursiveWatch(key string, idx uint64) Watcher {
  116. return &httpWatcher{
  117. client: k.client,
  118. nextWait: waitAction{
  119. Key: key,
  120. WaitIndex: idx,
  121. Recursive: true,
  122. },
  123. }
  124. }
  125. type httpWatcher struct {
  126. client *httpClient
  127. nextWait waitAction
  128. }
  129. func (hw *httpWatcher) Next() (*Response, error) {
  130. //TODO(bcwaldon): This needs to be cancellable by the calling user
  131. httpresp, body, err := hw.client.do(context.Background(), &hw.nextWait)
  132. if err != nil {
  133. return nil, err
  134. }
  135. resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
  136. if err != nil {
  137. return nil, err
  138. }
  139. hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
  140. return resp, nil
  141. }
  // v2KeysURL returns the URL of key beneath the endpoint ep. ep is expected
  // to be the root of the etcd keys API; such an endpoint typically has the
  // path "/v2/keys".
  //
  // The paths are combined with path.Join, so the result is cleaned: a
  // trailing slash is dropped and "." / ".." elements are resolved. ep is
  // received by value, so the caller's URL is not modified.
  func v2KeysURL(ep url.URL, key string) *url.URL {
  	u := ep
  	u.Path = path.Join(u.Path, key)
  	return &u
  }
  149. type getAction struct {
  150. Key string
  151. Recursive bool
  152. }
  153. func (g *getAction) httpRequest(ep url.URL) *http.Request {
  154. u := v2KeysURL(ep, g.Key)
  155. params := u.Query()
  156. params.Set("recursive", strconv.FormatBool(g.Recursive))
  157. u.RawQuery = params.Encode()
  158. req, _ := http.NewRequest("GET", u.String(), nil)
  159. return req
  160. }
  161. type waitAction struct {
  162. Key string
  163. WaitIndex uint64
  164. Recursive bool
  165. }
  166. func (w *waitAction) httpRequest(ep url.URL) *http.Request {
  167. u := v2KeysURL(ep, w.Key)
  168. params := u.Query()
  169. params.Set("wait", "true")
  170. params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
  171. params.Set("recursive", strconv.FormatBool(w.Recursive))
  172. u.RawQuery = params.Encode()
  173. req, _ := http.NewRequest("GET", u.String(), nil)
  174. return req
  175. }
  176. type createAction struct {
  177. Key string
  178. Value string
  179. TTL *uint64
  180. }
  181. func (c *createAction) httpRequest(ep url.URL) *http.Request {
  182. u := v2KeysURL(ep, c.Key)
  183. params := u.Query()
  184. params.Set("prevExist", "false")
  185. u.RawQuery = params.Encode()
  186. form := url.Values{}
  187. form.Add("value", c.Value)
  188. if c.TTL != nil {
  189. form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
  190. }
  191. body := strings.NewReader(form.Encode())
  192. req, _ := http.NewRequest("PUT", u.String(), body)
  193. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  194. return req
  195. }
  196. func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) {
  197. switch code {
  198. case http.StatusOK, http.StatusCreated:
  199. res, err = unmarshalSuccessfulResponse(body)
  200. default:
  201. err = unmarshalErrorResponse(code)
  202. }
  203. return
  204. }
  205. func unmarshalSuccessfulResponse(body []byte) (*Response, error) {
  206. var res Response
  207. err := json.Unmarshal(body, &res)
  208. if err != nil {
  209. return nil, err
  210. }
  211. return &res, nil
  212. }
  213. func unmarshalErrorResponse(code int) error {
  214. switch code {
  215. case http.StatusNotFound:
  216. return ErrKeyNoExist
  217. case http.StatusPreconditionFailed:
  218. return ErrKeyExists
  219. case http.StatusInternalServerError:
  220. // this isn't necessarily true
  221. return ErrNoLeader
  222. default:
  223. }
  224. return fmt.Errorf("unrecognized HTTP status code %d", code)
  225. }