kvstore.go 9.4 KB


  1. package storage
  2. import (
  3. "errors"
  4. "io"
  5. "log"
  6. "math"
  7. "math/rand"
  8. "sync"
  9. "time"
  10. "github.com/coreos/etcd/storage/backend"
  11. "github.com/coreos/etcd/storage/storagepb"
  12. )
  13. var (
  14. batchLimit = 10000
  15. batchInterval = 100 * time.Millisecond
  16. keyBucketName = []byte("key")
  17. metaBucketName = []byte("meta")
  18. scheduledCompactKeyName = []byte("scheduledCompactRev")
  19. finishedCompactKeyName = []byte("finishedCompactRev")
  20. ErrTxnIDMismatch = errors.New("storage: txn id mismatch")
  21. ErrCompacted = errors.New("storage: required revision has been compacted")
  22. ErrFutureRev = errors.New("storage: required revision is a future revision")
  23. )
  24. type store struct {
  25. mu sync.RWMutex
  26. b backend.Backend
  27. kvindex index
  28. currentRev revision
  29. // the main revision of the last compaction
  30. compactMainRev int64
  31. tmu sync.Mutex // protect the txnID field
  32. txnID int64 // tracks the current txnID to verify txn operations
  33. wg sync.WaitGroup
  34. stopc chan struct{}
  35. }
  36. func New(path string) KV {
  37. return newStore(path)
  38. }
  39. func newStore(path string) *store {
  40. s := &store{
  41. b: backend.New(path, batchInterval, batchLimit),
  42. kvindex: newTreeIndex(),
  43. currentRev: revision{},
  44. compactMainRev: -1,
  45. stopc: make(chan struct{}),
  46. }
  47. tx := s.b.BatchTx()
  48. tx.Lock()
  49. tx.UnsafeCreateBucket(keyBucketName)
  50. tx.UnsafeCreateBucket(metaBucketName)
  51. tx.Unlock()
  52. s.b.ForceCommit()
  53. return s
  54. }
  55. func (s *store) Put(key, value []byte) int64 {
  56. id := s.TxnBegin()
  57. s.put(key, value, s.currentRev.main+1)
  58. s.txnEnd(id)
  59. putCounter.Inc()
  60. return int64(s.currentRev.main)
  61. }
  62. func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  63. id := s.TxnBegin()
  64. kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev)
  65. s.txnEnd(id)
  66. rangeCounter.Inc()
  67. return kvs, rev, err
  68. }
  69. func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
  70. id := s.TxnBegin()
  71. n = s.deleteRange(key, end, s.currentRev.main+1)
  72. s.txnEnd(id)
  73. deleteCounter.Inc()
  74. return n, int64(s.currentRev.main)
  75. }
  76. func (s *store) TxnBegin() int64 {
  77. s.mu.Lock()
  78. s.currentRev.sub = 0
  79. s.tmu.Lock()
  80. defer s.tmu.Unlock()
  81. s.txnID = rand.Int63()
  82. return s.txnID
  83. }
  84. func (s *store) TxnEnd(txnID int64) error {
  85. err := s.txnEnd(txnID)
  86. if err != nil {
  87. return err
  88. }
  89. txnCounter.Inc()
  90. return nil
  91. }
  92. // txnEnd is used for unlocking an internal txn. It does
  93. // not increase the txnCounter.
  94. func (s *store) txnEnd(txnID int64) error {
  95. s.tmu.Lock()
  96. defer s.tmu.Unlock()
  97. if txnID != s.txnID {
  98. return ErrTxnIDMismatch
  99. }
  100. if s.currentRev.sub != 0 {
  101. s.currentRev.main += 1
  102. }
  103. s.currentRev.sub = 0
  104. s.mu.Unlock()
  105. return nil
  106. }
  107. func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  108. s.tmu.Lock()
  109. defer s.tmu.Unlock()
  110. if txnID != s.txnID {
  111. return nil, 0, ErrTxnIDMismatch
  112. }
  113. return s.rangeKeys(key, end, limit, rangeRev)
  114. }
  115. func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
  116. s.tmu.Lock()
  117. defer s.tmu.Unlock()
  118. if txnID != s.txnID {
  119. return 0, ErrTxnIDMismatch
  120. }
  121. s.put(key, value, s.currentRev.main+1)
  122. return int64(s.currentRev.main + 1), nil
  123. }
  124. func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
  125. s.tmu.Lock()
  126. defer s.tmu.Unlock()
  127. if txnID != s.txnID {
  128. return 0, 0, ErrTxnIDMismatch
  129. }
  130. n = s.deleteRange(key, end, s.currentRev.main+1)
  131. if n != 0 || s.currentRev.sub != 0 {
  132. rev = int64(s.currentRev.main + 1)
  133. } else {
  134. rev = int64(s.currentRev.main)
  135. }
  136. return n, rev, nil
  137. }
  138. func (s *store) Compact(rev int64) error {
  139. s.mu.Lock()
  140. defer s.mu.Unlock()
  141. if rev <= s.compactMainRev {
  142. return ErrCompacted
  143. }
  144. if rev > s.currentRev.main {
  145. return ErrFutureRev
  146. }
  147. start := time.Now()
  148. s.compactMainRev = rev
  149. rbytes := newRevBytes()
  150. revToBytes(revision{main: rev}, rbytes)
  151. tx := s.b.BatchTx()
  152. tx.Lock()
  153. tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
  154. tx.Unlock()
  155. // ensure that desired compaction is persisted
  156. s.b.ForceCommit()
  157. keep := s.kvindex.Compact(rev)
  158. s.wg.Add(1)
  159. go s.scheduleCompaction(rev, keep)
  160. indexCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond))
  161. return nil
  162. }
  163. func (s *store) Snapshot(w io.Writer) (int64, error) {
  164. s.b.ForceCommit()
  165. return s.b.Snapshot(w)
  166. }
  167. func (s *store) Restore() error {
  168. s.mu.Lock()
  169. defer s.mu.Unlock()
  170. min, max := newRevBytes(), newRevBytes()
  171. revToBytes(revision{}, min)
  172. revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
  173. // restore index
  174. tx := s.b.BatchTx()
  175. tx.Lock()
  176. _, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
  177. if len(finishedCompactBytes) != 0 {
  178. s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
  179. log.Printf("storage: restore compact to %d", s.compactMainRev)
  180. }
  181. // TODO: limit N to reduce max memory usage
  182. keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
  183. for i, key := range keys {
  184. e := &storagepb.Event{}
  185. if err := e.Unmarshal(vals[i]); err != nil {
  186. log.Fatalf("storage: cannot unmarshal event: %v", err)
  187. }
  188. rev := bytesToRev(key)
  189. // restore index
  190. switch e.Type {
  191. case storagepb.PUT:
  192. s.kvindex.Restore(e.Kv.Key, revision{e.Kv.CreateIndex, 0}, rev, e.Kv.Version)
  193. case storagepb.DELETE:
  194. s.kvindex.Tombstone(e.Kv.Key, rev)
  195. default:
  196. log.Panicf("storage: unexpected event type %s", e.Type)
  197. }
  198. // update revision
  199. s.currentRev = rev
  200. }
  201. _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
  202. if len(scheduledCompactBytes) != 0 {
  203. scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main
  204. if scheduledCompact > s.compactMainRev {
  205. log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
  206. go s.Compact(scheduledCompact)
  207. }
  208. }
  209. tx.Unlock()
  210. return nil
  211. }
  212. func (s *store) Close() error {
  213. close(s.stopc)
  214. s.wg.Wait()
  215. return s.b.Close()
  216. }
  217. func (a *store) Equal(b *store) bool {
  218. if a.currentRev != b.currentRev {
  219. return false
  220. }
  221. if a.compactMainRev != b.compactMainRev {
  222. return false
  223. }
  224. return a.kvindex.Equal(b.kvindex)
  225. }
  226. // range is a keyword in Go, add Keys suffix.
  227. func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  228. curRev := int64(s.currentRev.main)
  229. if s.currentRev.sub > 0 {
  230. curRev += 1
  231. }
  232. if rangeRev > curRev {
  233. return nil, s.currentRev.main, ErrFutureRev
  234. }
  235. if rangeRev <= 0 {
  236. rev = curRev
  237. } else {
  238. rev = rangeRev
  239. }
  240. if rev <= s.compactMainRev {
  241. return nil, 0, ErrCompacted
  242. }
  243. _, revpairs := s.kvindex.Range(key, end, int64(rev))
  244. if len(revpairs) == 0 {
  245. return nil, rev, nil
  246. }
  247. tx := s.b.BatchTx()
  248. tx.Lock()
  249. defer tx.Unlock()
  250. for _, revpair := range revpairs {
  251. revbytes := newRevBytes()
  252. revToBytes(revpair, revbytes)
  253. _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
  254. if len(vs) != 1 {
  255. log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
  256. }
  257. e := &storagepb.Event{}
  258. if err := e.Unmarshal(vs[0]); err != nil {
  259. log.Fatalf("storage: cannot unmarshal event: %v", err)
  260. }
  261. if e.Type == storagepb.PUT {
  262. kvs = append(kvs, *e.Kv)
  263. }
  264. if limit > 0 && len(kvs) >= int(limit) {
  265. break
  266. }
  267. }
  268. return kvs, rev, nil
  269. }
  270. func (s *store) put(key, value []byte, rev int64) {
  271. c := rev
  272. // if the key exists before, use its previous created
  273. _, created, ver, err := s.kvindex.Get(key, rev)
  274. if err == nil {
  275. c = created.main
  276. }
  277. ibytes := newRevBytes()
  278. revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)
  279. ver = ver + 1
  280. event := storagepb.Event{
  281. Type: storagepb.PUT,
  282. Kv: &storagepb.KeyValue{
  283. Key: key,
  284. Value: value,
  285. CreateIndex: c,
  286. ModIndex: rev,
  287. Version: ver,
  288. },
  289. }
  290. d, err := event.Marshal()
  291. if err != nil {
  292. log.Fatalf("storage: cannot marshal event: %v", err)
  293. }
  294. tx := s.b.BatchTx()
  295. tx.Lock()
  296. defer tx.Unlock()
  297. tx.UnsafePut(keyBucketName, ibytes, d)
  298. s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
  299. s.currentRev.sub += 1
  300. }
  301. func (s *store) deleteRange(key, end []byte, rev int64) int64 {
  302. var n int64
  303. rrev := rev
  304. if s.currentRev.sub > 0 {
  305. rrev += 1
  306. }
  307. keys, _ := s.kvindex.Range(key, end, rrev)
  308. if len(keys) == 0 {
  309. return 0
  310. }
  311. for _, key := range keys {
  312. ok := s.delete(key, rev)
  313. if ok {
  314. n++
  315. }
  316. }
  317. return n
  318. }
  319. func (s *store) delete(key []byte, mainrev int64) bool {
  320. grev := mainrev
  321. if s.currentRev.sub > 0 {
  322. grev += 1
  323. }
  324. rev, _, _, err := s.kvindex.Get(key, grev)
  325. if err != nil {
  326. // key not exist
  327. return false
  328. }
  329. tx := s.b.BatchTx()
  330. tx.Lock()
  331. defer tx.Unlock()
  332. revbytes := newRevBytes()
  333. revToBytes(rev, revbytes)
  334. _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
  335. if len(vs) != 1 {
  336. log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub)
  337. }
  338. e := &storagepb.Event{}
  339. if err := e.Unmarshal(vs[0]); err != nil {
  340. log.Fatalf("storage: cannot unmarshal event: %v", err)
  341. }
  342. if e.Type == storagepb.DELETE {
  343. return false
  344. }
  345. ibytes := newRevBytes()
  346. revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
  347. event := storagepb.Event{
  348. Type: storagepb.DELETE,
  349. Kv: &storagepb.KeyValue{
  350. Key: key,
  351. },
  352. }
  353. d, err := event.Marshal()
  354. if err != nil {
  355. log.Fatalf("storage: cannot marshal event: %v", err)
  356. }
  357. tx.UnsafePut(keyBucketName, ibytes, d)
  358. err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
  359. if err != nil {
  360. log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
  361. }
  362. s.currentRev.sub += 1
  363. return true
  364. }