keys.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "net/http"
  19. "net/url"
  20. "path"
  21. "strconv"
  22. "strings"
  23. "time"
  24. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  25. )
  26. var (
  27. DefaultV2KeysPrefix = "/v2/keys"
  28. )
  29. var (
  30. ErrUnavailable = errors.New("client: no available etcd endpoints")
  31. ErrNoLeader = errors.New("client: no leader")
  32. ErrKeyNoExist = errors.New("client: key does not exist")
  33. ErrKeyExists = errors.New("client: key already exists")
  34. )
  35. func NewKeysAPI(c httpActionDo) KeysAPI {
  36. return &httpKeysAPI{
  37. client: c,
  38. prefix: DefaultV2KeysPrefix,
  39. }
  40. }
  41. func NewDiscoveryKeysAPI(c httpActionDo) KeysAPI {
  42. return &httpKeysAPI{
  43. client: c,
  44. prefix: "",
  45. }
  46. }
  47. type KeysAPI interface {
  48. Create(ctx context.Context, key, value string, ttl time.Duration) (*Response, error)
  49. Get(ctx context.Context, key string) (*Response, error)
  50. Watch(key string, idx uint64) Watcher
  51. RecursiveWatch(key string, idx uint64) Watcher
  52. }
  53. type Watcher interface {
  54. Next(context.Context) (*Response, error)
  55. }
  56. type Response struct {
  57. Action string `json:"action"`
  58. Node *Node `json:"node"`
  59. PrevNode *Node `json:"prevNode"`
  60. }
  61. type Nodes []*Node
  62. type Node struct {
  63. Key string `json:"key"`
  64. Value string `json:"value"`
  65. Nodes Nodes `json:"nodes"`
  66. ModifiedIndex uint64 `json:"modifiedIndex"`
  67. CreatedIndex uint64 `json:"createdIndex"`
  68. }
  69. func (n *Node) String() string {
  70. return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex)
  71. }
  72. type httpKeysAPI struct {
  73. client httpActionDo
  74. prefix string
  75. }
  76. func (k *httpKeysAPI) Create(ctx context.Context, key, val string, ttl time.Duration) (*Response, error) {
  77. create := &createAction{
  78. Prefix: k.prefix,
  79. Key: key,
  80. Value: val,
  81. }
  82. if ttl >= 0 {
  83. uttl := uint64(ttl.Seconds())
  84. create.TTL = &uttl
  85. }
  86. resp, body, err := k.client.Do(ctx, create)
  87. if err != nil {
  88. return nil, err
  89. }
  90. return unmarshalHTTPResponse(resp.StatusCode, body)
  91. }
  92. func (k *httpKeysAPI) Get(ctx context.Context, key string) (*Response, error) {
  93. get := &getAction{
  94. Prefix: k.prefix,
  95. Key: key,
  96. Recursive: false,
  97. }
  98. resp, body, err := k.client.Do(ctx, get)
  99. if err != nil {
  100. return nil, err
  101. }
  102. return unmarshalHTTPResponse(resp.StatusCode, body)
  103. }
  104. func (k *httpKeysAPI) Watch(key string, idx uint64) Watcher {
  105. return &httpWatcher{
  106. client: k.client,
  107. nextWait: waitAction{
  108. Prefix: k.prefix,
  109. Key: key,
  110. WaitIndex: idx,
  111. Recursive: false,
  112. },
  113. }
  114. }
  115. func (k *httpKeysAPI) RecursiveWatch(key string, idx uint64) Watcher {
  116. return &httpWatcher{
  117. client: k.client,
  118. nextWait: waitAction{
  119. Prefix: k.prefix,
  120. Key: key,
  121. WaitIndex: idx,
  122. Recursive: true,
  123. },
  124. }
  125. }
  126. type httpWatcher struct {
  127. client httpActionDo
  128. nextWait waitAction
  129. }
  130. func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
  131. httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
  132. if err != nil {
  133. return nil, err
  134. }
  135. resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
  136. if err != nil {
  137. return nil, err
  138. }
  139. hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
  140. return resp, nil
  141. }
  142. // v2KeysURL forms a URL representing the location of a key.
  143. // The endpoint argument represents the base URL of an etcd
  144. // server. The prefix is the path needed to route from the
  145. // provided endpoint's path to the root of the keys API
  146. // (typically "/v2/keys").
  147. func v2KeysURL(ep url.URL, prefix, key string) *url.URL {
  148. ep.Path = path.Join(ep.Path, prefix, key)
  149. return &ep
  150. }
  151. type getAction struct {
  152. Prefix string
  153. Key string
  154. Recursive bool
  155. }
  156. func (g *getAction) HTTPRequest(ep url.URL) *http.Request {
  157. u := v2KeysURL(ep, g.Prefix, g.Key)
  158. params := u.Query()
  159. params.Set("recursive", strconv.FormatBool(g.Recursive))
  160. u.RawQuery = params.Encode()
  161. req, _ := http.NewRequest("GET", u.String(), nil)
  162. return req
  163. }
  164. type waitAction struct {
  165. Prefix string
  166. Key string
  167. WaitIndex uint64
  168. Recursive bool
  169. }
  170. func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
  171. u := v2KeysURL(ep, w.Prefix, w.Key)
  172. params := u.Query()
  173. params.Set("wait", "true")
  174. params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
  175. params.Set("recursive", strconv.FormatBool(w.Recursive))
  176. u.RawQuery = params.Encode()
  177. req, _ := http.NewRequest("GET", u.String(), nil)
  178. return req
  179. }
  180. type createAction struct {
  181. Prefix string
  182. Key string
  183. Value string
  184. TTL *uint64
  185. }
  186. func (c *createAction) HTTPRequest(ep url.URL) *http.Request {
  187. u := v2KeysURL(ep, c.Prefix, c.Key)
  188. params := u.Query()
  189. params.Set("prevExist", "false")
  190. u.RawQuery = params.Encode()
  191. form := url.Values{}
  192. form.Add("value", c.Value)
  193. if c.TTL != nil {
  194. form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
  195. }
  196. body := strings.NewReader(form.Encode())
  197. req, _ := http.NewRequest("PUT", u.String(), body)
  198. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  199. return req
  200. }
  201. func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) {
  202. switch code {
  203. case http.StatusOK, http.StatusCreated:
  204. res, err = unmarshalSuccessfulResponse(body)
  205. default:
  206. err = unmarshalErrorResponse(code)
  207. }
  208. return
  209. }
  210. func unmarshalSuccessfulResponse(body []byte) (*Response, error) {
  211. var res Response
  212. err := json.Unmarshal(body, &res)
  213. if err != nil {
  214. return nil, err
  215. }
  216. return &res, nil
  217. }
  218. func unmarshalErrorResponse(code int) error {
  219. switch code {
  220. case http.StatusNotFound:
  221. return ErrKeyNoExist
  222. case http.StatusPreconditionFailed:
  223. return ErrKeyExists
  224. case http.StatusInternalServerError:
  225. // this isn't necessarily true
  226. return ErrNoLeader
  227. default:
  228. }
  229. return fmt.Errorf("unrecognized HTTP status code %d", code)
  230. }