kv.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package storage
  2. import (
  3. "encoding/binary"
  4. "log"
  5. "sync"
  6. "time"
  7. "github.com/coreos/etcd/storage/backend"
  8. "github.com/coreos/etcd/storage/storagepb"
  9. )
  10. var (
  11. batchLimit = 10000
  12. batchInterval = 100 * time.Millisecond
  13. keyBucketName = []byte("key")
  14. )
  15. type store struct {
  16. // read operation MUST hold read lock
  17. // write opeartion MUST hold write lock
  18. sync.RWMutex
  19. b backend.Backend
  20. kvindex index
  21. currentIndex uint64
  22. marshalBuf []byte // buffer for marshal protobuf
  23. }
  24. func newStore(path string) *store {
  25. s := &store{
  26. b: backend.New(path, batchInterval, batchLimit),
  27. kvindex: newTreeIndex(),
  28. currentIndex: 0,
  29. marshalBuf: make([]byte, 1024*1024),
  30. }
  31. tx := s.b.BatchTx()
  32. tx.Lock()
  33. tx.UnsafeCreateBucket(keyBucketName)
  34. tx.Unlock()
  35. s.b.ForceCommit()
  36. return s
  37. }
  38. func (s *store) Put(key, value []byte) {
  39. s.Lock()
  40. defer s.Unlock()
  41. currentIndex := s.currentIndex + 1
  42. ibytes := make([]byte, 8)
  43. binary.BigEndian.PutUint64(ibytes, currentIndex)
  44. tx := s.b.BatchTx()
  45. tx.Lock()
  46. defer tx.Unlock()
  47. s.currentIndex = currentIndex
  48. event := storagepb.Event{
  49. Type: storagepb.PUT,
  50. Kv: storagepb.KeyValue{
  51. Key: key,
  52. Value: value,
  53. },
  54. }
  55. var (
  56. d []byte
  57. err error
  58. n int
  59. )
  60. if event.Size() < len(s.marshalBuf) {
  61. n, err = event.MarshalTo(s.marshalBuf)
  62. d = s.marshalBuf[:n]
  63. } else {
  64. d, err = event.Marshal()
  65. }
  66. if err != nil {
  67. log.Fatalf("storage: cannot marshal event: %v", err)
  68. }
  69. tx.UnsafePut(keyBucketName, ibytes, d)
  70. s.kvindex.Put(key, currentIndex)
  71. }
  72. func (s *store) Get(key []byte) []byte {
  73. s.RLock()
  74. defer s.RUnlock()
  75. index, err := s.kvindex.Get(key, s.currentIndex)
  76. if err != nil {
  77. return nil
  78. }
  79. ibytes := make([]byte, 8)
  80. binary.BigEndian.PutUint64(ibytes, index)
  81. tx := s.b.BatchTx()
  82. tx.Lock()
  83. defer tx.Unlock()
  84. vs := tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
  85. // TODO: the value will be an event type.
  86. // TODO: copy out the bytes, decode it, return the value.
  87. return vs[0]
  88. }
  89. func (s *store) Delete(key []byte) error {
  90. s.Lock()
  91. defer s.Unlock()
  92. _, err := s.kvindex.Get(key, s.currentIndex)
  93. if err != nil {
  94. return nil
  95. }
  96. currentIndex := s.currentIndex + 1
  97. ibytes := make([]byte, 8)
  98. binary.BigEndian.PutUint64(ibytes, currentIndex)
  99. tx := s.b.BatchTx()
  100. tx.Lock()
  101. defer tx.Unlock()
  102. // TODO: the value will be an event type.
  103. // A tombstone is simple a "Delete" type event.
  104. tx.UnsafePut(keyBucketName, key, []byte("tombstone"))
  105. return s.kvindex.Tombstone(key, currentIndex)
  106. }