kv.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package namespace
  15. import (
  16. "context"
  17. "github.com/coreos/etcd/clientv3"
  18. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  19. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  20. )
  21. type kvPrefix struct {
  22. clientv3.KV
  23. pfx string
  24. }
  25. // NewKV wraps a KV instance so that all requests
  26. // are prefixed with a given string.
  27. func NewKV(kv clientv3.KV, prefix string) clientv3.KV {
  28. return &kvPrefix{kv, prefix}
  29. }
  30. func (kv *kvPrefix) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
  31. if len(key) == 0 {
  32. return nil, rpctypes.ErrEmptyKey
  33. }
  34. op := kv.prefixOp(clientv3.OpPut(key, val, opts...))
  35. r, err := kv.KV.Do(ctx, op)
  36. if err != nil {
  37. return nil, err
  38. }
  39. put := r.Put()
  40. kv.unprefixPutResponse(put)
  41. return put, nil
  42. }
  43. func (kv *kvPrefix) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
  44. if len(key) == 0 {
  45. return nil, rpctypes.ErrEmptyKey
  46. }
  47. r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpGet(key, opts...)))
  48. if err != nil {
  49. return nil, err
  50. }
  51. get := r.Get()
  52. kv.unprefixGetResponse(get)
  53. return get, nil
  54. }
  55. func (kv *kvPrefix) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
  56. if len(key) == 0 {
  57. return nil, rpctypes.ErrEmptyKey
  58. }
  59. r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpDelete(key, opts...)))
  60. if err != nil {
  61. return nil, err
  62. }
  63. del := r.Del()
  64. kv.unprefixDeleteResponse(del)
  65. return del, nil
  66. }
  67. func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
  68. if len(op.KeyBytes()) == 0 && !op.IsTxn() {
  69. return clientv3.OpResponse{}, rpctypes.ErrEmptyKey
  70. }
  71. r, err := kv.KV.Do(ctx, kv.prefixOp(op))
  72. if err != nil {
  73. return r, err
  74. }
  75. switch {
  76. case r.Get() != nil:
  77. kv.unprefixGetResponse(r.Get())
  78. case r.Put() != nil:
  79. kv.unprefixPutResponse(r.Put())
  80. case r.Del() != nil:
  81. kv.unprefixDeleteResponse(r.Del())
  82. case r.Txn() != nil:
  83. kv.unprefixTxnResponse(r.Txn())
  84. }
  85. return r, nil
  86. }
  87. type txnPrefix struct {
  88. clientv3.Txn
  89. kv *kvPrefix
  90. }
  91. func (kv *kvPrefix) Txn(ctx context.Context) clientv3.Txn {
  92. return &txnPrefix{kv.KV.Txn(ctx), kv}
  93. }
  94. func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn {
  95. txn.Txn = txn.Txn.If(txn.kv.prefixCmps(cs)...)
  96. return txn
  97. }
  98. func (txn *txnPrefix) Then(ops ...clientv3.Op) clientv3.Txn {
  99. txn.Txn = txn.Txn.Then(txn.kv.prefixOps(ops)...)
  100. return txn
  101. }
  102. func (txn *txnPrefix) Else(ops ...clientv3.Op) clientv3.Txn {
  103. txn.Txn = txn.Txn.Else(txn.kv.prefixOps(ops)...)
  104. return txn
  105. }
  106. func (txn *txnPrefix) Commit() (*clientv3.TxnResponse, error) {
  107. resp, err := txn.Txn.Commit()
  108. if err != nil {
  109. return nil, err
  110. }
  111. txn.kv.unprefixTxnResponse(resp)
  112. return resp, nil
  113. }
  114. func (kv *kvPrefix) prefixOp(op clientv3.Op) clientv3.Op {
  115. if !op.IsTxn() {
  116. begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes())
  117. op.WithKeyBytes(begin)
  118. op.WithRangeBytes(end)
  119. return op
  120. }
  121. cmps, thenOps, elseOps := op.Txn()
  122. return clientv3.OpTxn(kv.prefixCmps(cmps), kv.prefixOps(thenOps), kv.prefixOps(elseOps))
  123. }
  124. func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) {
  125. for i := range resp.Kvs {
  126. resp.Kvs[i].Key = resp.Kvs[i].Key[len(kv.pfx):]
  127. }
  128. }
  129. func (kv *kvPrefix) unprefixPutResponse(resp *clientv3.PutResponse) {
  130. if resp.PrevKv != nil {
  131. resp.PrevKv.Key = resp.PrevKv.Key[len(kv.pfx):]
  132. }
  133. }
  134. func (kv *kvPrefix) unprefixDeleteResponse(resp *clientv3.DeleteResponse) {
  135. for i := range resp.PrevKvs {
  136. resp.PrevKvs[i].Key = resp.PrevKvs[i].Key[len(kv.pfx):]
  137. }
  138. }
  139. func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) {
  140. for _, r := range resp.Responses {
  141. switch tv := r.Response.(type) {
  142. case *pb.ResponseOp_ResponseRange:
  143. if tv.ResponseRange != nil {
  144. kv.unprefixGetResponse((*clientv3.GetResponse)(tv.ResponseRange))
  145. }
  146. case *pb.ResponseOp_ResponsePut:
  147. if tv.ResponsePut != nil {
  148. kv.unprefixPutResponse((*clientv3.PutResponse)(tv.ResponsePut))
  149. }
  150. case *pb.ResponseOp_ResponseDeleteRange:
  151. if tv.ResponseDeleteRange != nil {
  152. kv.unprefixDeleteResponse((*clientv3.DeleteResponse)(tv.ResponseDeleteRange))
  153. }
  154. case *pb.ResponseOp_ResponseTxn:
  155. if tv.ResponseTxn != nil {
  156. kv.unprefixTxnResponse((*clientv3.TxnResponse)(tv.ResponseTxn))
  157. }
  158. default:
  159. }
  160. }
  161. }
  162. func (kv *kvPrefix) prefixInterval(key, end []byte) (pfxKey []byte, pfxEnd []byte) {
  163. return prefixInterval(kv.pfx, key, end)
  164. }
  165. func (kv *kvPrefix) prefixCmps(cs []clientv3.Cmp) []clientv3.Cmp {
  166. newCmps := make([]clientv3.Cmp, len(cs))
  167. for i := range cs {
  168. newCmps[i] = cs[i]
  169. pfxKey, endKey := kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd)
  170. newCmps[i].WithKeyBytes(pfxKey)
  171. if len(cs[i].RangeEnd) != 0 {
  172. newCmps[i].RangeEnd = endKey
  173. }
  174. }
  175. return newCmps
  176. }
  177. func (kv *kvPrefix) prefixOps(ops []clientv3.Op) []clientv3.Op {
  178. newOps := make([]clientv3.Op, len(ops))
  179. for i := range ops {
  180. newOps[i] = kv.prefixOp(ops[i])
  181. }
  182. return newOps
  183. }