Browse Source

storage: address barak's comments

Xiang Li 10 years ago
parent
commit
e332e86b5d
5 changed files with 65 additions and 86 deletions
  1. 3 27
      storage/backend/backend.go
  2. 0 32
      storage/backend/backend_test.go
  3. 25 11
      storage/backend/batch_tx.go
  4. 5 0
      storage/key_index.go
  5. 32 16
      storage/kv.go

+ 3 - 27
storage/backend/backend.go

@@ -57,17 +57,13 @@ func (b *backend) BatchTx() BatchTx {
 
 // force commit the current batching tx.
 func (b *backend) ForceCommit() {
-	b.batchTx.Lock()
-	b.commitAndBegin()
-	b.batchTx.Unlock()
+	b.batchTx.Commit()
 }
 
 func (b *backend) run() {
 	defer close(b.donec)
 
-	b.batchTx.Lock()
-	b.commitAndBegin()
-	b.batchTx.Unlock()
+	b.batchTx.Commit()
 	b.startc <- struct{}{}
 
 	for {
@@ -76,9 +72,7 @@ func (b *backend) run() {
 		case <-b.stopc:
 			return
 		}
-		b.batchTx.Lock()
-		b.commitAndBegin()
-		b.batchTx.Unlock()
+		b.batchTx.Commit()
 	}
 }
 
@@ -87,21 +81,3 @@ func (b *backend) Close() error {
 	<-b.donec
 	return b.db.Close()
 }
-
-// commitAndBegin commits a previous tx and begins a new writable one.
-func (b *backend) commitAndBegin() {
-	var err error
-	// commit the last batchTx
-	if b.batchTx.tx != nil {
-		err = b.batchTx.tx.Commit()
-		if err != nil {
-			log.Fatalf("storage: cannot commit tx (%s)", err)
-		}
-	}
-
-	// begin a new tx
-	b.batchTx.tx, err = b.db.Begin(true)
-	if err != nil {
-		log.Fatalf("storage: cannot begin tx (%s)", err)
-	}
-}

+ 0 - 32
storage/backend/backend_test.go

@@ -27,35 +27,3 @@ func TestBackendPut(t *testing.T) {
 
 	batchTx.Unlock()
 }
-
-func TestBackendForceCommit(t *testing.T) {
-	backend := New("test", 10*time.Second, 10000)
-	defer backend.Close()
-	defer os.Remove("test")
-
-	v := []byte("foo")
-	batchTx := backend.BatchTx()
-
-	batchTx.Lock()
-
-	batchTx.UnsafeCreateBucket([]byte("test"))
-	batchTx.UnsafePut([]byte("test"), []byte("foo"), v)
-
-	batchTx.Unlock()
-
-	// expect to see nothing that the batch tx created
-	tx := backend.ReadTnx()
-	gbucket := tx.Bucket([]byte("test"))
-	if gbucket != nil {
-		t.Errorf("readtx.bu = %p, want nil", gbucket)
-	}
-	tx.Commit()
-
-	// commit batch tx
-	backend.ForceCommit()
-	tx = backend.ReadTnx()
-	gbucket = tx.Bucket([]byte("test"))
-	if gbucket == nil {
-		t.Errorf("readtx.bu = nil, want not nil")
-	}
-}

+ 25 - 11
storage/backend/batch_tx.go

@@ -15,23 +15,16 @@ type BatchTx interface {
 	UnsafePut(bucketName []byte, key []byte, value []byte)
 	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte
 	UnsafeDelete(bucketName []byte, key []byte)
+	Commit()
 }
 
 type batchTx struct {
-	mu      sync.Mutex
+	sync.Mutex
 	tx      *bolt.Tx
 	backend *backend
 	pending int
 }
 
-func (t *batchTx) Lock() {
-	t.mu.Lock()
-}
-
-func (t *batchTx) Unlock() {
-	t.mu.Unlock()
-}
-
 func (t *batchTx) UnsafeCreateBucket(name []byte) {
 	_, err := t.tx.CreateBucket(name)
 	if err != nil && err != bolt.ErrBucketExists {
@@ -50,7 +43,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
 	}
 	t.pending++
 	if t.pending > t.backend.batchLimit {
-		t.backend.commitAndBegin()
+		t.Commit()
 		t.pending = 0
 	}
 }
@@ -92,7 +85,28 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
 	}
 	t.pending++
 	if t.pending > t.backend.batchLimit {
-		t.backend.commitAndBegin()
+		t.Commit()
 		t.pending = 0
 	}
 }
+
+// commitAndBegin commits a previous tx and begins a new writable one.
+func (t *batchTx) Commit() {
+	t.Lock()
+	defer t.Unlock()
+
+	var err error
+	// commit the last tx
+	if t.tx != nil {
+		err = t.tx.Commit()
+		if err != nil {
+			log.Fatalf("storage: cannot commit tx (%s)", err)
+		}
+	}
+
+	// begin a new tx
+	t.tx, err = t.backend.db.Begin(true)
+	if err != nil {
+		log.Fatalf("storage: cannot begin tx (%s)", err)
+	}
+}

+ 5 - 0
storage/key_index.go

@@ -186,6 +186,11 @@ type generation struct {
 
 func (g *generation) isEmpty() bool { return len(g.cont) == 0 }
 
+// walk walks through the (index, version) pairs in the generation in ascending order.
+// It passes the (index, version) to the given function.
+// walk returns until: 1. it finishs walking all pairs 2. the function returns false.
+// walk returns the (index, version) pair at where it stopped. If it stopped after
+// finishing walking, (0, -1) will be returned.
 func (g *generation) walk(f func(index, ver uint64) bool) (uint64, int) {
 	ver := g.ver
 	l := len(g.cont)

+ 32 - 16
storage/kv.go

@@ -3,6 +3,7 @@ package storage
 import (
 	"encoding/binary"
 	"log"
+	"sync"
 	"time"
 
 	"github.com/coreos/etcd/storage/backend"
@@ -16,19 +17,23 @@ var (
 )
 
 type store struct {
+	// read operation MUST hold read lock
+	// write opeartion MUST hold write lock
+	sync.RWMutex
+
 	b       backend.Backend
 	kvindex index
 
-	now        uint64 // current index of the store
-	marshalBuf []byte // buffer for marshal protobuf
+	currentIndex uint64
+	marshalBuf   []byte // buffer for marshal protobuf
 }
 
 func newStore(path string) *store {
 	s := &store{
-		b:          backend.New(path, batchInterval, batchLimit),
-		kvindex:    newTreeIndex(),
-		now:        0,
-		marshalBuf: make([]byte, 1024*1024),
+		b:            backend.New(path, batchInterval, batchLimit),
+		kvindex:      newTreeIndex(),
+		currentIndex: 0,
+		marshalBuf:   make([]byte, 1024*1024),
 	}
 
 	tx := s.b.BatchTx()
@@ -41,16 +46,18 @@ func newStore(path string) *store {
 }
 
 func (s *store) Put(key, value []byte) {
-	now := s.now + 1
+	s.Lock()
+	defer s.Unlock()
+
+	currentIndex := s.currentIndex + 1
 
-	s.kvindex.Put(key, now)
 	ibytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(ibytes, now)
+	binary.BigEndian.PutUint64(ibytes, currentIndex)
 
 	tx := s.b.BatchTx()
 	tx.Lock()
 	defer tx.Unlock()
-	s.now = now
+	s.currentIndex = currentIndex
 
 	event := storagepb.Event{
 		Type: storagepb.PUT,
@@ -77,10 +84,15 @@ func (s *store) Put(key, value []byte) {
 	}
 
 	tx.UnsafePut(keyBucketName, ibytes, d)
+
+	s.kvindex.Put(key, currentIndex)
 }
 
 func (s *store) Get(key []byte) []byte {
-	index, err := s.kvindex.Get(key, s.now)
+	s.RLock()
+	defer s.RUnlock()
+
+	index, err := s.kvindex.Get(key, s.currentIndex)
 	if err != nil {
 		return nil
 	}
@@ -97,20 +109,24 @@ func (s *store) Get(key []byte) []byte {
 }
 
 func (s *store) Delete(key []byte) error {
-	now := s.now + 1
+	s.Lock()
+	defer s.Unlock()
 
-	err := s.kvindex.Tombstone(key, now)
+	_, err := s.kvindex.Get(key, s.currentIndex)
 	if err != nil {
-		return err
+		return nil
 	}
 
+	currentIndex := s.currentIndex + 1
+
 	ibytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(ibytes, now)
+	binary.BigEndian.PutUint64(ibytes, currentIndex)
 	tx := s.b.BatchTx()
 	tx.Lock()
 	defer tx.Unlock()
 	// TODO: the value will be an event type.
 	// A tombstone is simple a "Delete" type event.
 	tx.UnsafePut(keyBucketName, key, []byte("tombstone"))
-	return nil
+
+	return s.kvindex.Tombstone(key, currentIndex)
 }