key.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package recipe
  15. import (
  16. "fmt"
  17. "strings"
  18. "time"
  19. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  20. v3 "github.com/coreos/etcd/clientv3"
  21. "github.com/coreos/etcd/clientv3/concurrency"
  22. "github.com/coreos/etcd/lease"
  23. )
  24. // Key is a key/revision pair created by the client and stored on etcd
  25. type RemoteKV struct {
  26. kv v3.KV
  27. key string
  28. rev int64
  29. val string
  30. }
  31. func NewKey(kv v3.KV, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
  32. return NewKV(kv, key, "", leaseID)
  33. }
  34. func NewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
  35. rev, err := putNewKV(kv, key, val, leaseID)
  36. if err != nil {
  37. return nil, err
  38. }
  39. return &RemoteKV{kv, key, rev, val}, nil
  40. }
  41. func GetRemoteKV(kv v3.KV, key string) (*RemoteKV, error) {
  42. resp, err := kv.Get(context.TODO(), key)
  43. if err != nil {
  44. return nil, err
  45. }
  46. rev := resp.Header.Revision
  47. val := ""
  48. if len(resp.Kvs) > 0 {
  49. rev = resp.Kvs[0].ModRevision
  50. val = string(resp.Kvs[0].Value)
  51. }
  52. return &RemoteKV{kv: kv, key: key, rev: rev, val: val}, nil
  53. }
  54. func NewUniqueKey(kv v3.KV, prefix string) (*RemoteKV, error) {
  55. return NewUniqueKV(kv, prefix, "", 0)
  56. }
  57. func NewUniqueKV(kv v3.KV, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
  58. for {
  59. newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
  60. rev, err := putNewKV(kv, newKey, val, 0)
  61. if err == nil {
  62. return &RemoteKV{kv, newKey, rev, val}, nil
  63. }
  64. if err != ErrKeyExists {
  65. return nil, err
  66. }
  67. }
  68. }
  69. // putNewKV attempts to create the given key, only succeeding if the key did
  70. // not yet exist.
  71. func putNewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (int64, error) {
  72. cmp := v3.Compare(v3.Version(key), "=", 0)
  73. req := v3.OpPut(key, val, v3.WithLease(leaseID))
  74. txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
  75. if err != nil {
  76. return 0, err
  77. }
  78. if txnresp.Succeeded == false {
  79. return 0, ErrKeyExists
  80. }
  81. return txnresp.Header.Revision, nil
  82. }
  83. // NewSequentialKV allocates a new sequential key-value pair at <prefix>/nnnnn
  84. func NewSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
  85. return newSequentialKV(kv, prefix, val, 0)
  86. }
  87. // newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
  88. // value and lease. Note: a bookkeeping node __<prefix> is also allocated.
  89. func newSequentialKV(kv v3.KV, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
  90. resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...)
  91. if err != nil {
  92. return nil, err
  93. }
  94. // add 1 to last key, if any
  95. newSeqNum := 0
  96. if len(resp.Kvs) != 0 {
  97. fields := strings.Split(string(resp.Kvs[0].Key), "/")
  98. _, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
  99. if serr != nil {
  100. return nil, serr
  101. }
  102. newSeqNum++
  103. }
  104. newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum)
  105. // base prefix key must be current (i.e., <=) with the server update;
  106. // the base key is important to avoid the following:
  107. // N1: LastKey() == 1, start txn.
  108. // N2: New Key 2, New Key 3, Delete Key 2
  109. // N1: txn succeeds allocating key 2 when it shouldn't
  110. baseKey := "__" + prefix
  111. // current revision might contain modification so +1
  112. cmp := v3.Compare(v3.ModifiedRevision(baseKey), "<", resp.Header.Revision+1)
  113. reqPrefix := v3.OpPut(baseKey, "", v3.WithLease(leaseID))
  114. reqNewKey := v3.OpPut(newKey, val, v3.WithLease(leaseID))
  115. txn := kv.Txn(context.TODO())
  116. txnresp, err := txn.If(cmp).Then(reqPrefix, reqNewKey).Commit()
  117. if err != nil {
  118. return nil, err
  119. }
  120. if txnresp.Succeeded == false {
  121. return newSequentialKV(kv, prefix, val, leaseID)
  122. }
  123. return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil
  124. }
  125. func (rk *RemoteKV) Key() string { return rk.key }
  126. func (rk *RemoteKV) Revision() int64 { return rk.rev }
  127. func (rk *RemoteKV) Value() string { return rk.val }
  128. func (rk *RemoteKV) Delete() error {
  129. if rk.kv == nil {
  130. return nil
  131. }
  132. _, err := rk.kv.Delete(context.TODO(), rk.key)
  133. rk.kv = nil
  134. return err
  135. }
  136. func (rk *RemoteKV) Put(val string) error {
  137. _, err := rk.kv.Put(context.TODO(), rk.key, val)
  138. return err
  139. }
  140. // EphemeralKV is a new key associated with a session lease
  141. type EphemeralKV struct{ RemoteKV }
  142. // NewEphemeralKV creates a new key/value pair associated with a session lease
  143. func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
  144. s, err := concurrency.NewSession(client)
  145. if err != nil {
  146. return nil, err
  147. }
  148. k, err := NewKV(v3.NewKV(client), key, val, s.Lease())
  149. if err != nil {
  150. return nil, err
  151. }
  152. return &EphemeralKV{*k}, nil
  153. }
  154. // NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease
  155. func NewUniqueEphemeralKey(client *v3.Client, prefix string) (*EphemeralKV, error) {
  156. return NewUniqueEphemeralKV(client, prefix, "")
  157. }
  158. // NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
  159. func NewUniqueEphemeralKV(client *v3.Client, prefix, val string) (ek *EphemeralKV, err error) {
  160. for {
  161. newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
  162. ek, err = NewEphemeralKV(client, newKey, val)
  163. if err == nil || err != ErrKeyExists {
  164. break
  165. }
  166. }
  167. return ek, err
  168. }