key.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package recipe
  15. import (
  16. "fmt"
  17. "strings"
  18. "time"
  19. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  20. "github.com/coreos/etcd/clientv3"
  21. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  22. "github.com/coreos/etcd/lease"
  23. )
  24. // Key is a key/revision pair created by the client and stored on etcd
  25. type RemoteKV struct {
  26. client *clientv3.Client
  27. key string
  28. rev int64
  29. val string
  30. }
  31. func NewKey(client *clientv3.Client, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
  32. return NewKV(client, key, "", leaseID)
  33. }
  34. func NewKV(client *clientv3.Client, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
  35. rev, err := putNewKV(client, key, val, leaseID)
  36. if err != nil {
  37. return nil, err
  38. }
  39. return &RemoteKV{client, key, rev, val}, nil
  40. }
  41. func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) {
  42. resp, err := client.KV.Range(
  43. context.TODO(),
  44. &pb.RangeRequest{Key: []byte(key)},
  45. )
  46. if err != nil {
  47. return nil, err
  48. }
  49. rev := resp.Header.Revision
  50. val := ""
  51. if len(resp.Kvs) > 0 {
  52. rev = resp.Kvs[0].ModRevision
  53. val = string(resp.Kvs[0].Value)
  54. }
  55. return &RemoteKV{
  56. client: client,
  57. key: key,
  58. rev: rev,
  59. val: val}, nil
  60. }
  61. func NewUniqueKey(client *clientv3.Client, prefix string) (*RemoteKV, error) {
  62. return NewUniqueKV(client, prefix, "", 0)
  63. }
  64. func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
  65. for {
  66. newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
  67. rev, err := putNewKV(client, newKey, val, 0)
  68. if err == nil {
  69. return &RemoteKV{client, newKey, rev, val}, nil
  70. }
  71. if err != ErrKeyExists {
  72. return nil, err
  73. }
  74. }
  75. }
  76. // putNewKV attempts to create the given key, only succeeding if the key did
  77. // not yet exist.
  78. func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int64, error) {
  79. cmp := &pb.Compare{
  80. Result: pb.Compare_EQUAL,
  81. Target: pb.Compare_VERSION,
  82. Key: []byte(key),
  83. TargetUnion: &pb.Compare_Version{Version: 0}}
  84. req := &pb.RequestUnion{
  85. Request: &pb.RequestUnion_RequestPut{
  86. RequestPut: &pb.PutRequest{
  87. Key: []byte(key),
  88. Value: []byte(val),
  89. Lease: int64(leaseID)}}}
  90. txnresp, err := ec.KV.Txn(
  91. context.TODO(),
  92. &pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil})
  93. if err != nil {
  94. return 0, err
  95. }
  96. if txnresp.Succeeded == false {
  97. return 0, ErrKeyExists
  98. }
  99. return txnresp.Header.Revision, nil
  100. }
  101. // NewSequentialKV allocates a new sequential key-value pair at <prefix>/nnnnn
  102. func NewSequentialKV(client *clientv3.Client, prefix, val string) (*RemoteKV, error) {
  103. return newSequentialKV(client, prefix, val, 0)
  104. }
  105. // newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
  106. // value and lease. Note: a bookkeeping node __<prefix> is also allocated.
  107. func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
  108. resp, err := NewRange(client, prefix).LastKey()
  109. if err != nil {
  110. return nil, err
  111. }
  112. // add 1 to last key, if any
  113. newSeqNum := 0
  114. if len(resp.Kvs) != 0 {
  115. fields := strings.Split(string(resp.Kvs[0].Key), "/")
  116. _, err := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
  117. if err != nil {
  118. return nil, err
  119. }
  120. newSeqNum++
  121. }
  122. newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum)
  123. // base prefix key must be current (i.e., <=) with the server update;
  124. // the base key is important to avoid the following:
  125. // N1: LastKey() == 1, start txn.
  126. // N2: New Key 2, New Key 3, Delete Key 2
  127. // N1: txn succeeds allocating key 2 when it shouldn't
  128. baseKey := []byte("__" + prefix)
  129. cmp := &pb.Compare{
  130. Result: pb.Compare_LESS,
  131. Target: pb.Compare_MOD,
  132. Key: []byte(baseKey),
  133. // current revision might contain modification so +1
  134. TargetUnion: &pb.Compare_ModRevision{ModRevision: resp.Header.Revision + 1},
  135. }
  136. reqPrefix := &pb.RequestUnion{
  137. Request: &pb.RequestUnion_RequestPut{
  138. RequestPut: &pb.PutRequest{
  139. Key: baseKey,
  140. Lease: int64(leaseID),
  141. }}}
  142. reqNewKey := &pb.RequestUnion{
  143. Request: &pb.RequestUnion_RequestPut{
  144. RequestPut: &pb.PutRequest{
  145. Key: []byte(newKey),
  146. Value: []byte(val),
  147. Lease: int64(leaseID),
  148. }}}
  149. txnresp, err := client.KV.Txn(
  150. context.TODO(),
  151. &pb.TxnRequest{
  152. []*pb.Compare{cmp},
  153. []*pb.RequestUnion{reqPrefix, reqNewKey}, nil})
  154. if err != nil {
  155. return nil, err
  156. }
  157. if txnresp.Succeeded == false {
  158. return newSequentialKV(client, prefix, val, leaseID)
  159. }
  160. return &RemoteKV{client, newKey, txnresp.Header.Revision, val}, nil
  161. }
  162. func (rk *RemoteKV) Key() string { return rk.key }
  163. func (rk *RemoteKV) Revision() int64 { return rk.rev }
  164. func (rk *RemoteKV) Value() string { return rk.val }
  165. func (rk *RemoteKV) Delete() error {
  166. if rk.client == nil {
  167. return nil
  168. }
  169. req := &pb.DeleteRangeRequest{Key: []byte(rk.key)}
  170. _, err := rk.client.KV.DeleteRange(context.TODO(), req)
  171. rk.client = nil
  172. return err
  173. }
  174. func (rk *RemoteKV) Put(val string) error {
  175. req := &pb.PutRequest{Key: []byte(rk.key), Value: []byte(val)}
  176. _, err := rk.client.KV.Put(context.TODO(), req)
  177. return err
  178. }
  179. // EphemeralKV is a new key associated with a session lease
  180. type EphemeralKV struct{ RemoteKV }
  181. // NewEphemeralKV creates a new key/value pair associated with a session lease
  182. func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, error) {
  183. leaseID, err := SessionLease(client)
  184. if err != nil {
  185. return nil, err
  186. }
  187. k, err := NewKV(client, key, val, leaseID)
  188. if err != nil {
  189. return nil, err
  190. }
  191. return &EphemeralKV{*k}, nil
  192. }
  193. // NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease
  194. func NewUniqueEphemeralKey(client *clientv3.Client, prefix string) (*EphemeralKV, error) {
  195. return NewUniqueEphemeralKV(client, prefix, "")
  196. }
  197. // NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
  198. func NewUniqueEphemeralKV(client *clientv3.Client, prefix, val string) (ek *EphemeralKV, err error) {
  199. for {
  200. newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
  201. ek, err = NewEphemeralKV(client, newKey, val)
  202. if err == nil || err != ErrKeyExists {
  203. break
  204. }
  205. }
  206. return ek, err
  207. }