kv.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package ordering
  15. import (
  16. "context"
  17. "sync"
  18. "go.etcd.io/etcd/clientv3"
  19. )
// kvOrdering wraps a clientv3.KV and ensures that serialized requests do not
// return a Get response whose revision is less than the previously returned
// revision.
type kvOrdering struct {
	// KV is the wrapped client that requests are sent through.
	clientv3.KV
	// orderViolationFunc is called when a response carries a revision lower
	// than the one recorded at the start of the request; a non-nil error
	// from it aborts the request.
	orderViolationFunc OrderViolationFunc
	// prevRev is the highest response revision recorded so far. It is only
	// ever raised (see setPrevRev) and must be accessed under revMu.
	prevRev int64
	// revMu guards prevRev.
	revMu sync.RWMutex
}
  29. func NewKV(kv clientv3.KV, orderViolationFunc OrderViolationFunc) *kvOrdering {
  30. return &kvOrdering{kv, orderViolationFunc, 0, sync.RWMutex{}}
  31. }
  32. func (kv *kvOrdering) getPrevRev() int64 {
  33. kv.revMu.RLock()
  34. defer kv.revMu.RUnlock()
  35. return kv.prevRev
  36. }
  37. func (kv *kvOrdering) setPrevRev(currRev int64) {
  38. kv.revMu.Lock()
  39. defer kv.revMu.Unlock()
  40. if currRev > kv.prevRev {
  41. kv.prevRev = currRev
  42. }
  43. }
  44. func (kv *kvOrdering) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
  45. // prevRev is stored in a local variable in order to record the prevRev
  46. // at the beginning of the Get operation, because concurrent
  47. // access to kvOrdering could change the prevRev field in the
  48. // middle of the Get operation.
  49. prevRev := kv.getPrevRev()
  50. op := clientv3.OpGet(key, opts...)
  51. for {
  52. r, err := kv.KV.Do(ctx, op)
  53. if err != nil {
  54. return nil, err
  55. }
  56. resp := r.Get()
  57. if resp.Header.Revision == prevRev {
  58. return resp, nil
  59. } else if resp.Header.Revision > prevRev {
  60. kv.setPrevRev(resp.Header.Revision)
  61. return resp, nil
  62. }
  63. err = kv.orderViolationFunc(op, r, prevRev)
  64. if err != nil {
  65. return nil, err
  66. }
  67. }
  68. }
  69. func (kv *kvOrdering) Txn(ctx context.Context) clientv3.Txn {
  70. return &txnOrdering{
  71. kv.KV.Txn(ctx),
  72. kv,
  73. ctx,
  74. sync.Mutex{},
  75. []clientv3.Cmp{},
  76. []clientv3.Op{},
  77. []clientv3.Op{},
  78. }
  79. }
// txnOrdering wraps a clientv3.Txn and ensures that serialized requests do
// not return txn responses with revisions less than the previous
// returned revision.
type txnOrdering struct {
	// Txn is the wrapped transaction; If, Then and Else forward to it.
	clientv3.Txn
	// kvOrdering is the owning wrapper; it supplies the recorded revision,
	// the order-violation callback and the KV used by Commit.
	*kvOrdering
	// ctx is the context the transaction was created with; Commit passes it
	// to the KV.
	ctx context.Context
	// mu is held by If, Then and Else while they update the fields below.
	mu sync.Mutex
	// cmps, thenOps and elseOps hold the arguments of the most recent If,
	// Then and Else calls; Commit builds its transaction op from them.
	cmps []clientv3.Cmp
	thenOps []clientv3.Op
	elseOps []clientv3.Op
}
// If records cs as the transaction's comparisons, replacing any previously
// recorded ones, and forwards them to the wrapped Txn. It returns txn itself
// rather than the wrapped Txn's return value, so chained calls keep going
// through txnOrdering.
func (txn *txnOrdering) If(cs ...clientv3.Cmp) clientv3.Txn {
	txn.mu.Lock()
	defer txn.mu.Unlock()
	// Keep our own copy of the slice header; Commit builds its op from it.
	txn.cmps = cs
	// NOTE(review): the wrapped Txn's return value is discarded; presumably
	// the call is made for its side effects/validation — confirm.
	txn.Txn.If(cs...)
	return txn
}
// Then records ops as the operations of the transaction's "then" branch,
// replacing any previously recorded ones, and forwards them to the wrapped
// Txn. It returns txn itself rather than the wrapped Txn's return value, so
// chained calls keep going through txnOrdering.
func (txn *txnOrdering) Then(ops ...clientv3.Op) clientv3.Txn {
	txn.mu.Lock()
	defer txn.mu.Unlock()
	// Keep our own copy of the slice header; Commit builds its op from it.
	txn.thenOps = ops
	// NOTE(review): the wrapped Txn's return value is discarded; presumably
	// the call is made for its side effects/validation — confirm.
	txn.Txn.Then(ops...)
	return txn
}
// Else records ops as the operations of the transaction's "else" branch,
// replacing any previously recorded ones, and forwards them to the wrapped
// Txn. It returns txn itself rather than the wrapped Txn's return value, so
// chained calls keep going through txnOrdering.
func (txn *txnOrdering) Else(ops ...clientv3.Op) clientv3.Txn {
	txn.mu.Lock()
	defer txn.mu.Unlock()
	// Keep our own copy of the slice header; Commit builds its op from it.
	txn.elseOps = ops
	// NOTE(review): the wrapped Txn's return value is discarded; presumably
	// the call is made for its side effects/validation — confirm.
	txn.Txn.Else(ops...)
	return txn
}
  113. func (txn *txnOrdering) Commit() (*clientv3.TxnResponse, error) {
  114. // prevRev is stored in a local variable in order to record the prevRev
  115. // at the beginning of the Commit operation, because concurrent
  116. // access to txnOrdering could change the prevRev field in the
  117. // middle of the Commit operation.
  118. prevRev := txn.getPrevRev()
  119. opTxn := clientv3.OpTxn(txn.cmps, txn.thenOps, txn.elseOps)
  120. for {
  121. opResp, err := txn.KV.Do(txn.ctx, opTxn)
  122. if err != nil {
  123. return nil, err
  124. }
  125. txnResp := opResp.Txn()
  126. if txnResp.Header.Revision >= prevRev {
  127. txn.setPrevRev(txnResp.Header.Revision)
  128. return txnResp, nil
  129. }
  130. err = txn.orderViolationFunc(opTxn, opResp, prevRev)
  131. if err != nil {
  132. return nil, err
  133. }
  134. }
  135. }