
storage: have Range on rev=0 work even if compacted to current revision

Anthony Romano 9 years ago
Parent
Commit
d72bcdc156
6 changed files with 33 additions and 8 deletions
  1. etcdserver/etcdserverpb/rpc.proto  +2 −4
  2. integration/v3_grpc_test.go  +25 −0
  3. storage/kv.go  +1 −0
  4. storage/kv_test.go  +3 −1
  5. storage/kvstore.go  +1 −1
  6. storage/kvstore_test.go  +1 −2

+ 2 - 4
etcdserver/etcdserverpb/rpc.proto

@@ -290,10 +290,8 @@ message TxnResponse {
   repeated ResponseUnion responses = 3;
 }
 
-// Compaction compacts the kv store upto the given revision (including).
-// It removes the old versions of a key. It keeps the newest version of
-// the key even if its latest modification revision is smaller than the given
-// revision.
+// Compaction compacts the kv store upto a given revision. All superseded keys
+// with a revision less than the compaction revision will be removed.
 message CompactionRequest {
   int64 revision = 1;
   // physical is set so the RPC will wait until the compaction is physically
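To make the reworded comment concrete, here is a minimal sketch at the gRPC level (not part of the commit; it assumes a pb.KVClient obtained as in the integration test below, the same pb and context imports that test uses, and a fresh single-member store where "foo" has just been put three times, at revisions 2, 3 and 4):

	// compactToCurrent compacts at the store's current revision (4 in this
	// illustrative setup) and then reads the key back.
	func compactToCurrent(kvc pb.KVClient) error {
		// Only the superseded versions of "foo" at revisions 2 and 3 are removed;
		// the newest version at revision 4 is kept.
		if _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4}); err != nil {
			return err
		}
		// Revision 0 in a RangeRequest means "current revision", so this still
		// finds "foo" even though the store is compacted to that very revision.
		_, err := kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
		return err
	}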

+ 25 - 0
integration/v3_grpc_test.go

@@ -74,6 +74,31 @@ func TestV3PutOverwrite(t *testing.T) {
 	}
 }
 
+// TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
+func TestV3CompactCurrentRev(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	kvc := toGRPC(clus.RandClient()).KV
+	preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
+	for i := 0; i < 3; i++ {
+		if _, err := kvc.Put(context.Background(), preq); err != nil {
+			t.Fatalf("couldn't put key (%v)", err)
+		}
+	}
+	// compact on current revision
+	_, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
+	if err != nil {
+		t.Fatalf("couldn't compact kv space (%v)", err)
+	}
+	// key still exists?
+	_, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
+	if err != nil {
+		t.Fatalf("couldn't get key after compaction (%v)", err)
+	}
+}
+
 func TestV3TxnTooManyOps(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})

+ 1 - 0
storage/kv.go

@@ -67,6 +67,7 @@ type KV interface {
 	TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
 	TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
 
+	// Compact frees all superseded keys with revisions less than rev.
 	Compact(rev int64) (<-chan struct{}, error)
 
 	// Hash retrieves the hash of KV state.
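A short sketch against the KV interface above (illustrative only; it assumes an import of the storage package and a key "foo" that exists at the compaction revision), showing the behavior this commit fixes:

	// compactAndVerify compacts a storage.KV at revision rev and checks that a
	// read at that same revision still works.
	func compactAndVerify(s storage.KV, rev int64) error {
		ch, err := s.Compact(rev) // frees superseded keys with revisions < rev
		if err != nil {
			return err
		}
		<-ch // wait until the compaction has been applied to the backend
		// Ranging at the compaction revision (or at 0, the current revision) no
		// longer returns ErrCompacted after this change.
		_, _, err = s.Range([]byte("foo"), nil, 0, rev)
		return err
	}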

+ 3 - 1
storage/kv_test.go

@@ -194,9 +194,11 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 		rev  int64
 		werr error
 	}{
-		{-1, ErrCompacted},
+		{-1, nil}, // <= 0 is most recent store
+		{0, nil},
 		{1, ErrCompacted},
 		{2, ErrCompacted},
+		{4, nil},
 		{5, ErrFutureRev},
 		{100, ErrFutureRev},
 	}

+ 1 - 1
storage/kvstore.go

@@ -425,7 +425,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 	} else {
 		rev = rangeRev
 	}
-	if rev <= s.compactMainRev {
+	if rev < s.compactMainRev {
 		return nil, 0, ErrCompacted
 	}
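The one-character change above relaxes the boundary: a range at exactly the compaction revision is now allowed, since the newest version of every key at that revision is retained. A self-contained sketch of the resulting revision check (checkRangeRev and its parameter names are illustrative, not the actual kvstore internals; ErrCompacted and ErrFutureRev are the storage package errors used in the tests above), matching the expectations in the kv_test.go table:

	// checkRangeRev resolves a requested range revision against the store's
	// current and compacted revisions.
	func checkRangeRev(rangeRev, currentRev, compactMainRev int64) (int64, error) {
		rev := rangeRev
		if rev <= 0 {
			// A non-positive rangeRev means the current revision.
			rev = currentRev
		}
		if rev > currentRev {
			return 0, ErrFutureRev
		}
		// Before this commit the comparison was rev <= compactMainRev, so ranging
		// at exactly the compaction revision incorrectly returned ErrCompacted.
		if rev < compactMainRev {
			return 0, ErrCompacted
		}
		return rev, nil
	}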
 

+ 1 - 2
storage/kvstore_test.go

@@ -440,12 +440,11 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	// wait for scheduled compaction to be finished
 	time.Sleep(100 * time.Millisecond)
 
-	if _, _, err := s1.Range([]byte("foo"), nil, 0, 2); err != ErrCompacted {
+	if _, _, err := s1.Range([]byte("foo"), nil, 0, 1); err != ErrCompacted {
 		t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
 	}
 	// check the key in backend is deleted
 	revbytes := newRevBytes()
-	// TODO: compact should delete main=2 key too
 	revToBytes(revision{main: 1}, revbytes)
 
 	// The disk compaction is done asynchronously and requires more time on slow disk.