Merge pull request #4863 from heyitsanthony/ft-check-compact

etcd-tester: check compaction revision
Anthony Romano, 9 years ago
commit a5172974da

+ 10 - 6
etcdserver/apply.go

@@ -37,6 +37,10 @@ const (
 type applyResult struct {
 	resp proto.Message
 	err  error
+	// physc signals the physical effect of the request has completed in addition
+	// to being logically reflected by the node. Currently only used for
+	// Compaction requests.
+	physc <-chan struct{}
 }
 
 // applierV3 is the interface for processing V3 raft messages
@@ -45,7 +49,7 @@ type applierV3 interface {
 	Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error)
 	DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
 	Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
-	Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error)
+	Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
 	LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error)
 	LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
 	Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)
@@ -69,7 +73,7 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) *applyResult {
 	case r.Txn != nil:
 		ar.resp, ar.err = s.applyV3.Txn(r.Txn)
 	case r.Compaction != nil:
-		ar.resp, ar.err = s.applyV3.Compaction(r.Compaction)
+		ar.resp, ar.physc, ar.err = s.applyV3.Compaction(r.Compaction)
 	case r.LeaseCreate != nil:
 		ar.resp, ar.err = s.applyV3.LeaseCreate(r.LeaseCreate)
 	case r.LeaseRevoke != nil:
@@ -362,16 +366,16 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestUnion) *pb.R
 
 }
 
-func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) {
+func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
 	resp := &pb.CompactionResponse{}
 	resp.Header = &pb.ResponseHeader{}
-	err := a.s.KV().Compact(compaction.Revision)
+	ch, err := a.s.KV().Compact(compaction.Revision)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	// get the current revision. which key to get is not important.
 	_, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0)
-	return resp, err
+	return resp, ch, err
 }
 
 func (a *applierV3backend) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {

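The `physc` channel added above follows a standard Go done-channel pattern: the apply path reports logical completion right away and returns a channel that is closed only once the physical work has finished, so the caller can decide whether to block on it (as the Compact RPC handler does in the `v3demo_server.go` hunk below when `Physical` is set). A minimal, self-contained sketch of that pattern, using illustrative names that are not part of this patch:

```go
package main

import (
	"fmt"
	"time"
)

// compactResult mirrors the idea behind applyResult.physc: the response data is
// available immediately, while physc is closed once the physical work is done.
type compactResult struct {
	rev   int64
	physc <-chan struct{}
}

// compact finishes the logical part synchronously and schedules the physical
// cleanup in the background, handing the caller a completion channel.
func compact(rev int64) compactResult {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		time.Sleep(50 * time.Millisecond) // stand-in for purging old revisions
	}()
	return compactResult{rev: rev, physc: ch}
}

func main() {
	res := compact(7)
	fmt.Println("logically compacted at revision", res.rev)
	<-res.physc // what the server does when CompactionRequest.Physical is set
	fmt.Println("physically compacted")
}
```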
+ 37 - 0
etcdserver/etcdserverpb/rpc.pb.go

@@ -765,6 +765,10 @@ func (m *TxnResponse) GetResponses() []*ResponseUnion {
 // revision.
 type CompactionRequest struct {
 	Revision int64 `protobuf:"varint,1,opt,name=revision,proto3" json:"revision,omitempty"`
+	// physical is set so the RPC will wait until the compaction is physically
+	// applied to the local database such that compacted entries are totally
+	// removed from the backing store.
+	Physical bool `protobuf:"varint,2,opt,name=physical,proto3" json:"physical,omitempty"`
 }
 
 func (m *CompactionRequest) Reset()         { *m = CompactionRequest{} }
@@ -3411,6 +3415,16 @@ func (m *CompactionRequest) MarshalTo(data []byte) (int, error) {
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Revision))
 	}
+	if m.Physical {
+		data[i] = 0x10
+		i++
+		if m.Physical {
+			data[i] = 1
+		} else {
+			data[i] = 0
+		}
+		i++
+	}
 	return i, nil
 }
 
 
@@ -5318,6 +5332,9 @@ func (m *CompactionRequest) Size() (n int) {
 	if m.Revision != 0 {
 		n += 1 + sovRpc(uint64(m.Revision))
 	}
+	if m.Physical {
+		n += 2
+	}
 	return n
 }
 
 
@@ -7660,6 +7677,26 @@ func (m *CompactionRequest) Unmarshal(data []byte) error {
 					break
 				}
 			}
+		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Physical", wireType)
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowRpc
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Physical = bool(v != 0)
 		default:
 			iNdEx = preIndex
 			skippy, err := skipRpc(data[iNdEx:])

+ 4 - 0
etcdserver/etcdserverpb/rpc.proto

@@ -289,6 +289,10 @@ message TxnResponse {
 // revision.
 message CompactionRequest {
   int64 revision = 1;
+  // physical is set so the RPC will wait until the compaction is physically
+  // applied to the local database such that compacted entries are totally
+  // removed from the backing store.
+  bool physical = 2;
 }
 
 message CompactionResponse {

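With the `physical` field in the proto, a gRPC client can ask the server to hold the Compact call open until compacted entries are actually gone from the backend. The sketch below mirrors how `compactKV` in the functional tester uses it; the endpoint address and revision are placeholders, and the import paths assume the coreos/etcd layout and x/net/context of that era:

```go
package main

import (
	"log"
	"time"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder endpoint; the tester iterates over cluster.GRPCURLs instead.
	conn, err := grpc.Dial("127.0.0.1:2378", grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	kvc := pb.NewKVClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Physical: true blocks the RPC until the compaction has been applied to
	// the backing store, not just recorded logically.
	if _, err := kvc.Compact(ctx, &pb.CompactionRequest{Revision: 100, Physical: true}); err != nil {
		log.Printf("compact failed: %v", err)
	}
}
```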
+ 3 - 0
etcdserver/v3demo_server.go

@@ -97,6 +97,9 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
 	if err != nil {
 		return nil, err
 	}
+	if r.Physical && result.physc != nil {
+		<-result.physc
+	}
 	resp := result.resp.(*pb.CompactionResponse)
 	if resp == nil {
 		resp = &pb.CompactionResponse{}

+ 1 - 1
storage/kv.go

@@ -67,7 +67,7 @@ type KV interface {
 	TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
 	TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
 
-	Compact(rev int64) error
+	Compact(rev int64) (<-chan struct{}, error)
 
 	// Hash retrieves the hash of KV state.
 	// This method is designed for consistency checking purpose.

+ 3 - 3
storage/kv_test.go

@@ -186,7 +186,7 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 	defer cleanup(s, b, tmpPath)
 
 	put3TestKVs(s)
-	if err := s.Compact(4); err != nil {
+	if _, err := s.Compact(4); err != nil {
 		t.Fatalf("compact error (%v)", err)
 	}
 
 
@@ -566,7 +566,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		err := s.Compact(tt.rev)
+		_, err := s.Compact(tt.rev)
 		if err != nil {
 			t.Errorf("#%d: unexpect compact error %v", i, err)
 		}
@@ -602,7 +602,7 @@ func TestKVCompactBad(t *testing.T) {
 		{100, ErrFutureRev},
 	}
 	for i, tt := range tests {
-		err := s.Compact(tt.rev)
+		_, err := s.Compact(tt.rev)
 		if err != tt.werr {
 			t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr)
 		}

+ 6 - 5
storage/kvstore.go

@@ -218,14 +218,14 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
 	return n, rev, nil
 }
 
-func (s *store) Compact(rev int64) error {
+func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if rev <= s.compactMainRev {
-		return ErrCompacted
+		return nil, ErrCompacted
 	}
 	if rev > s.currentRev.main {
-		return ErrFutureRev
+		return nil, ErrFutureRev
 	}
 
 	start := time.Now()
@@ -243,8 +243,9 @@ func (s *store) Compact(rev int64) error {
 	s.b.ForceCommit()
 
 	keep := s.kvindex.Compact(rev)
-
+	ch := make(chan struct{})
 	var j = func(ctx context.Context) {
+		defer close(ch)
 		select {
 		case <-ctx.Done():
 			return
@@ -256,7 +257,7 @@ func (s *store) Compact(rev int64) error {
 	s.fifoSched.Schedule(j)
 
 	indexCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond))
-	return nil
+	return ch, nil
 }
 
 func (s *store) Hash() (uint32, error) {

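`store.Compact` now does only the cheap, logical part under the lock and hands the expensive key removal to the FIFO scheduler; the returned channel is closed by `defer close(ch)` when that scheduled job exits, whether it completed or its context was cancelled. A stripped-down sketch of the same shape, with a plain goroutine standing in for `s.fifoSched` and an assumed `purge` callback standing in for the backend cleanup:

```go
package main

import (
	"context"
	"fmt"
)

// compact runs the fast, logical step synchronously and returns a channel
// that is closed once the slow, physical step has finished (or been cancelled).
func compact(ctx context.Context, rev int64, purge func(int64)) <-chan struct{} {
	ch := make(chan struct{})
	// kvstore.go hands this closure to s.fifoSched.Schedule; a goroutine keeps
	// the sketch self-contained.
	go func() {
		defer close(ch) // close even if the context is cancelled first
		select {
		case <-ctx.Done():
			return
		default:
		}
		purge(rev) // stand-in for deleting compacted revisions from the backend
	}()
	return ch
}

func main() {
	done := compact(context.Background(), 42, func(rev int64) {
		fmt.Println("physically compacted up to revision", rev)
	})
	<-done
	fmt.Println("compaction job finished")
}
```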
+ 1 - 1
storage/watchable_store_test.go

@@ -234,7 +234,7 @@ func TestWatchCompacted(t *testing.T) {
 	for i := 0; i < maxRev; i++ {
 		s.Put(testKey, testValue, lease.NoLease)
 	}
-	err := s.Compact(compactRev)
+	_, err := s.Compact(compactRev)
 	if err != nil {
 		t.Fatalf("failed to compact kv (%v)", err)
 	}

+ 36 - 1
tools/functional-tester/etcd-tester/cluster.go

@@ -322,6 +322,11 @@ func (c *cluster) compactKV(rev int64) error {
 		conn *grpc.ClientConn
 		err  error
 	)
+
+	if rev <= 0 {
+		return nil
+	}
+
 	for _, u := range c.GRPCURLs {
 		conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
 		if err != nil {
@@ -329,7 +334,7 @@ func (c *cluster) compactKV(rev int64) error {
 		}
 		kvc := pb.NewKVClient(conn)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-		_, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev})
+		_, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true})
 		cancel()
 		conn.Close()
 		if err == nil {
@@ -338,3 +343,33 @@ func (c *cluster) compactKV(rev int64) error {
 	}
 	return err
 }
+
+func (c *cluster) checkCompact(rev int64) error {
+	if rev == 0 {
+		return nil
+	}
+	for _, u := range c.GRPCURLs {
+		cli, err := clientv3.New(clientv3.Config{
+			Endpoints:   []string{u},
+			DialTimeout: 5 * time.Second,
+		})
+		if err != nil {
+			return err
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
+		wr, ok := <-wch
+		cancel()
+
+		cli.Close()
+
+		if !ok {
+			return fmt.Errorf("watch channel terminated")
+		}
+		if wr.CompactRevision != rev {
+			return fmt.Errorf("got compact revision %v, wanted %v", wr.CompactRevision, rev)
+		}
+	}
+	return nil
+}
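`checkCompact` leans on watch semantics to verify the compaction took effect on every member: a watch opened at a revision older than the compact revision is terminated by the server with `CompactRevision` set, so the tester simply compares that value with the revision it compacted to. A single-endpoint sketch of the same check; the endpoint and revision are placeholders, and the imports assume the coreos/etcd clientv3 and x/net/context of that era:

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

func main() {
	// Placeholder endpoint; checkCompact loops over every cluster.GRPCURLs entry.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2378"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	compactRev := int64(100) // revision previously passed to compactKV

	// Asking for history older than the compact revision forces the server to
	// end the watch and report where it actually compacted.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(compactRev-1))
	if wr, ok := <-wch; ok && wr.CompactRevision == compactRev {
		fmt.Println("compaction confirmed at revision", wr.CompactRevision)
	} else {
		fmt.Println("compaction not confirmed")
	}
}
```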

+ 9 - 2
tools/functional-tester/etcd-tester/tester.go

@@ -149,8 +149,15 @@ func (tt *tester) runLoop() {
 		}
 		plog.Printf("[round#%d] compacted storage", i)
 
 
-		// TODO: make sure compaction is finished
-		time.Sleep(30 * time.Second)
+		plog.Printf("[round#%d] check compaction at %d", i, revToCompact)
+		if err := tt.cluster.checkCompact(revToCompact); err != nil {
+			plog.Printf("[round#%d] checkCompact error (%v)", i, err)
+			if err := tt.cleanup(i, 0); err != nil {
+				plog.Printf("[round#%d] cleanup error: %v", i, err)
+				return
+			}
+		}
+		plog.Printf("[round#%d] confirmed compaction at %d", i, revToCompact)
 	}
 }