kvstore_txn.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  // Copyright 2017 The etcd Authors
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  // http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. package mvcc
  15. import (
  16. "github.com/coreos/etcd/lease"
  17. "github.com/coreos/etcd/mvcc/backend"
  18. "github.com/coreos/etcd/mvcc/mvccpb"
  19. )
  20. type storeTxnRead struct {
  21. s *store
  22. tx backend.ReadTx
  23. firstRev int64
  24. rev int64
  25. }
  26. func (s *store) Read() TxnRead {
  27. s.mu.RLock()
  28. tx := s.b.ReadTx()
  29. s.revMu.RLock()
  30. tx.Lock()
  31. firstRev, rev := s.compactMainRev, s.currentRev
  32. s.revMu.RUnlock()
  33. return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
  34. }
  35. func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
  36. func (tr *storeTxnRead) Rev() int64 { return tr.rev }
  37. func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
  38. return tr.rangeKeys(key, end, tr.Rev(), ro)
  39. }
  40. func (tr *storeTxnRead) End() {
  41. tr.tx.Unlock()
  42. tr.s.mu.RUnlock()
  43. }
  44. type storeTxnWrite struct {
  45. storeTxnRead
  46. tx backend.BatchTx
  47. // beginRev is the revision where the txn begins; it will write to the next revision.
  48. beginRev int64
  49. changes []mvccpb.KeyValue
  50. }
  51. func (s *store) Write() TxnWrite {
  52. s.mu.RLock()
  53. tx := s.b.BatchTx()
  54. tx.Lock()
  55. tw := &storeTxnWrite{
  56. storeTxnRead: storeTxnRead{s, tx, 0, 0},
  57. tx: tx,
  58. beginRev: s.currentRev,
  59. changes: make([]mvccpb.KeyValue, 0, 4),
  60. }
  61. return newMetricsTxnWrite(tw)
  62. }
  63. func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
  64. func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
  65. rev := tw.beginRev
  66. if len(tw.changes) > 0 {
  67. rev++
  68. }
  69. return tw.rangeKeys(key, end, rev, ro)
  70. }
  71. func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
  72. if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
  73. return n, int64(tw.beginRev + 1)
  74. }
  75. return 0, int64(tw.beginRev)
  76. }
  77. func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
  78. tw.put(key, value, lease)
  79. return int64(tw.beginRev + 1)
  80. }
  81. func (tw *storeTxnWrite) End() {
  82. // only update index if the txn modifies the mvcc state.
  83. if len(tw.changes) != 0 {
  84. tw.s.saveIndex(tw.tx)
  85. // hold revMu lock to prevent new read txns from opening until writeback.
  86. tw.s.revMu.Lock()
  87. tw.s.currentRev++
  88. }
  89. tw.tx.Unlock()
  90. if len(tw.changes) != 0 {
  91. tw.s.revMu.Unlock()
  92. }
  93. tw.s.mu.RUnlock()
  94. }
  95. func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
  96. rev := ro.Rev
  97. if rev > curRev {
  98. return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
  99. }
  100. if rev <= 0 {
  101. rev = curRev
  102. }
  103. if rev < tr.s.compactMainRev {
  104. return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
  105. }
  106. revpairs := tr.s.kvindex.Revisions(key, end, int64(rev))
  107. if len(revpairs) == 0 {
  108. return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
  109. }
  110. if ro.Count {
  111. return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
  112. }
  113. limit := int(ro.Limit)
  114. if limit <= 0 || limit > len(revpairs) {
  115. limit = len(revpairs)
  116. }
  117. kvs := make([]mvccpb.KeyValue, limit)
  118. revBytes := newRevBytes()
  119. for i, revpair := range revpairs[:len(kvs)] {
  120. revToBytes(revpair, revBytes)
  121. _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
  122. if len(vs) != 1 {
  123. plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
  124. }
  125. if err := kvs[i].Unmarshal(vs[0]); err != nil {
  126. plog.Fatalf("cannot unmarshal event: %v", err)
  127. }
  128. }
  129. return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
  130. }
  131. func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
  132. rev := tw.beginRev + 1
  133. c := rev
  134. oldLease := lease.NoLease
  135. // if the key exists before, use its previous created and
  136. // get its previous leaseID
  137. _, created, ver, err := tw.s.kvindex.Get(key, rev)
  138. if err == nil {
  139. c = created.main
  140. oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
  141. }
  142. ibytes := newRevBytes()
  143. idxRev := revision{main: rev, sub: int64(len(tw.changes))}
  144. revToBytes(idxRev, ibytes)
  145. ver = ver + 1
  146. kv := mvccpb.KeyValue{
  147. Key: key,
  148. Value: value,
  149. CreateRevision: c,
  150. ModRevision: rev,
  151. Version: ver,
  152. Lease: int64(leaseID),
  153. }
  154. d, err := kv.Marshal()
  155. if err != nil {
  156. plog.Fatalf("cannot marshal event: %v", err)
  157. }
  158. tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
  159. tw.s.kvindex.Put(key, idxRev)
  160. tw.changes = append(tw.changes, kv)
  161. if oldLease != lease.NoLease {
  162. if tw.s.le == nil {
  163. panic("no lessor to detach lease")
  164. }
  165. err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
  166. if err != nil {
  167. plog.Errorf("unexpected error from lease detach: %v", err)
  168. }
  169. }
  170. if leaseID != lease.NoLease {
  171. if tw.s.le == nil {
  172. panic("no lessor to attach lease")
  173. }
  174. err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
  175. if err != nil {
  176. panic("unexpected error from lease Attach")
  177. }
  178. }
  179. }
  180. func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
  181. rrev := tw.beginRev
  182. if len(tw.changes) > 0 {
  183. rrev += 1
  184. }
  185. keys, revs := tw.s.kvindex.Range(key, end, rrev)
  186. if len(keys) == 0 {
  187. return 0
  188. }
  189. for i, key := range keys {
  190. tw.delete(key, revs[i])
  191. }
  192. return int64(len(keys))
  193. }
  194. func (tw *storeTxnWrite) delete(key []byte, rev revision) {
  195. ibytes := newRevBytes()
  196. idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
  197. revToBytes(idxRev, ibytes)
  198. ibytes = appendMarkTombstone(ibytes)
  199. kv := mvccpb.KeyValue{Key: key}
  200. d, err := kv.Marshal()
  201. if err != nil {
  202. plog.Fatalf("cannot marshal event: %v", err)
  203. }
  204. tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
  205. err = tw.s.kvindex.Tombstone(key, idxRev)
  206. if err != nil {
  207. plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
  208. }
  209. tw.changes = append(tw.changes, kv)
  210. item := lease.LeaseItem{Key: string(key)}
  211. leaseID := tw.s.le.GetLease(item)
  212. if leaseID != lease.NoLease {
  213. err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
  214. if err != nil {
  215. plog.Errorf("cannot detach %v", err)
  216. }
  217. }
  218. }
  219. func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }