kv.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package ordering
  15. import (
  16. "context"
  17. "sync"
  18. "github.com/coreos/etcd/clientv3"
  19. )
  20. // kvOrdering ensures that serialized requests do not return
  21. // get with revisions less than the previous
  22. // returned revision.
  23. type kvOrdering struct {
  24. clientv3.KV
  25. orderViolationFunc OrderViolationFunc
  26. prevRev int64
  27. revMu sync.RWMutex
  28. }
  29. func NewKV(kv clientv3.KV, orderViolationFunc OrderViolationFunc) *kvOrdering {
  30. return &kvOrdering{kv, orderViolationFunc, 0, sync.RWMutex{}}
  31. }
  32. func (kv *kvOrdering) getPrevRev() int64 {
  33. kv.revMu.RLock()
  34. defer kv.revMu.RUnlock()
  35. return kv.prevRev
  36. }
  37. func (kv *kvOrdering) setPrevRev(currRev int64) {
  38. prevRev := kv.getPrevRev()
  39. kv.revMu.Lock()
  40. defer kv.revMu.Unlock()
  41. if currRev > prevRev {
  42. kv.prevRev = currRev
  43. }
  44. }
  45. func (kv *kvOrdering) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
  46. // prevRev is stored in a local variable in order to record the prevRev
  47. // at the beginning of the Get operation, because concurrent
  48. // access to kvOrdering could change the prevRev field in the
  49. // middle of the Get operation.
  50. prevRev := kv.getPrevRev()
  51. op := clientv3.OpGet(key, opts...)
  52. for {
  53. r, err := kv.KV.Do(ctx, op)
  54. if err != nil {
  55. return nil, err
  56. }
  57. resp := r.Get()
  58. if resp.Header.Revision >= prevRev {
  59. kv.setPrevRev(resp.Header.Revision)
  60. return resp, nil
  61. }
  62. err = kv.orderViolationFunc(op, r, prevRev)
  63. if err != nil {
  64. return nil, err
  65. }
  66. }
  67. }
  68. func (kv *kvOrdering) Txn(ctx context.Context) clientv3.Txn {
  69. return &txnOrdering{
  70. kv.KV.Txn(ctx),
  71. kv,
  72. ctx,
  73. sync.Mutex{},
  74. []clientv3.Cmp{},
  75. []clientv3.Op{},
  76. []clientv3.Op{},
  77. }
  78. }
  79. // txnOrdering ensures that serialized requests do not return
  80. // txn responses with revisions less than the previous
  81. // returned revision.
  82. type txnOrdering struct {
  83. clientv3.Txn
  84. *kvOrdering
  85. ctx context.Context
  86. mu sync.Mutex
  87. cmps []clientv3.Cmp
  88. thenOps []clientv3.Op
  89. elseOps []clientv3.Op
  90. }
  91. func (txn *txnOrdering) If(cs ...clientv3.Cmp) clientv3.Txn {
  92. txn.mu.Lock()
  93. defer txn.mu.Unlock()
  94. txn.cmps = cs
  95. txn.Txn.If(cs...)
  96. return txn
  97. }
  98. func (txn *txnOrdering) Then(ops ...clientv3.Op) clientv3.Txn {
  99. txn.mu.Lock()
  100. defer txn.mu.Unlock()
  101. txn.thenOps = ops
  102. txn.Txn.Then(ops...)
  103. return txn
  104. }
  105. func (txn *txnOrdering) Else(ops ...clientv3.Op) clientv3.Txn {
  106. txn.mu.Lock()
  107. defer txn.mu.Unlock()
  108. txn.elseOps = ops
  109. txn.Txn.Else(ops...)
  110. return txn
  111. }
  112. func (txn *txnOrdering) Commit() (*clientv3.TxnResponse, error) {
  113. // prevRev is stored in a local variable in order to record the prevRev
  114. // at the beginning of the Commit operation, because concurrent
  115. // access to txnOrdering could change the prevRev field in the
  116. // middle of the Commit operation.
  117. prevRev := txn.getPrevRev()
  118. opTxn := clientv3.OpTxn(txn.cmps, txn.thenOps, txn.elseOps)
  119. for {
  120. opResp, err := txn.KV.Do(txn.ctx, opTxn)
  121. if err != nil {
  122. return nil, err
  123. }
  124. txnResp := opResp.Txn()
  125. if txnResp.Header.Revision >= prevRev {
  126. txn.setPrevRev(txnResp.Header.Revision)
  127. return txnResp, nil
  128. }
  129. err = txn.orderViolationFunc(opTxn, opResp, prevRev)
  130. if err != nil {
  131. return nil, err
  132. }
  133. }
  134. }