// kvstore.go
  1. package storage
  2. import (
  3. "errors"
  4. "io"
  5. "log"
  6. "math"
  7. "math/rand"
  8. "sync"
  9. "time"
  10. "github.com/coreos/etcd/storage/backend"
  11. "github.com/coreos/etcd/storage/storagepb"
  12. )
var (
	// batchLimit and batchInterval bound how many writes the backend
	// buffers and how long it waits before committing a batch.
	batchLimit    = 10000
	batchInterval = 100 * time.Millisecond

	// bucket names used in the backend
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	// keys in the meta bucket recording compaction progress:
	// scheduled marks a compaction that was requested and persisted;
	// finished marks one whose backend cleanup completed.
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	// ErrTxnIDMismatch is returned by Txn* methods when the given ID
	// does not match the currently open transaction.
	ErrTxnIDMismatch = errors.New("storage: txn id mismatch")
	// ErrCompacted is returned when the requested revision has already
	// been compacted away.
	ErrCompacted = errors.New("storage: required revision has been compacted")
	// ErrFutureRev is returned when the requested revision is newer
	// than the store's current revision.
	ErrFutureRev = errors.New("storage: required revision is a future revision")
)
// store is a revisioned key-value store: events are persisted in a
// backend keyed by revision, and an in-memory index maps user keys to
// the revisions that touched them.
type store struct {
	// mu serializes transactions; it is taken in TxnBegin and released
	// in TxnEnd.
	mu sync.RWMutex

	b       backend.Backend
	kvindex index

	// currentRev.main is the revision of the last committed txn;
	// currentRev.sub counts writes made inside the currently open txn.
	currentRev revision
	// the main revision of the last compaction (-1 until the first one)
	compactMainRev int64

	tmu   sync.Mutex // protect the txnID field
	txnID int64      // tracks the current txnID to verify txn operations

	wg    sync.WaitGroup // tracks background compaction goroutines
	stopc chan struct{}  // closed by Close to signal shutdown
}
// New returns a KV backed by the file at path.
func New(path string) KV {
	return newStore(path)
}
  39. func newStore(path string) *store {
  40. s := &store{
  41. b: backend.New(path, batchInterval, batchLimit),
  42. kvindex: newTreeIndex(),
  43. currentRev: revision{},
  44. compactMainRev: -1,
  45. stopc: make(chan struct{}),
  46. }
  47. tx := s.b.BatchTx()
  48. tx.Lock()
  49. tx.UnsafeCreateBucket(keyBucketName)
  50. tx.UnsafeCreateBucket(metaBucketName)
  51. tx.Unlock()
  52. s.b.ForceCommit()
  53. return s
  54. }
  55. func (s *store) Put(key, value []byte) int64 {
  56. id := s.TxnBegin()
  57. s.put(key, value, s.currentRev.main+1)
  58. s.TxnEnd(id)
  59. return int64(s.currentRev.main)
  60. }
  61. func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  62. id := s.TxnBegin()
  63. kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev)
  64. s.TxnEnd(id)
  65. return kvs, rev, err
  66. }
  67. func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
  68. id := s.TxnBegin()
  69. n = s.deleteRange(key, end, s.currentRev.main+1)
  70. s.TxnEnd(id)
  71. return n, int64(s.currentRev.main)
  72. }
  73. func (s *store) TxnBegin() int64 {
  74. s.mu.Lock()
  75. s.currentRev.sub = 0
  76. s.tmu.Lock()
  77. defer s.tmu.Unlock()
  78. s.txnID = rand.Int63()
  79. return s.txnID
  80. }
// TxnEnd closes the transaction identified by txnID and releases the
// store lock taken by TxnBegin. If the txn performed any writes
// (currentRev.sub != 0), the main revision is advanced by one.
// On an ID mismatch it returns ErrTxnIDMismatch and deliberately
// leaves s.mu held, since the matching transaction is still open.
func (s *store) TxnEnd(txnID int64) error {
	s.tmu.Lock()
	defer s.tmu.Unlock()
	if txnID != s.txnID {
		return ErrTxnIDMismatch
	}

	// a non-zero sub count means this txn wrote something: commit all
	// of its writes under one new main revision
	if s.currentRev.sub != 0 {
		s.currentRev.main += 1
	}
	s.currentRev.sub = 0

	s.mu.Unlock()
	return nil
}
  94. func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  95. s.tmu.Lock()
  96. defer s.tmu.Unlock()
  97. if txnID != s.txnID {
  98. return nil, 0, ErrTxnIDMismatch
  99. }
  100. return s.rangeKeys(key, end, limit, rangeRev)
  101. }
  102. func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
  103. s.tmu.Lock()
  104. defer s.tmu.Unlock()
  105. if txnID != s.txnID {
  106. return 0, ErrTxnIDMismatch
  107. }
  108. s.put(key, value, s.currentRev.main+1)
  109. return int64(s.currentRev.main + 1), nil
  110. }
  111. func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
  112. s.tmu.Lock()
  113. defer s.tmu.Unlock()
  114. if txnID != s.txnID {
  115. return 0, 0, ErrTxnIDMismatch
  116. }
  117. n = s.deleteRange(key, end, s.currentRev.main+1)
  118. if n != 0 || s.currentRev.sub != 0 {
  119. rev = int64(s.currentRev.main + 1)
  120. } else {
  121. rev = int64(s.currentRev.main)
  122. }
  123. return n, rev, nil
  124. }
// Compact schedules removal of superseded key revisions at or below
// rev. It returns ErrCompacted if rev was already compacted and
// ErrFutureRev if rev is beyond the current revision. The scheduled
// revision is persisted before any data is touched so an interrupted
// compaction can be resumed by Restore; the backend cleanup itself
// runs asynchronously in scheduleCompaction.
func (s *store) Compact(rev int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if rev <= s.compactMainRev {
		return ErrCompacted
	}
	if rev > s.currentRev.main {
		return ErrFutureRev
	}

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that desired compaction is persisted
	s.b.ForceCommit()

	// compact the in-memory index; the returned set is handed to the
	// background pass (presumably the revisions to retain — confirm
	// against the index implementation)
	keep := s.kvindex.Compact(rev)

	s.wg.Add(1)
	go s.scheduleCompaction(rev, keep)

	return nil
}
// Snapshot writes a snapshot of the backend to w and returns the
// number of bytes written. Pending batched writes are committed first
// so the snapshot is up to date.
func (s *store) Snapshot(w io.Writer) (int64, error) {
	s.b.ForceCommit()
	return s.b.Snapshot(w)
}
// Restore rebuilds the in-memory state (kvindex, currentRev,
// compactMainRev) by replaying every event stored in the backend, and
// resumes any compaction that was scheduled but never finished.
func (s *store) Restore() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// [min, max] covers every possible revision key in the key bucket
	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()
	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
		log.Printf("storage: restore compact to %d", s.compactMainRev)
	}

	// TODO: limit N to reduce max memory usage
	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
	for i, key := range keys {
		e := &storagepb.Event{}
		if err := e.Unmarshal(vals[i]); err != nil {
			log.Fatalf("storage: cannot unmarshal event: %v", err)
		}
		rev := bytesToRev(key)

		// restore index
		switch e.Type {
		case storagepb.PUT:
			s.kvindex.Restore(e.Kv.Key, revision{e.Kv.CreateIndex, 0}, rev, e.Kv.Version)
		case storagepb.DELETE:
			s.kvindex.Tombstone(e.Kv.Key, rev)
		default:
			log.Panicf("storage: unexpected event type %s", e.Type)
		}

		// update revision: keys iterate in revision order, so the last
		// event seen carries the store's current revision
		s.currentRev = rev
	}

	// resume a compaction that was persisted as scheduled but never
	// recorded as finished
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main
		if scheduledCompact > s.compactMainRev {
			log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
			// run in a goroutine: Compact takes s.mu, which is held here
			go s.Compact(scheduledCompact)
		}
	}
	tx.Unlock()

	return nil
}
// Close signals shutdown via stopc, waits for goroutines tracked by
// wg (background compactions) to finish, then closes the backend.
func (s *store) Close() error {
	close(s.stopc)
	s.wg.Wait()
	return s.b.Close()
}
  202. func (a *store) Equal(b *store) bool {
  203. if a.currentRev != b.currentRev {
  204. return false
  205. }
  206. if a.compactMainRev != b.compactMainRev {
  207. return false
  208. }
  209. return a.kvindex.Equal(b.kvindex)
  210. }
// range is a keyword in Go, add Keys suffix.
//
// rangeKeys returns up to limit (0 = unlimited) key-value pairs in
// [key, end) visible at revision rangeRev; rangeRev <= 0 means the
// current revision. The returned rev is the revision the read was
// served at. Callers reach this through Range/TxnRange, i.e. with a
// txn open.
func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
	// pending writes in the open txn (sub > 0) will commit as main+1,
	// so reads in that txn treat main+1 as the current revision
	curRev := int64(s.currentRev.main)
	if s.currentRev.sub > 0 {
		curRev += 1
	}

	if rangeRev > curRev {
		return nil, s.currentRev.main, ErrFutureRev
	}
	if rangeRev <= 0 {
		rev = curRev
	} else {
		rev = rangeRev
	}
	// anything at or below the compacted revision is gone
	if rev <= s.compactMainRev {
		return nil, 0, ErrCompacted
	}

	_, revpairs := s.kvindex.Range(key, end, int64(rev))
	if len(revpairs) == 0 {
		return nil, rev, nil
	}

	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	for _, revpair := range revpairs {
		revbytes := newRevBytes()
		revToBytes(revpair, revbytes)
		_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
		if len(vs) != 1 {
			// the index claims this revision exists; missing backend
			// data means the store is corrupt
			log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
		}

		e := &storagepb.Event{}
		if err := e.Unmarshal(vs[0]); err != nil {
			log.Fatalf("storage: cannot unmarshal event: %v", err)
		}
		// only PUT events carry live values; tombstones are skipped
		if e.Type == storagepb.PUT {
			kvs = append(kvs, *e.Kv)
		}
		if limit > 0 && len(kvs) >= int(limit) {
			break
		}
	}
	return kvs, rev, nil
}
// put records a PUT event for key at main revision rev (the sub index
// comes from the open txn) in both the backend and the in-memory
// index, carrying forward the key's original create revision and
// incrementing its version. It bumps currentRev.sub so TxnEnd knows
// the txn wrote something.
func (s *store) put(key, value []byte, rev int64) {
	c := rev

	// if the key exists before, use its previous created
	_, created, ver, err := s.kvindex.Get(key, rev)
	if err == nil {
		c = created.main
	}

	ibytes := newRevBytes()
	revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)

	// ver is the zero value when the key is new, so this yields 1
	ver = ver + 1
	event := storagepb.Event{
		Type: storagepb.PUT,
		Kv: &storagepb.KeyValue{
			Key:         key,
			Value:       value,
			CreateIndex: c,
			ModIndex:    rev,
			Version:     ver,
		},
	}

	d, err := event.Marshal()
	if err != nil {
		log.Fatalf("storage: cannot marshal event: %v", err)
	}

	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	tx.UnsafePut(keyBucketName, ibytes, d)
	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
	s.currentRev.sub += 1
}
  286. func (s *store) deleteRange(key, end []byte, rev int64) int64 {
  287. var n int64
  288. rrev := rev
  289. if s.currentRev.sub > 0 {
  290. rrev += 1
  291. }
  292. keys, _ := s.kvindex.Range(key, end, rrev)
  293. if len(keys) == 0 {
  294. return 0
  295. }
  296. for _, key := range keys {
  297. ok := s.delete(key, rev)
  298. if ok {
  299. n++
  300. }
  301. }
  302. return n
  303. }
// delete writes a DELETE tombstone for key at main revision mainrev
// into the backend and the index. It returns false when the key does
// not exist at that revision or its latest event is already a
// tombstone. On success it bumps currentRev.sub.
func (s *store) delete(key []byte, mainrev int64) bool {
	// pending writes in the open txn are visible one revision ahead
	grev := mainrev
	if s.currentRev.sub > 0 {
		grev += 1
	}
	rev, _, _, err := s.kvindex.Get(key, grev)
	if err != nil {
		// key not exist
		return false
	}

	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()

	revbytes := newRevBytes()
	revToBytes(rev, revbytes)
	_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
	if len(vs) != 1 {
		// index and backend disagree: the store is corrupt
		log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub)
	}

	e := &storagepb.Event{}
	if err := e.Unmarshal(vs[0]); err != nil {
		log.Fatalf("storage: cannot unmarshal event: %v", err)
	}
	// latest event is already a tombstone; nothing to delete
	if e.Type == storagepb.DELETE {
		return false
	}

	ibytes := newRevBytes()
	revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
	event := storagepb.Event{
		Type: storagepb.DELETE,
		Kv: &storagepb.KeyValue{
			Key: key,
		},
	}

	d, err := event.Marshal()
	if err != nil {
		log.Fatalf("storage: cannot marshal event: %v", err)
	}

	tx.UnsafePut(keyBucketName, ibytes, d)
	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
	if err != nil {
		// the index said the key exists at grev, so tombstoning it
		// must succeed; failure means the index is inconsistent
		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
	}
	s.currentRev.sub += 1
	return true
}