kvstore_txn.go 6.4 KB


  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mvcc
  15. import (
  16. "github.com/coreos/etcd/lease"
  17. "github.com/coreos/etcd/mvcc/backend"
  18. "github.com/coreos/etcd/mvcc/mvccpb"
  19. )
  20. type storeTxnRead struct {
  21. s *store
  22. tx backend.ReadTx
  23. firstRev int64
  24. rev int64
  25. }
  26. func (s *store) Read() TxnRead {
  27. s.mu.RLock()
  28. tx := s.b.ReadTx()
  29. s.revMu.RLock()
  30. tx.Lock()
  31. firstRev, rev := s.compactMainRev, s.currentRev
  32. s.revMu.RUnlock()
  33. return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
  34. }
  35. func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
  36. func (tr *storeTxnRead) Rev() int64 { return tr.rev }
  37. func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
  38. return tr.rangeKeys(key, end, tr.Rev(), ro)
  39. }
  40. func (tr *storeTxnRead) End() {
  41. tr.tx.Unlock()
  42. tr.s.mu.RUnlock()
  43. }
  44. type storeTxnWrite struct {
  45. *storeTxnRead
  46. tx backend.BatchTx
  47. // beginRev is the revision where the txn begins; it will write to the next revision.
  48. beginRev int64
  49. changes []mvccpb.KeyValue
  50. }
  51. func (s *store) Write() TxnWrite {
  52. s.mu.RLock()
  53. tx := s.b.BatchTx()
  54. tx.Lock()
  55. tw := &storeTxnWrite{
  56. storeTxnRead: &storeTxnRead{s, tx, 0, 0},
  57. tx: tx,
  58. beginRev: s.currentRev,
  59. changes: make([]mvccpb.KeyValue, 0, 4),
  60. }
  61. return newMetricsTxnWrite(tw)
  62. }
  63. func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
  64. func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
  65. rev := tw.beginRev
  66. if len(tw.changes) > 0 {
  67. rev++
  68. }
  69. return tw.rangeKeys(key, end, rev, ro)
  70. }
  71. func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
  72. if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
  73. return n, int64(tw.beginRev + 1)
  74. }
  75. return 0, int64(tw.beginRev)
  76. }
  77. func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
  78. tw.put(key, value, lease)
  79. return int64(tw.beginRev + 1)
  80. }
  81. func (tw *storeTxnWrite) End() {
  82. // only update index if the txn modifies the mvcc state.
  83. if len(tw.changes) != 0 {
  84. tw.s.saveIndex(tw.tx)
  85. // hold revMu lock to prevent new read txns from opening until writeback.
  86. tw.s.revMu.Lock()
  87. tw.s.currentRev++
  88. }
  89. tw.tx.Unlock()
  90. if len(tw.changes) != 0 {
  91. tw.s.revMu.Unlock()
  92. }
  93. dbTotalSize.Set(float64(tw.s.b.Size()))
  94. tw.s.mu.RUnlock()
  95. }
  96. func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
  97. rev := ro.Rev
  98. if rev > curRev {
  99. return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
  100. }
  101. if rev <= 0 {
  102. rev = curRev
  103. }
  104. if rev < tr.s.compactMainRev {
  105. return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
  106. }
  107. _, revpairs := tr.s.kvindex.Range(key, end, int64(rev))
  108. if len(revpairs) == 0 {
  109. return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
  110. }
  111. if ro.Count {
  112. return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
  113. }
  114. var kvs []mvccpb.KeyValue
  115. for _, revpair := range revpairs {
  116. start, end := revBytesRange(revpair)
  117. _, vs := tr.tx.UnsafeRange(keyBucketName, start, end, 0)
  118. if len(vs) != 1 {
  119. plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
  120. }
  121. var kv mvccpb.KeyValue
  122. if err := kv.Unmarshal(vs[0]); err != nil {
  123. plog.Fatalf("cannot unmarshal event: %v", err)
  124. }
  125. kvs = append(kvs, kv)
  126. if ro.Limit > 0 && len(kvs) >= int(ro.Limit) {
  127. break
  128. }
  129. }
  130. return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
  131. }
  132. func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
  133. rev := tw.beginRev + 1
  134. c := rev
  135. oldLease := lease.NoLease
  136. // if the key exists before, use its previous created and
  137. // get its previous leaseID
  138. _, created, ver, err := tw.s.kvindex.Get(key, rev)
  139. if err == nil {
  140. c = created.main
  141. oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
  142. }
  143. ibytes := newRevBytes()
  144. idxRev := revision{main: rev, sub: int64(len(tw.changes))}
  145. revToBytes(idxRev, ibytes)
  146. ver = ver + 1
  147. kv := mvccpb.KeyValue{
  148. Key: key,
  149. Value: value,
  150. CreateRevision: c,
  151. ModRevision: rev,
  152. Version: ver,
  153. Lease: int64(leaseID),
  154. }
  155. d, err := kv.Marshal()
  156. if err != nil {
  157. plog.Fatalf("cannot marshal event: %v", err)
  158. }
  159. tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
  160. tw.s.kvindex.Put(key, idxRev)
  161. tw.changes = append(tw.changes, kv)
  162. if oldLease != lease.NoLease {
  163. if tw.s.le == nil {
  164. panic("no lessor to detach lease")
  165. }
  166. err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
  167. if err != nil {
  168. plog.Errorf("unexpected error from lease detach: %v", err)
  169. }
  170. }
  171. if leaseID != lease.NoLease {
  172. if tw.s.le == nil {
  173. panic("no lessor to attach lease")
  174. }
  175. err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
  176. if err != nil {
  177. panic("unexpected error from lease Attach")
  178. }
  179. }
  180. }
  181. func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
  182. rrev := tw.beginRev
  183. if len(tw.changes) > 0 {
  184. rrev += 1
  185. }
  186. keys, revs := tw.s.kvindex.Range(key, end, rrev)
  187. if len(keys) == 0 {
  188. return 0
  189. }
  190. for i, key := range keys {
  191. tw.delete(key, revs[i])
  192. }
  193. return int64(len(keys))
  194. }
  195. func (tw *storeTxnWrite) delete(key []byte, rev revision) {
  196. ibytes := newRevBytes()
  197. idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
  198. revToBytes(idxRev, ibytes)
  199. ibytes = appendMarkTombstone(ibytes)
  200. kv := mvccpb.KeyValue{Key: key}
  201. d, err := kv.Marshal()
  202. if err != nil {
  203. plog.Fatalf("cannot marshal event: %v", err)
  204. }
  205. tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
  206. err = tw.s.kvindex.Tombstone(key, idxRev)
  207. if err != nil {
  208. plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
  209. }
  210. tw.changes = append(tw.changes, kv)
  211. item := lease.LeaseItem{Key: string(key)}
  212. leaseID := tw.s.le.GetLease(item)
  213. if leaseID != lease.NoLease {
  214. err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
  215. if err != nil {
  216. plog.Errorf("cannot detach %v", err)
  217. }
  218. }
  219. }
  220. func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }