Browse Source

storage: support seq put to make db more compact

Xiang Li 9 years ago
parent
commit
d883d5acd3
3 changed files with 23 additions and 5 deletions
  1. 15 0
      storage/backend/batch_tx.go
  2. 2 2
      storage/kvstore.go
  3. 6 3
      storage/kvstore_test.go

+ 15 - 0
storage/backend/batch_tx.go

@@ -28,6 +28,7 @@ type BatchTx interface {
 	Unlock()
 	UnsafeCreateBucket(name []byte)
 	UnsafePut(bucketName []byte, key []byte, value []byte)
+	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
 	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
 	UnsafeDelete(bucketName []byte, key []byte)
 	Commit()
@@ -57,10 +58,24 @@ func (t *batchTx) UnsafeCreateBucket(name []byte) {
 
 // UnsafePut must be called holding the lock on the tx.
 func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
+	t.unsafePut(bucketName, key, value, false)
+}
+
+// UnsafeSeqPut must be called holding the lock on the tx.
+func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	t.unsafePut(bucketName, key, value, true)
+}
+
+func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
 	bucket := t.tx.Bucket(bucketName)
 	if bucket == nil {
 		log.Fatalf("storage: bucket %s does not exist", string(bucketName))
 	}
+	if seq {
+		// it is useful to increase fill percent when the workloads are mostly append-only.
+		// this can delay the page split and reduce space usage.
+		bucket.FillPercent = 0.9
+	}
 	if err := bucket.Put(key, value); err != nil {
 		log.Fatalf("storage: cannot put key into bucket (%v)", err)
 	}

+ 2 - 2
storage/kvstore.go

@@ -453,7 +453,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
 
-	s.tx.UnsafePut(keyBucketName, ibytes, d)
+	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
 	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
 	s.changes = append(s.changes, kv)
 	s.currentRev.sub += 1
@@ -514,7 +514,7 @@ func (s *store) delete(key []byte, rev revision) {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
 
-	s.tx.UnsafePut(keyBucketName, ibytes, d)
+	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
 	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
 	if err != nil {
 		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)

+ 6 - 3
storage/kvstore_test.go

@@ -139,13 +139,13 @@ func TestStorePut(t *testing.T) {
 		}
 
 		wact := []testutil.Action{
-			{"put", []interface{}{keyBucketName, tt.wkey, data}},
+			{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
 		}
 
 		if tt.rr != nil {
 			wact = []testutil.Action{
 				{"range", []interface{}{keyBucketName, newTestKeyBytes(tt.r.rev, false), []byte(nil), int64(0)}},
-				{"put", []interface{}{keyBucketName, tt.wkey, data}},
+				{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
 			}
 		}
 
@@ -305,7 +305,7 @@ func TestStoreDeleteRange(t *testing.T) {
 			t.Errorf("#%d: marshal err = %v, want nil", i, err)
 		}
 		wact := []testutil.Action{
-			{"put", []interface{}{keyBucketName, tt.wkey, data}},
+			{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
 			{"range", []interface{}{keyBucketName, newTestKeyBytes(revision{2, 0}, false), []byte(nil), int64(0)}},
 		}
 		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
@@ -573,6 +573,9 @@ func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
 func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
 	b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
 }
+func (b *fakeBatchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucketName, key, value}})
+}
 func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
 	b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucketName, key, endKey, limit}})
 	r := <-b.rangeRespc