Browse Source

functional-tester: add "Member.Compact" method

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
5423100def
2 changed files with 16 additions and 19 deletions
  1. 15 0
      tools/functional-tester/rpcpb/member.go
  2. 1 19
      tools/functional-tester/tester/cluster.go

+ 15 - 0
tools/functional-tester/rpcpb/member.go

@@ -126,6 +126,21 @@ func (m *Member) Rev(ctx context.Context) (int64, error) {
 	return resp.Header.Revision, nil
 }
 
+// Compact compacts member storage with given revision.
+// It blocks until it's physically done.
+func (m *Member) Compact(rev int64, timeout time.Duration) error {
+	cli, err := m.CreateEtcdClient()
+	if err != nil {
+		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
+	}
+	defer cli.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	_, err = cli.Compact(ctx, rev, clientv3.WithCompactPhysical())
+	cancel()
+	return err
+}
+
 // IsLeader returns true if this member is the current cluster leader.
 func (m *Member) IsLeader() (bool, error) {
 	cli, err := m.CreateEtcdClient()

+ 1 - 19
tools/functional-tester/tester/cluster.go

@@ -25,7 +25,6 @@ import (
 	"strings"
 	"time"
 
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/debugutil"
 	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
@@ -681,31 +680,14 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 	}
 
 	for i, m := range clus.Members {
-		conn, derr := m.DialEtcdGRPCServer()
-		if derr != nil {
-			clus.lg.Warn(
-				"compactKV dial failed",
-				zap.String("endpoint", m.EtcdClientEndpoint),
-				zap.Error(derr),
-			)
-			err = derr
-			continue
-		}
-		kvc := pb.NewKVClient(conn)
-
 		clus.lg.Info(
 			"compacting",
 			zap.String("endpoint", m.EtcdClientEndpoint),
 			zap.Int64("compact-revision", rev),
 			zap.Duration("timeout", timeout),
 		)
-
 		now := time.Now()
-		ctx, cancel := context.WithTimeout(context.Background(), timeout)
-		_, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false))
-		cancel()
-
-		conn.Close()
+		cerr := m.Compact(rev, timeout)
 		succeed := true
 		if cerr != nil {
 			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {