kvstore.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. package storage
  2. import (
  3. "errors"
  4. "io"
  5. "log"
  6. "math"
  7. "math/rand"
  8. "sync"
  9. "time"
  10. "github.com/coreos/etcd/storage/backend"
  11. "github.com/coreos/etcd/storage/storagepb"
  12. )
var (
	// Backend batching knobs: presumably the backend commits a batch
	// after batchLimit operations or batchInterval, whichever comes
	// first (passed straight to backend.New — confirm there).
	batchLimit    = 10000
	batchInterval = 100 * time.Millisecond

	// Bucket names inside the backend.
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	// Meta-bucket keys recording compaction progress; written by
	// Compact and read back by Restore to resume interrupted work.
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	// ErrTnxIDMismatch is returned when a Tnx* method is called with an
	// id that does not match the currently open tnx.
	ErrTnxIDMismatch = errors.New("storage: tnx id mismatch")
	// ErrCompacted is returned when the requested reversion has already
	// been compacted away.
	ErrCompacted = errors.New("storage: required reversion has been compacted")
	// ErrFutureRev is returned when the requested reversion is newer
	// than the store's current reversion.
	ErrFutureRev = errors.New("storage: required reversion is a future reversion")
)
// store is a reversioned key-value store: every write is recorded as an
// event in a disk backend keyed by reversion, with an in-memory index
// mapping user keys to reversions.
type store struct {
	// mu is taken in TnxBegin and released in TnxEnd, serializing tnxs.
	mu sync.RWMutex

	b       backend.Backend // durable event storage
	kvindex index           // in-memory key -> reversion index

	// currentRev.main is the reversion of the last finished tnx;
	// currentRev.sub counts writes inside the currently open tnx.
	currentRev reversion
	// the main reversion of the last compaction
	compactMainRev int64

	tmu   sync.Mutex // protect the tnxID field
	tnxID int64      // tracks the current tnxID to verify tnx operations

	wg    sync.WaitGroup // outstanding background compactions (see Compact)
	stopc chan struct{}  // closed by Close to signal background work to stop
}
  36. func newStore(path string) *store {
  37. s := &store{
  38. b: backend.New(path, batchInterval, batchLimit),
  39. kvindex: newTreeIndex(),
  40. currentRev: reversion{},
  41. compactMainRev: -1,
  42. stopc: make(chan struct{}),
  43. }
  44. tx := s.b.BatchTx()
  45. tx.Lock()
  46. tx.UnsafeCreateBucket(keyBucketName)
  47. tx.UnsafeCreateBucket(metaBucketName)
  48. tx.Unlock()
  49. s.b.ForceCommit()
  50. return s
  51. }
  52. func (s *store) Put(key, value []byte) int64 {
  53. id := s.TnxBegin()
  54. s.put(key, value, s.currentRev.main+1)
  55. s.TnxEnd(id)
  56. return int64(s.currentRev.main)
  57. }
  58. func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  59. id := s.TnxBegin()
  60. kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev)
  61. s.TnxEnd(id)
  62. return kvs, rev, err
  63. }
  64. func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
  65. id := s.TnxBegin()
  66. n = s.deleteRange(key, end, s.currentRev.main+1)
  67. s.TnxEnd(id)
  68. return n, int64(s.currentRev.main)
  69. }
  70. func (s *store) TnxBegin() int64 {
  71. s.mu.Lock()
  72. s.currentRev.sub = 0
  73. s.tmu.Lock()
  74. defer s.tmu.Unlock()
  75. s.tnxID = rand.Int63()
  76. return s.tnxID
  77. }
// TnxEnd closes the tnx identified by tnxID and releases the store-wide
// lock taken in TnxBegin. If any write happened during the tnx
// (currentRev.sub advanced), the main reversion is bumped once so the
// whole tnx becomes visible at a single new reversion.
//
// NOTE(review): on an id mismatch this returns WITHOUT unlocking s.mu —
// presumably intentional, so a stranger cannot end someone else's tnx;
// confirm callers never retry TnxEnd with a stale id.
func (s *store) TnxEnd(tnxID int64) error {
	s.tmu.Lock()
	defer s.tmu.Unlock()
	if tnxID != s.tnxID {
		return ErrTnxIDMismatch
	}
	// A tnx that wrote anything advances the main reversion exactly once.
	if s.currentRev.sub != 0 {
		s.currentRev.main += 1
	}
	s.currentRev.sub = 0
	s.mu.Unlock()
	return nil
}
  91. func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  92. s.tmu.Lock()
  93. defer s.tmu.Unlock()
  94. if tnxID != s.tnxID {
  95. return nil, 0, ErrTnxIDMismatch
  96. }
  97. return s.rangeKeys(key, end, limit, rangeRev)
  98. }
  99. func (s *store) TnxPut(tnxID int64, key, value []byte) (rev int64, err error) {
  100. s.tmu.Lock()
  101. defer s.tmu.Unlock()
  102. if tnxID != s.tnxID {
  103. return 0, ErrTnxIDMismatch
  104. }
  105. s.put(key, value, s.currentRev.main+1)
  106. return int64(s.currentRev.main + 1), nil
  107. }
  108. func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error) {
  109. s.tmu.Lock()
  110. defer s.tmu.Unlock()
  111. if tnxID != s.tnxID {
  112. return 0, 0, ErrTnxIDMismatch
  113. }
  114. n = s.deleteRange(key, end, s.currentRev.main+1)
  115. if n != 0 || s.currentRev.sub != 0 {
  116. rev = int64(s.currentRev.main + 1)
  117. }
  118. return n, rev, nil
  119. }
// Compact discards key history up to main reversion rev. The scheduled
// reversion is persisted first so an interrupted compaction can be
// resumed by Restore; the actual deletion runs asynchronously.
// Returns ErrCompacted if rev has already been compacted.
func (s *store) Compact(rev int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if rev <= s.compactMainRev {
		return ErrCompacted
	}
	s.compactMainRev = rev

	// Record the scheduled compaction in the meta bucket before doing
	// any destructive work (crash-safety; see Restore).
	rbytes := newRevBytes()
	revToBytes(reversion{main: rev}, rbytes)
	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()

	// keep holds the reversions that must survive the compaction.
	keep := s.kvindex.Compact(rev)

	// NOTE(review): scheduleCompaction is defined elsewhere; it is
	// assumed to call s.wg.Done when finished — confirm, Close relies
	// on wg to wait for it.
	s.wg.Add(1)
	go s.scheduleCompaction(rev, keep)
	return nil
}
  138. func (s *store) Snapshot(w io.Writer) (int64, error) {
  139. s.b.ForceCommit()
  140. return s.b.Snapshot(w)
  141. }
// Restore rebuilds the in-memory index and reversion state by replaying
// every event in the key bucket, then resumes any compaction that was
// scheduled but not finished before shutdown.
func (s *store) Restore() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Byte keys spanning the entire reversion space.
	min, max := newRevBytes(), newRevBytes()
	revToBytes(reversion{}, min)
	revToBytes(reversion{main: math.MaxInt64, sub: math.MaxInt64}, max)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()
	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
		log.Printf("storage: restore compact to %d", s.compactMainRev)
	}

	// TODO: limit N to reduce max memory usage
	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
	for i, key := range keys {
		e := &storagepb.Event{}
		if err := e.Unmarshal(vals[i]); err != nil {
			// Corrupt backend data is unrecoverable.
			log.Fatalf("storage: cannot unmarshal event: %v", err)
		}
		rev := bytesToRev(key)

		// restore index
		switch e.Type {
		case storagepb.PUT:
			s.kvindex.Put(e.Kv.Key, rev)
		case storagepb.DELETE:
			s.kvindex.Tombstone(e.Kv.Key, rev)
		default:
			log.Panicf("storage: unexpected event type %s", e.Type)
		}

		// update reversion; keys iterate in order, so the last event
		// seen carries the current reversion.
		s.currentRev = rev
	}

	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main
		if scheduledCompact > s.compactMainRev {
			log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
			// NOTE(review): Compact takes s.mu, so this goroutine blocks
			// until Restore's deferred unlock runs; its error is dropped.
			go s.Compact(scheduledCompact)
		}
	}
	tx.Unlock()
	return nil
}
// Close shuts the store down: it signals background work via stopc,
// waits for outstanding compactions tracked by wg, and finally closes
// the backend. The order matters — the backend must outlive the
// goroutines that use it.
func (s *store) Close() error {
	close(s.stopc)
	s.wg.Wait()
	return s.b.Close()
}
  192. func (a *store) Equal(b *store) bool {
  193. if a.currentRev != b.currentRev {
  194. return false
  195. }
  196. if a.compactMainRev != b.compactMainRev {
  197. return false
  198. }
  199. return a.kvindex.Equal(b.kvindex)
  200. }
// range is a keyword in Go, add Keys suffix.
// rangeKeys returns up to limit live (PUT) key-values in [key, end) as
// of reversion rangeRev; rangeRev <= 0 means the current reversion.
// It fails with ErrFutureRev for a reversion ahead of the store and
// ErrCompacted for one that has been compacted away. Callers take the
// store lock first (see Range / TnxRange).
func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
	if rangeRev > s.currentRev.main {
		return nil, s.currentRev.main, ErrFutureRev
	}
	if rangeRev <= 0 {
		rev = int64(s.currentRev.main)
		// A write already happened in this tnx, so reads inside it see
		// the pending (not yet committed) main reversion.
		if s.currentRev.sub > 0 {
			rev += 1
		}
	} else {
		rev = rangeRev
	}
	if rev <= s.compactMainRev {
		return nil, 0, ErrCompacted
	}

	// Resolve keys to (main, sub) reversion pairs via the in-memory index.
	_, revpairs := s.kvindex.Range(key, end, int64(rev))
	if len(revpairs) == 0 {
		return nil, rev, nil
	}

	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	for _, revpair := range revpairs {
		revbytes := newRevBytes()
		revToBytes(revpair, revbytes)
		_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
		if len(vs) != 1 {
			// Index and backend disagree: unrecoverable corruption.
			log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
		}
		e := &storagepb.Event{}
		if err := e.Unmarshal(vs[0]); err != nil {
			log.Fatalf("storage: cannot unmarshal event: %v", err)
		}
		// Only PUT events carry live values; DELETE tombstones are skipped
		// (and do not count toward limit).
		if e.Type == storagepb.PUT {
			kvs = append(kvs, e.Kv)
		}
		if limit > 0 && len(kvs) >= int(limit) {
			break
		}
	}
	return kvs, rev, nil
}
// put appends a PUT event for key at reversion (rev, currentRev.sub) to
// the backend, updates the index, and advances the sub reversion.
// Callers hold the store lock (via TnxBegin / the Tnx* methods).
func (s *store) put(key, value []byte, rev int64) {
	c := rev
	// if the key exists before, use its previous created
	_, created, ver, err := s.kvindex.Get(key, rev)
	if err == nil {
		c = created.main
	}

	ibytes := newRevBytes()
	revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes)

	// presumably ver is 0 when Get failed (new key), making the first
	// stored Version equal to 1 — TODO confirm against the index impl.
	ver = ver + 1
	event := storagepb.Event{
		Type: storagepb.PUT,
		Kv: storagepb.KeyValue{
			Key:         key,
			Value:       value,
			CreateIndex: c,
			ModIndex:    rev,
			Version:     ver,
		},
	}

	d, err := event.Marshal()
	if err != nil {
		log.Fatalf("storage: cannot marshal event: %v", err)
	}

	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	// Write the event, then mirror it into the index; both use the same
	// (main, sub) reversion pair.
	tx.UnsafePut(keyBucketName, ibytes, d)
	s.kvindex.Put(key, reversion{main: rev, sub: s.currentRev.sub})
	s.currentRev.sub += 1
}
  275. func (s *store) deleteRange(key, end []byte, rev int64) int64 {
  276. var n int64
  277. rrev := rev
  278. if s.currentRev.sub > 0 {
  279. rrev += 1
  280. }
  281. keys, _ := s.kvindex.Range(key, end, rrev)
  282. if len(keys) == 0 {
  283. return 0
  284. }
  285. for _, key := range keys {
  286. ok := s.delete(key, rev)
  287. if ok {
  288. n++
  289. }
  290. }
  291. return n
  292. }
// delete appends a DELETE (tombstone) event for key at main reversion
// mainrev and updates the index. It reports whether a live value
// actually existed; deleting a missing or already-deleted key is a
// no-op returning false.
func (s *store) delete(key []byte, mainrev int64) bool {
	// A prior write in this tnx means index lookups must use the
	// pending reversion.
	grev := mainrev
	if s.currentRev.sub > 0 {
		grev += 1
	}
	rev, _, _, err := s.kvindex.Get(key, grev)
	if err != nil {
		// key not exist
		return false
	}

	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()

	// Fetch the latest stored event for the key to check whether it is
	// already a tombstone.
	revbytes := newRevBytes()
	revToBytes(rev, revbytes)
	_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
	if len(vs) != 1 {
		// Index and backend disagree: unrecoverable corruption.
		log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub)
	}
	e := &storagepb.Event{}
	if err := e.Unmarshal(vs[0]); err != nil {
		log.Fatalf("storage: cannot unmarshal event: %v", err)
	}
	if e.Type == storagepb.DELETE {
		return false
	}

	// Append the tombstone at (mainrev, currentRev.sub), mirror it into
	// the index, then advance the sub reversion.
	ibytes := newRevBytes()
	revToBytes(reversion{main: mainrev, sub: s.currentRev.sub}, ibytes)
	event := storagepb.Event{
		Type: storagepb.DELETE,
		Kv: storagepb.KeyValue{
			Key: key,
		},
	}
	d, err := event.Marshal()
	if err != nil {
		log.Fatalf("storage: cannot marshal event: %v", err)
	}
	tx.UnsafePut(keyBucketName, ibytes, d)
	err = s.kvindex.Tombstone(key, reversion{main: mainrev, sub: s.currentRev.sub})
	if err != nil {
		// The index said the key exists above, so Tombstone must succeed.
		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
	}
	s.currentRev.sub += 1
	return true
}