Browse Source

storage: initial compact

Xiang Li 10 năm trước cách đây
mục cha
commit
f47ed4a364

+ 6 - 7
storage/backend/batch_tx.go

@@ -13,7 +13,7 @@ type BatchTx interface {
 	Unlock()
 	UnsafeCreateBucket(name []byte)
 	UnsafePut(bucketName []byte, key []byte, value []byte)
-	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte
+	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
 	UnsafeDelete(bucketName []byte, key []byte)
 	Commit()
 }
@@ -49,28 +49,27 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
 }
 
 // before calling unsafeRange, the caller MUST hold the lock on tnx.
-func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte {
+func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
 	bucket := t.tx.Bucket(bucketName)
 	if bucket == nil {
 		log.Fatalf("storage: bucket %s does not exist", string(bucketName))
 	}
 
-	var vs [][]byte
-
 	if len(endKey) == 0 {
 		if v := bucket.Get(key); v == nil {
-			return vs
+			return keys, vs
 		} else {
-			return append(vs, v)
+			return append(keys, key), append(vs, v)
 		}
 	}
 
 	c := bucket.Cursor()
 	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
 		vs = append(vs, cv)
+		keys = append(keys, ck)
 	}
 
-	return vs
+	return keys, vs
 }
 
 // before calling unsafeDelete, the caller MUST hold the lock on tnx.

+ 1 - 6
storage/key_index.go

@@ -10,7 +10,7 @@ import (
 )
 
 var (
-	ErrReversionNotFound = errors.New("stroage: Reversion not found")
+	ErrReversionNotFound = errors.New("stroage: reversion not found")
 )
 
 // keyIndex stores the reversion of an key in the backend.
@@ -200,11 +200,6 @@ type generation struct {
 	revs []reversion
 }
 
-type reversion struct {
-	main int64
-	sub  int64
-}
-
 func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
 
 // walk walks through the reversions in the generation in ascending order.

+ 4 - 1
storage/kv.go

@@ -8,7 +8,8 @@ type KV interface {
 	// If `end` is nil, the request returns the key.
 	// If `end` is not nil, it gets the keys in range [key, range_end).
 	// Limit limits the number of keys returned.
-	Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64)
+	// If the required rev is compacted, ErrCompacted will be returned.
+	Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error)
 
 	// Put puts the given key,value into the store.
 	// A put also increases the rev of the store, and generates one event in the event history.
@@ -32,4 +33,6 @@ type KV interface {
 	TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error)
 	TnxPut(tnxID int64, key, value []byte) (rev int64, err error)
 	TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error)
+
+	Compact(rev int64) error
 }

+ 55 - 30
storage/kvstore.go

@@ -1,7 +1,6 @@
 package storage
 
 import (
-	"encoding/binary"
 	"errors"
 	"log"
 	"math/rand"
@@ -17,7 +16,11 @@ var (
 	batchInterval = 100 * time.Millisecond
 	keyBucketName = []byte("key")
 
+	scheduledCompactKeyName = []byte("scheduledCompactRev")
+	finishedCompactKeyName  = []byte("finishedCompactRev")
+
 	ErrTnxIDMismatch = errors.New("storage: tnx id mismatch")
+	ErrCompacted     = errors.New("storage: required reversion has been compacted")
 )
 
 type store struct {
@@ -27,6 +30,8 @@ type store struct {
 	kvindex index
 
 	currentRev reversion
+	// the main reversion of the last compaction
+	compactMainRev int64
 
 	tmu   sync.Mutex // protect the tnxID field
 	tnxID int64      // tracks the current tnxID to verify tnx operations
@@ -34,9 +39,10 @@ type store struct {
 
 func newStore(path string) KV {
 	s := &store{
-		b:          backend.New(path, batchInterval, batchLimit),
-		kvindex:    newTreeIndex(),
-		currentRev: reversion{},
+		b:              backend.New(path, batchInterval, batchLimit),
+		kvindex:        newTreeIndex(),
+		currentRev:     reversion{},
+		compactMainRev: -1,
 	}
 
 	tx := s.b.BatchTx()
@@ -56,12 +62,12 @@ func (s *store) Put(key, value []byte) int64 {
 	return int64(s.currentRev.main)
 }
 
-func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64) {
+func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
 	id := s.TnxBegin()
-	kvs, rev = s.rangeKeys(key, end, limit, rangeRev)
+	kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev)
 	s.TnxEnd(id)
 
-	return kvs, rev
+	return kvs, rev, err
 }
 
 func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
@@ -103,8 +109,7 @@ func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (k
 	if tnxID != s.tnxID {
 		return nil, 0, ErrTnxIDMismatch
 	}
-	kvs, rev = s.rangeKeys(key, end, limit, rangeRev)
-	return kvs, rev, nil
+	return s.rangeKeys(key, end, limit, rangeRev)
 }
 
 func (s *store) TnxPut(tnxID int64, key, value []byte) (rev int64, err error) {
@@ -132,8 +137,31 @@ func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err
 	return n, rev, nil
 }
 
+func (s *store) Compact(rev int64) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if rev <= s.compactMainRev {
+		return ErrCompacted
+	}
+
+	s.compactMainRev = rev
+
+	rbytes := make([]byte, 8+1+8)
+	revToBytes(reversion{main: rev}, rbytes)
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	tx.UnsafePut(keyBucketName, scheduledCompactKeyName, rbytes)
+	tx.Unlock()
+
+	keep := s.kvindex.Compact(rev)
+
+	go s.scheduleCompaction(rev, keep)
+	return nil
+}
+
 // range is a keyword in Go, add Keys suffix.
-func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64) {
+func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
 	if rangeRev <= 0 {
 		rev = int64(s.currentRev.main)
 		if s.currentRev.sub > 0 {
@@ -142,25 +170,28 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 	} else {
 		rev = rangeRev
 	}
+	if rev <= s.compactMainRev {
+		return nil, 0, ErrCompacted
+	}
 
-	_, revs := s.kvindex.Range(key, end, int64(rev))
-	if len(revs) == 0 {
-		return nil, rev
+	_, revpairs := s.kvindex.Range(key, end, int64(rev))
+	if len(revpairs) == 0 {
+		return nil, rev, nil
 	}
-	if limit > 0 && len(revs) > int(limit) {
-		revs = revs[:limit]
+	if limit > 0 && len(revpairs) > int(limit) {
+		revpairs = revpairs[:limit]
 	}
 
 	tx := s.b.BatchTx()
 	tx.Lock()
 	defer tx.Unlock()
-	for _, rev := range revs {
+	for _, revpair := range revpairs {
 		revbytes := make([]byte, 8+1+8)
-		revToBytes(rev.main, rev.sub, revbytes)
+		revToBytes(revpair, revbytes)
 
-		vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+		_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
 		if len(vs) != 1 {
-			log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub)
+			log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
 		}
 
 		e := &storagepb.Event{}
@@ -171,12 +202,12 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 			kvs = append(kvs, e.Kv)
 		}
 	}
-	return kvs, rev
+	return kvs, rev, nil
 }
 
 func (s *store) put(key, value []byte, rev int64) {
 	ibytes := make([]byte, 8+1+8)
-	revToBytes(rev, s.currentRev.sub, ibytes)
+	revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes)
 
 	event := storagepb.Event{
 		Type: storagepb.PUT,
@@ -236,9 +267,9 @@ func (s *store) delete(key []byte, mainrev int64) bool {
 	defer tx.Unlock()
 
 	revbytes := make([]byte, 8+1+8)
-	revToBytes(rev.main, rev.sub, revbytes)
+	revToBytes(rev, revbytes)
 
-	vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+	_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
 	if len(vs) != 1 {
 		log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub)
 	}
@@ -252,7 +283,7 @@ func (s *store) delete(key []byte, mainrev int64) bool {
 	}
 
 	ibytes := make([]byte, 8+1+8)
-	revToBytes(mainrev, s.currentRev.sub, ibytes)
+	revToBytes(reversion{main: mainrev, sub: s.currentRev.sub}, ibytes)
 
 	event := storagepb.Event{
 		Type: storagepb.DELETE,
@@ -274,9 +305,3 @@ func (s *store) delete(key []byte, mainrev int64) bool {
 	s.currentRev.sub += 1
 	return true
 }
-
-func revToBytes(main int64, sub int64, bytes []byte) {
-	binary.BigEndian.PutUint64(bytes, uint64(main))
-	bytes[8] = '_'
-	binary.BigEndian.PutUint64(bytes[9:], uint64(sub))
-}

+ 42 - 0
storage/kvstore_compaction.go

@@ -0,0 +1,42 @@
+package storage
+
+import (
+	"encoding/binary"
+	"time"
+)
+
+func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]struct{}) {
+	end := make([]byte, 8)
+	binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
+
+	batchsize := int64(10000)
+	last := make([]byte, 8+1+8)
+	for {
+		var rev reversion
+
+		tx := s.b.BatchTx()
+		tx.Lock()
+
+		keys, _ := tx.UnsafeRange(keyBucketName, last, end, batchsize)
+		for _, key := range keys {
+			rev = bytesToRev(key)
+			if _, ok := keep[rev]; !ok {
+				tx.UnsafeDelete(keyBucketName, key)
+			}
+		}
+
+		if len(keys) == 0 {
+			rbytes := make([]byte, 8+1+8)
+			revToBytes(reversion{main: compactMainRev}, rbytes)
+			tx.UnsafePut(keyBucketName, finishedCompactKeyName, rbytes)
+			tx.Unlock()
+			return
+		}
+
+		// update last
+		revToBytes(reversion{main: rev.main, sub: rev.sub + 1}, last)
+		tx.Unlock()
+
+		time.Sleep(100 * time.Millisecond)
+	}
+}

+ 87 - 6
storage/kvstore_test.go

@@ -1,6 +1,7 @@
 package storage
 
 import (
+	"bytes"
 	"crypto/rand"
 	"os"
 	"testing"
@@ -41,7 +42,10 @@ func TestRange(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		kvs, rev := s.Range(tt.key, tt.end, 0, tt.rev)
+		kvs, rev, err := s.Range(tt.key, tt.end, 0, tt.rev)
+		if err != nil {
+			t.Fatal(err)
+		}
 		if len(kvs) != int(tt.wN) {
 			t.Errorf("#%d: len(kvs) = %d, want %d", i, len(kvs), tt.wN)
 		}
@@ -110,13 +114,19 @@ func TestRangeInSequence(t *testing.T) {
 	}
 
 	// before removal foo
-	kvs, rev := s.Range([]byte("foo"), []byte("foo3"), 0, 3)
+	kvs, rev, err := s.Range([]byte("foo"), []byte("foo3"), 0, 3)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if len(kvs) != 3 {
 		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 3)
 	}
 
 	// after removal foo
-	kvs, rev = s.Range([]byte("foo"), []byte("foo3"), 0, 4)
+	kvs, rev, err = s.Range([]byte("foo"), []byte("foo3"), 0, 4)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if len(kvs) != 2 {
 		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2)
 	}
@@ -134,7 +144,10 @@ func TestRangeInSequence(t *testing.T) {
 	}
 
 	// after removal foo1
-	kvs, rev = s.Range([]byte("foo"), []byte("foo3"), 0, 5)
+	kvs, rev, err = s.Range([]byte("foo"), []byte("foo3"), 0, 5)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if len(kvs) != 1 {
 		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1)
 	}
@@ -146,7 +159,10 @@ func TestRangeInSequence(t *testing.T) {
 	}
 
 	// after removal foo2
-	kvs, rev = s.Range([]byte("foo"), []byte("foo3"), 0, 6)
+	kvs, rev, err = s.Range([]byte("foo"), []byte("foo3"), 0, 6)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if len(kvs) != 0 {
 		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
 	}
@@ -230,7 +246,10 @@ func TestOneTnx(t *testing.T) {
 	}
 
 	// After tnx
-	kvs, rev := s.Range([]byte("foo"), []byte("foo3"), 0, 1)
+	kvs, rev, err := s.Range([]byte("foo"), []byte("foo3"), 0, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if len(kvs) != 0 {
 		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
 	}
@@ -239,6 +258,68 @@ func TestOneTnx(t *testing.T) {
 	}
 }
 
+func TestCompaction(t *testing.T) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo1"), []byte("bar1"))
+	s.Put([]byte("foo2"), []byte("bar2"))
+	s.Put([]byte("foo"), []byte("bar11"))
+	s.Put([]byte("foo1"), []byte("bar12"))
+	s.Put([]byte("foo2"), []byte("bar13"))
+	s.Put([]byte("foo1"), []byte("bar14"))
+	s.DeleteRange([]byte("foo"), []byte("foo200"))
+	s.Put([]byte("foo4"), []byte("bar4"))
+
+	err := s.Compact(4)
+	if err != nil {
+		t.Errorf("unexpect compact error %v", err)
+	}
+
+	err = s.Compact(4)
+	if err != ErrCompacted {
+		t.Errorf("err = %v, want %v", err, ErrCompacted)
+	}
+
+	_, _, err = s.Range([]byte("foo"), nil, 0, 4)
+	if err != ErrCompacted {
+		t.Errorf("err = %v, want %v", err, ErrCompacted)
+	}
+
+	// compact should not compact the last value of foo
+	kvs, rev, err := s.Range([]byte("foo"), nil, 0, 5)
+	if err != nil {
+		t.Errorf("unexpected range error %v", err)
+	}
+	if !bytes.Equal(kvs[0].Value, []byte("bar11")) {
+		t.Errorf("value = %s, want %s", string(kvs[0].Value), "bar11")
+	}
+	if rev != 5 {
+		t.Errorf("rev = %d, want %d", rev, 5)
+	}
+
+	// compact everything
+	err = s.Compact(8)
+	if err != nil {
+		t.Errorf("unexpect compact error %v", err)
+	}
+
+	kvs, rev, err = s.Range([]byte("foo"), []byte("fop"), 0, 0)
+	if err != nil {
+		t.Errorf("unexpected range error %v", err)
+	}
+	if len(kvs) != 1 {
+		t.Errorf("len(kvs) = %d, want %d", len(kvs), 1)
+	}
+	if !bytes.Equal(kvs[0].Value, []byte("bar4")) {
+		t.Errorf("value = %s, want %s", string(kvs[0].Value), "bar4")
+	}
+	if rev != 9 {
+		t.Errorf("rev = %d, want %d", rev, 9)
+	}
+}
+
 func BenchmarkStorePut(b *testing.B) {
 	s := newStore("test")
 	defer os.Remove("test")

+ 21 - 0
storage/reversion.go

@@ -0,0 +1,21 @@
+package storage
+
+import "encoding/binary"
+
+type reversion struct {
+	main int64
+	sub  int64
+}
+
+func revToBytes(rev reversion, bytes []byte) {
+	binary.BigEndian.PutUint64(bytes, uint64(rev.main))
+	bytes[8] = '_'
+	binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
+}
+
+func bytesToRev(bytes []byte) reversion {
+	return reversion{
+		main: int64(binary.BigEndian.Uint64(bytes[0:8])),
+		sub:  int64(binary.BigEndian.Uint64(bytes[9:])),
+	}
+}