kv.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package storage
  2. import (
  3. "encoding/binary"
  4. "log"
  5. "sync"
  6. "time"
  7. "github.com/coreos/etcd/storage/backend"
  8. "github.com/coreos/etcd/storage/storagepb"
  9. )
  // Package-level settings used by the store.
  var (
  	// keyBucketName names the backend bucket that newStore creates and that
  	// the store's Put, Get and Delete methods read from and write to.
  	keyBucketName = []byte("key")

  	// batchInterval and batchLimit are handed to backend.New by newStore.
  	// NOTE(review): presumably the maximum time between batch commits and
  	// the maximum number of pending operations per batch; confirm against
  	// the backend package.
  	batchInterval = 100 * time.Millisecond
  	batchLimit    = 10000
  )
  15. type KV interface {
  16. // Range gets the keys in the range at rangeIndex.
  17. // If rangeIndex <=0, range gets the keys at currentIndex.
  18. // If `end` is nil, the request returns the key.
  19. // If `end` is not nil, it gets the keys in range [key, range_end).
  20. // Limit limits the number of keys returned.
  21. Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64)
  22. // Put puts the given key,value into the store.
  23. // A put also increases the index of the store, and generates one event in the event history.
  24. Put(key, value []byte) (index int64)
  25. // DeleteRange deletes the given range from the store.
  26. // A deleteRange increases the index of the store if any key in the range exists.
  27. // The number of key deleted will be returned.
  28. // It also generates one event for each key delete in the event history.
  29. // if the `end` is nil, deleteRange deletes the key.
  30. // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
  31. DeleteRange(key, end []byte) (n, index int64)
  32. // TnxBegin begins a tnx. Only Tnx prefixed operation can be executed, others will be blocked
  33. // until tnx ends. Only one on-going tnx is allowed.
  34. TnxBegin()
  35. // TnxEnd ends the on-going tnx.
  36. TnxEnd()
  37. TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64)
  38. TnxPut(key, value []byte) (index int64)
  39. TnxDeleteRange(key, end []byte) (n, index int64)
  40. }
  41. type store struct {
  42. // read operation MUST hold read lock
  43. // write opeartion MUST hold write lock
  44. // tnx operation MUST hold write lock
  45. sync.RWMutex
  46. b backend.Backend
  47. kvindex index
  48. currentIndex uint64
  49. marshalBuf []byte // buffer for marshal protobuf
  50. }
  51. func newStore(path string) *store {
  52. s := &store{
  53. b: backend.New(path, batchInterval, batchLimit),
  54. kvindex: newTreeIndex(),
  55. currentIndex: 0,
  56. marshalBuf: make([]byte, 1024*1024),
  57. }
  58. tx := s.b.BatchTx()
  59. tx.Lock()
  60. tx.UnsafeCreateBucket(keyBucketName)
  61. tx.Unlock()
  62. s.b.ForceCommit()
  63. return s
  64. }
  65. func (s *store) Put(key, value []byte) {
  66. s.Lock()
  67. defer s.Unlock()
  68. currentIndex := s.currentIndex + 1
  69. ibytes := make([]byte, 8)
  70. binary.BigEndian.PutUint64(ibytes, currentIndex)
  71. tx := s.b.BatchTx()
  72. tx.Lock()
  73. defer tx.Unlock()
  74. s.currentIndex = currentIndex
  75. event := storagepb.Event{
  76. Type: storagepb.PUT,
  77. Kv: storagepb.KeyValue{
  78. Key: key,
  79. Value: value,
  80. },
  81. }
  82. var (
  83. d []byte
  84. err error
  85. n int
  86. )
  87. if event.Size() < len(s.marshalBuf) {
  88. n, err = event.MarshalTo(s.marshalBuf)
  89. d = s.marshalBuf[:n]
  90. } else {
  91. d, err = event.Marshal()
  92. }
  93. if err != nil {
  94. log.Fatalf("storage: cannot marshal event: %v", err)
  95. }
  96. tx.UnsafePut(keyBucketName, ibytes, d)
  97. s.kvindex.Put(key, currentIndex)
  98. }
  99. func (s *store) Get(key []byte) []byte {
  100. s.RLock()
  101. defer s.RUnlock()
  102. index, err := s.kvindex.Get(key, s.currentIndex)
  103. if err != nil {
  104. return nil
  105. }
  106. ibytes := make([]byte, 8)
  107. binary.BigEndian.PutUint64(ibytes, index)
  108. tx := s.b.BatchTx()
  109. tx.Lock()
  110. defer tx.Unlock()
  111. vs := tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
  112. // TODO: the value will be an event type.
  113. // TODO: copy out the bytes, decode it, return the value.
  114. return vs[0]
  115. }
  116. func (s *store) Delete(key []byte) error {
  117. s.Lock()
  118. defer s.Unlock()
  119. _, err := s.kvindex.Get(key, s.currentIndex)
  120. if err != nil {
  121. return nil
  122. }
  123. currentIndex := s.currentIndex + 1
  124. ibytes := make([]byte, 8)
  125. binary.BigEndian.PutUint64(ibytes, currentIndex)
  126. tx := s.b.BatchTx()
  127. tx.Lock()
  128. defer tx.Unlock()
  129. // TODO: the value will be an event type.
  130. // A tombstone is simple a "Delete" type event.
  131. tx.UnsafePut(keyBucketName, key, []byte("tombstone"))
  132. return s.kvindex.Tombstone(key, currentIndex)
  133. }