// kvstore.go
  1. package storage
  2. import (
  3. "errors"
  4. "io"
  5. "log"
  6. "math"
  7. "math/rand"
  8. "sync"
  9. "time"
  10. "github.com/coreos/etcd/storage/backend"
  11. "github.com/coreos/etcd/storage/storagepb"
  12. )
  var (
	// batchLimit and batchInterval are the batching parameters handed to
	// backend.New by newStore.
	batchLimit    = 10000
	batchInterval = 100 * time.Millisecond

	// keyBucketName is the backend bucket that holds the revision-keyed
	// events as well as the compaction bookkeeping keys below.
	keyBucketName = []byte("key")
	// scheduledCompactKeyName records the main revision of the most recently
	// requested compaction (written by Compact, read back by Restore).
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	// finishedCompactKeyName records the main revision of the last compaction
	// known to be finished (read by Restore).
	finishedCompactKeyName = []byte("finishedCompactRev")

	// ErrTnxIDMismatch is returned by the Tnx* methods when the supplied id
	// is not the id of the currently open transaction.
	ErrTnxIDMismatch = errors.New("storage: tnx id mismatch")
	// ErrCompacted is returned when the requested revision is at or below
	// the last compacted main revision.
	ErrCompacted = errors.New("storage: required revision has been compacted")
)
  22. type store struct {
  23. mu sync.RWMutex
  24. b backend.Backend
  25. kvindex index
  26. currentRev reversion
  27. // the main reversion of the last compaction
  28. compactMainRev int64
  29. tmu sync.Mutex // protect the tnxID field
  30. tnxID int64 // tracks the current tnxID to verify tnx operations
  31. }
  32. func newStore(path string) *store {
  33. s := &store{
  34. b: backend.New(path, batchInterval, batchLimit),
  35. kvindex: newTreeIndex(),
  36. currentRev: reversion{},
  37. compactMainRev: -1,
  38. }
  39. tx := s.b.BatchTx()
  40. tx.Lock()
  41. tx.UnsafeCreateBucket(keyBucketName)
  42. tx.Unlock()
  43. s.b.ForceCommit()
  44. return s
  45. }
  46. func (s *store) Put(key, value []byte) int64 {
  47. id := s.TnxBegin()
  48. s.put(key, value, s.currentRev.main+1)
  49. s.TnxEnd(id)
  50. return int64(s.currentRev.main)
  51. }
  52. func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  53. id := s.TnxBegin()
  54. kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev)
  55. s.TnxEnd(id)
  56. return kvs, rev, err
  57. }
  58. func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
  59. id := s.TnxBegin()
  60. n = s.deleteRange(key, end, s.currentRev.main+1)
  61. s.TnxEnd(id)
  62. return n, int64(s.currentRev.main)
  63. }
  64. func (s *store) TnxBegin() int64 {
  65. s.mu.Lock()
  66. s.currentRev.sub = 0
  67. s.tmu.Lock()
  68. defer s.tmu.Unlock()
  69. s.tnxID = rand.Int63()
  70. return s.tnxID
  71. }
  72. func (s *store) TnxEnd(tnxID int64) error {
  73. s.tmu.Lock()
  74. defer s.tmu.Unlock()
  75. if tnxID != s.tnxID {
  76. return ErrTnxIDMismatch
  77. }
  78. if s.currentRev.sub != 0 {
  79. s.currentRev.main += 1
  80. }
  81. s.currentRev.sub = 0
  82. s.mu.Unlock()
  83. return nil
  84. }
  85. func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  86. s.tmu.Lock()
  87. defer s.tmu.Unlock()
  88. if tnxID != s.tnxID {
  89. return nil, 0, ErrTnxIDMismatch
  90. }
  91. return s.rangeKeys(key, end, limit, rangeRev)
  92. }
  93. func (s *store) TnxPut(tnxID int64, key, value []byte) (rev int64, err error) {
  94. s.tmu.Lock()
  95. defer s.tmu.Unlock()
  96. if tnxID != s.tnxID {
  97. return 0, ErrTnxIDMismatch
  98. }
  99. s.put(key, value, s.currentRev.main+1)
  100. return int64(s.currentRev.main + 1), nil
  101. }
  102. func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error) {
  103. s.tmu.Lock()
  104. defer s.tmu.Unlock()
  105. if tnxID != s.tnxID {
  106. return 0, 0, ErrTnxIDMismatch
  107. }
  108. n = s.deleteRange(key, end, s.currentRev.main+1)
  109. if n != 0 || s.currentRev.sub != 0 {
  110. rev = int64(s.currentRev.main + 1)
  111. }
  112. return n, rev, nil
  113. }
  114. func (s *store) Compact(rev int64) error {
  115. s.mu.Lock()
  116. defer s.mu.Unlock()
  117. if rev <= s.compactMainRev {
  118. return ErrCompacted
  119. }
  120. s.compactMainRev = rev
  121. rbytes := newRevBytes()
  122. revToBytes(reversion{main: rev}, rbytes)
  123. tx := s.b.BatchTx()
  124. tx.Lock()
  125. tx.UnsafePut(keyBucketName, scheduledCompactKeyName, rbytes)
  126. tx.Unlock()
  127. keep := s.kvindex.Compact(rev)
  128. go s.scheduleCompaction(rev, keep)
  129. return nil
  130. }
  131. func (s *store) Snapshot(w io.Writer) (int64, error) {
  132. s.b.ForceCommit()
  133. return s.b.Snapshot(w)
  134. }
  135. func (s *store) Restore() error {
  136. s.mu.Lock()
  137. defer s.mu.Unlock()
  138. min, max := newRevBytes(), newRevBytes()
  139. revToBytes(reversion{}, min)
  140. revToBytes(reversion{main: math.MaxInt64, sub: math.MaxInt64}, max)
  141. // restore index
  142. tx := s.b.BatchTx()
  143. tx.Lock()
  144. _, finishedCompactBytes := tx.UnsafeRange(keyBucketName, finishedCompactKeyName, nil, 0)
  145. if len(finishedCompactBytes) != 0 {
  146. s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
  147. log.Printf("storage: restore compact to %d", s.compactMainRev)
  148. }
  149. // TODO: limit N to reduce max memory usage
  150. keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
  151. for i, key := range keys {
  152. e := &storagepb.Event{}
  153. if err := e.Unmarshal(vals[i]); err != nil {
  154. log.Fatalf("storage: cannot unmarshal event: %v", err)
  155. }
  156. rev := bytesToRev(key)
  157. // restore index
  158. switch e.Type {
  159. case storagepb.PUT:
  160. s.kvindex.Put(e.Kv.Key, rev)
  161. case storagepb.DELETE:
  162. s.kvindex.Tombstone(e.Kv.Key, rev)
  163. default:
  164. log.Panicf("storage: unexpected event type %s", e.Type)
  165. }
  166. // update reversion
  167. s.currentRev = rev
  168. }
  169. _, scheduledCompactBytes := tx.UnsafeRange(keyBucketName, scheduledCompactKeyName, nil, 0)
  170. if len(scheduledCompactBytes) != 0 {
  171. scheduledCompact := bytesToRev(finishedCompactBytes[0]).main
  172. if scheduledCompact > s.compactMainRev {
  173. log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
  174. go s.Compact(scheduledCompact)
  175. }
  176. }
  177. tx.Unlock()
  178. return nil
  179. }
  180. func (s *store) Close() error {
  181. return s.b.Close()
  182. }
  183. func (a *store) Equal(b *store) bool {
  184. if a.currentRev != b.currentRev {
  185. return false
  186. }
  187. if a.compactMainRev != b.compactMainRev {
  188. return false
  189. }
  190. return a.kvindex.Equal(b.kvindex)
  191. }
  192. // range is a keyword in Go, add Keys suffix.
  193. func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  194. if rangeRev <= 0 {
  195. rev = int64(s.currentRev.main)
  196. if s.currentRev.sub > 0 {
  197. rev += 1
  198. }
  199. } else {
  200. rev = rangeRev
  201. }
  202. if rev <= s.compactMainRev {
  203. return nil, 0, ErrCompacted
  204. }
  205. _, revpairs := s.kvindex.Range(key, end, int64(rev))
  206. if len(revpairs) == 0 {
  207. return nil, rev, nil
  208. }
  209. if limit > 0 && len(revpairs) > int(limit) {
  210. revpairs = revpairs[:limit]
  211. }
  212. tx := s.b.BatchTx()
  213. tx.Lock()
  214. defer tx.Unlock()
  215. for _, revpair := range revpairs {
  216. revbytes := newRevBytes()
  217. revToBytes(revpair, revbytes)
  218. _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
  219. if len(vs) != 1 {
  220. log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
  221. }
  222. e := &storagepb.Event{}
  223. if err := e.Unmarshal(vs[0]); err != nil {
  224. log.Fatalf("storage: cannot unmarshal event: %v", err)
  225. }
  226. if e.Type == storagepb.PUT {
  227. kvs = append(kvs, e.Kv)
  228. }
  229. }
  230. return kvs, rev, nil
  231. }
  232. func (s *store) put(key, value []byte, rev int64) {
  233. ibytes := newRevBytes()
  234. revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes)
  235. event := storagepb.Event{
  236. Type: storagepb.PUT,
  237. Kv: storagepb.KeyValue{
  238. Key: key,
  239. Value: value,
  240. },
  241. }
  242. d, err := event.Marshal()
  243. if err != nil {
  244. log.Fatalf("storage: cannot marshal event: %v", err)
  245. }
  246. tx := s.b.BatchTx()
  247. tx.Lock()
  248. defer tx.Unlock()
  249. tx.UnsafePut(keyBucketName, ibytes, d)
  250. s.kvindex.Put(key, reversion{main: rev, sub: s.currentRev.sub})
  251. s.currentRev.sub += 1
  252. }
  253. func (s *store) deleteRange(key, end []byte, rev int64) int64 {
  254. var n int64
  255. rrev := rev
  256. if s.currentRev.sub > 0 {
  257. rrev += 1
  258. }
  259. keys, _ := s.kvindex.Range(key, end, rrev)
  260. if len(keys) == 0 {
  261. return 0
  262. }
  263. for _, key := range keys {
  264. ok := s.delete(key, rev)
  265. if ok {
  266. n++
  267. }
  268. }
  269. return n
  270. }
  271. func (s *store) delete(key []byte, mainrev int64) bool {
  272. grev := mainrev
  273. if s.currentRev.sub > 0 {
  274. grev += 1
  275. }
  276. rev, err := s.kvindex.Get(key, grev)
  277. if err != nil {
  278. // key not exist
  279. return false
  280. }
  281. tx := s.b.BatchTx()
  282. tx.Lock()
  283. defer tx.Unlock()
  284. revbytes := newRevBytes()
  285. revToBytes(rev, revbytes)
  286. _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
  287. if len(vs) != 1 {
  288. log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub)
  289. }
  290. e := &storagepb.Event{}
  291. if err := e.Unmarshal(vs[0]); err != nil {
  292. log.Fatalf("storage: cannot unmarshal event: %v", err)
  293. }
  294. if e.Type == storagepb.DELETE {
  295. return false
  296. }
  297. ibytes := newRevBytes()
  298. revToBytes(reversion{main: mainrev, sub: s.currentRev.sub}, ibytes)
  299. event := storagepb.Event{
  300. Type: storagepb.DELETE,
  301. Kv: storagepb.KeyValue{
  302. Key: key,
  303. },
  304. }
  305. d, err := event.Marshal()
  306. if err != nil {
  307. log.Fatalf("storage: cannot marshal event: %v", err)
  308. }
  309. tx.UnsafePut(keyBucketName, ibytes, d)
  310. err = s.kvindex.Tombstone(key, reversion{main: mainrev, sub: s.currentRev.sub})
  311. if err != nil {
  312. log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
  313. }
  314. s.currentRev.sub += 1
  315. return true
  316. }