keys.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package client
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "net/http"
  20. "net/url"
  21. "path"
  22. "strconv"
  23. "strings"
  24. "time"
  25. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  26. )
  27. var (
  28. DefaultV2KeysPrefix = "/v2/keys"
  29. )
  30. var (
  31. ErrUnavailable = errors.New("client: no available etcd endpoints")
  32. ErrNoLeader = errors.New("client: no leader")
  33. ErrKeyNoExist = errors.New("client: key does not exist")
  34. ErrKeyExists = errors.New("client: key already exists")
  35. )
  36. func NewKeysAPI(c HTTPClient) KeysAPI {
  37. return &httpKeysAPI{
  38. client: c,
  39. prefix: DefaultV2KeysPrefix,
  40. }
  41. }
  42. func NewDiscoveryKeysAPI(c HTTPClient) KeysAPI {
  43. return &httpKeysAPI{
  44. client: c,
  45. prefix: "",
  46. }
  47. }
  48. type KeysAPI interface {
  49. Create(ctx context.Context, key, value string, ttl time.Duration) (*Response, error)
  50. Get(ctx context.Context, key string) (*Response, error)
  51. Watch(key string, idx uint64) Watcher
  52. RecursiveWatch(key string, idx uint64) Watcher
  53. }
  54. type Watcher interface {
  55. Next(context.Context) (*Response, error)
  56. }
  57. type Response struct {
  58. Action string `json:"action"`
  59. Node *Node `json:"node"`
  60. PrevNode *Node `json:"prevNode"`
  61. Index uint64
  62. }
  63. type Nodes []*Node
  64. type Node struct {
  65. Key string `json:"key"`
  66. Value string `json:"value"`
  67. Nodes Nodes `json:"nodes"`
  68. ModifiedIndex uint64 `json:"modifiedIndex"`
  69. CreatedIndex uint64 `json:"createdIndex"`
  70. }
  71. func (n *Node) String() string {
  72. return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex)
  73. }
  74. type httpKeysAPI struct {
  75. client HTTPClient
  76. prefix string
  77. }
  78. func (k *httpKeysAPI) Create(ctx context.Context, key, val string, ttl time.Duration) (*Response, error) {
  79. create := &createAction{
  80. Prefix: k.prefix,
  81. Key: key,
  82. Value: val,
  83. }
  84. if ttl >= 0 {
  85. uttl := uint64(ttl.Seconds())
  86. create.TTL = &uttl
  87. }
  88. resp, body, err := k.client.Do(ctx, create)
  89. if err != nil {
  90. return nil, err
  91. }
  92. return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
  93. }
  94. func (k *httpKeysAPI) Get(ctx context.Context, key string) (*Response, error) {
  95. get := &getAction{
  96. Prefix: k.prefix,
  97. Key: key,
  98. Recursive: false,
  99. }
  100. resp, body, err := k.client.Do(ctx, get)
  101. if err != nil {
  102. return nil, err
  103. }
  104. return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
  105. }
  106. func (k *httpKeysAPI) Watch(key string, idx uint64) Watcher {
  107. return &httpWatcher{
  108. client: k.client,
  109. nextWait: waitAction{
  110. Prefix: k.prefix,
  111. Key: key,
  112. WaitIndex: idx,
  113. Recursive: false,
  114. },
  115. }
  116. }
  117. func (k *httpKeysAPI) RecursiveWatch(key string, idx uint64) Watcher {
  118. return &httpWatcher{
  119. client: k.client,
  120. nextWait: waitAction{
  121. Prefix: k.prefix,
  122. Key: key,
  123. WaitIndex: idx,
  124. Recursive: true,
  125. },
  126. }
  127. }
  128. type httpWatcher struct {
  129. client HTTPClient
  130. nextWait waitAction
  131. }
  132. func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
  133. httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
  134. if err != nil {
  135. return nil, err
  136. }
  137. resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body)
  138. if err != nil {
  139. return nil, err
  140. }
  141. hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
  142. return resp, nil
  143. }
  144. // v2KeysURL forms a URL representing the location of a key.
  145. // The endpoint argument represents the base URL of an etcd
  146. // server. The prefix is the path needed to route from the
  147. // provided endpoint's path to the root of the keys API
  148. // (typically "/v2/keys").
  149. func v2KeysURL(ep url.URL, prefix, key string) *url.URL {
  150. ep.Path = path.Join(ep.Path, prefix, key)
  151. return &ep
  152. }
  153. type getAction struct {
  154. Prefix string
  155. Key string
  156. Recursive bool
  157. }
  158. func (g *getAction) HTTPRequest(ep url.URL) *http.Request {
  159. u := v2KeysURL(ep, g.Prefix, g.Key)
  160. params := u.Query()
  161. params.Set("recursive", strconv.FormatBool(g.Recursive))
  162. u.RawQuery = params.Encode()
  163. req, _ := http.NewRequest("GET", u.String(), nil)
  164. return req
  165. }
  166. type waitAction struct {
  167. Prefix string
  168. Key string
  169. WaitIndex uint64
  170. Recursive bool
  171. }
  172. func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
  173. u := v2KeysURL(ep, w.Prefix, w.Key)
  174. params := u.Query()
  175. params.Set("wait", "true")
  176. params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
  177. params.Set("recursive", strconv.FormatBool(w.Recursive))
  178. u.RawQuery = params.Encode()
  179. req, _ := http.NewRequest("GET", u.String(), nil)
  180. return req
  181. }
  182. type createAction struct {
  183. Prefix string
  184. Key string
  185. Value string
  186. TTL *uint64
  187. }
  188. func (c *createAction) HTTPRequest(ep url.URL) *http.Request {
  189. u := v2KeysURL(ep, c.Prefix, c.Key)
  190. params := u.Query()
  191. params.Set("prevExist", "false")
  192. u.RawQuery = params.Encode()
  193. form := url.Values{}
  194. form.Add("value", c.Value)
  195. if c.TTL != nil {
  196. form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
  197. }
  198. body := strings.NewReader(form.Encode())
  199. req, _ := http.NewRequest("PUT", u.String(), body)
  200. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  201. return req
  202. }
  203. func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Response, err error) {
  204. switch code {
  205. case http.StatusOK, http.StatusCreated:
  206. res, err = unmarshalSuccessfulResponse(header, body)
  207. default:
  208. err = unmarshalErrorResponse(code)
  209. }
  210. return
  211. }
  212. func unmarshalSuccessfulResponse(header http.Header, body []byte) (*Response, error) {
  213. var res Response
  214. err := json.Unmarshal(body, &res)
  215. if err != nil {
  216. return nil, err
  217. }
  218. if header.Get("X-Etcd-Index") != "" {
  219. res.Index, err = strconv.ParseUint(header.Get("X-Etcd-Index"), 10, 64)
  220. }
  221. if err != nil {
  222. return nil, err
  223. }
  224. return &res, nil
  225. }
  226. func unmarshalErrorResponse(code int) error {
  227. switch code {
  228. case http.StatusNotFound:
  229. return ErrKeyNoExist
  230. case http.StatusPreconditionFailed:
  231. return ErrKeyExists
  232. case http.StatusInternalServerError:
  233. // this isn't necessarily true
  234. return ErrNoLeader
  235. case http.StatusGatewayTimeout:
  236. return ErrTimeout
  237. default:
  238. }
  239. return fmt.Errorf("unrecognized HTTP status code %d", code)
  240. }