Browse Source

Merge pull request #6885 from fanminshi/refractor_lease_checker

etcd-tester: refactor lease checker
fanmin shi 9 years ago
parent
commit
b9e9c9483b

+ 1 - 1
tools/functional-tester/etcd-agent/agent.go

@@ -157,7 +157,7 @@ func (a *Agent) cleanup() error {
 	// https://github.com/torvalds/linux/blob/master/fs/drop_caches.c
 	cmd := exec.Command("/bin/sh", "-c", `echo "echo 1 > /proc/sys/vm/drop_caches" | sudo sh`)
 	if err := cmd.Run(); err != nil {
-		plog.Printf("error when cleaning page cache (%v)", err)
+		plog.Infof("error when cleaning page cache (%v)", err)
 	}
 	return nil
 }

+ 61 - 11
tools/functional-tester/etcd-tester/checks.go

@@ -18,6 +18,9 @@ import (
 	"fmt"
 	"time"
 
+	"google.golang.org/grpc"
+
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
@@ -77,20 +80,31 @@ func (hc *hashChecker) Check() error {
 	return hc.checkRevAndHashes()
 }
 
-type leaseChecker struct{ ls *leaseStresser }
+type leaseChecker struct {
+	ls          *leaseStresser
+	leaseClient pb.LeaseClient
+	kvc         pb.KVClient
+}
 
 func (lc *leaseChecker) Check() error {
-	plog.Infof("checking revoked leases %v", lc.ls.revokedLeases.leases)
+	conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1))
+	if err != nil {
+		return fmt.Errorf("%v (%s)", err, lc.ls.endpoint)
+	}
+	defer func() {
+		if conn != nil {
+			conn.Close()
+		}
+	}()
+	lc.kvc = pb.NewKVClient(conn)
+	lc.leaseClient = pb.NewLeaseClient(conn)
 	if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
 		return err
 	}
-	plog.Infof("checking alive leases %v", lc.ls.aliveLeases.leases)
 	if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil {
 		return err
 	}
-	plog.Infof("checking short lived leases %v", lc.ls.shortLivedLeases.leases)
 	return lc.checkShortLivedLeases()
-
 }
 
 // checkShortLivedLeases ensures leases expire.
@@ -117,19 +131,19 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 	// retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it.
 	var resp *pb.LeaseTimeToLiveResponse
 	for i := 0; i < retries; i++ {
-		resp, err = lc.ls.getLeaseByID(ctx, leaseID)
+		resp, err = lc.getLeaseByID(ctx, leaseID)
 		if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
 			return nil
 		}
 		if err != nil {
-			plog.Warningf("retry %d. failed to retrieve lease %v error (%v)", i, leaseID, err)
+			plog.Debugf("retry %d. failed to retrieve lease %v error (%v)", i, leaseID, err)
 			continue
 		}
 		if resp.TTL > 0 {
-			plog.Warningf("lease %v is not expired. sleep for %d until it expires.", leaseID, resp.TTL)
+			plog.Debugf("lease %v is not expired. sleep for %d until it expires.", leaseID, resp.TTL)
 			time.Sleep(time.Duration(resp.TTL) * time.Second)
 		} else {
-			plog.Warningf("retry %d. lease %v is expired but not yet revoked", i, leaseID)
+			plog.Debugf("retry %d. lease %v is expired but not yet revoked", i, leaseID)
 			time.Sleep(time.Second)
 		}
 		if err = lc.checkLease(ctx, false, leaseID); err != nil {
@@ -141,12 +155,12 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 }
 
 func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
-	keysExpired, err := lc.ls.hasKeysAttachedToLeaseExpired(ctx, leaseID)
+	keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
 	if err != nil {
 		plog.Errorf("hasKeysAttachedToLeaseExpired error: (%v)", err)
 		return err
 	}
-	leaseExpired, err := lc.ls.hasLeaseExpired(ctx, leaseID)
+	leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID)
 	if err != nil {
 		plog.Errorf("hasLeaseExpired error: (%v)", err)
 		return err
@@ -171,6 +185,42 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
 	return nil
 }
 
+func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
+	ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
+	return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
+}
+
+func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
+	// keep retrying until lease's state is known or ctx is being canceled
+	for ctx.Err() == nil {
+		resp, err := lc.getLeaseByID(ctx, leaseID)
+		if err == nil {
+			return false, nil
+		}
+		if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
+			return true, nil
+		}
+		plog.Warningf("hasLeaseExpired %v resp %v error (%v)", leaseID, resp, err)
+	}
+	return false, ctx.Err()
+}
+
+// The keys attached to the lease has the format of "<leaseID>_<idx>" where idx is the ordering key creation
+// Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
+// determines whether the attached keys for a given leaseID has been deleted or not
+func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
+	resp, err := lc.kvc.Range(ctx, &pb.RangeRequest{
+		Key:      []byte(fmt.Sprintf("%d", leaseID)),
+		RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
+	}, grpc.FailFast(false))
+	plog.Debugf("hasKeysAttachedToLeaseExpired %v resp %v error (%v)", leaseID, resp, err)
+	if err != nil {
+		plog.Errorf("retriving keys attached to lease %v error: (%v)", leaseID, err)
+		return false, err
+	}
+	return len(resp.Kvs) == 0, nil
+}
+
 // compositeChecker implements a checker that runs a slice of Checkers concurrently.
 type compositeChecker struct{ checkers []Checker }
 

+ 10 - 46
tools/functional-tester/etcd-tester/lease_stresser.go

@@ -22,7 +22,6 @@ import (
 
 	"time"
 
-	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
@@ -114,14 +113,6 @@ func (ls *leaseStresser) setupOnce() error {
 		panic("expect keysPerLease to be set")
 	}
 
-	conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second))
-	if err != nil {
-		return fmt.Errorf("%v (%s)", err, ls.endpoint)
-	}
-	ls.conn = conn
-	ls.kvc = pb.NewKVClient(conn)
-	ls.lc = pb.NewLeaseClient(conn)
-
 	ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
 
 	return nil
@@ -132,6 +123,14 @@ func (ls *leaseStresser) Stress() error {
 	if err := ls.setupOnce(); err != nil {
 		return err
 	}
+
+	conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second))
+	if err != nil {
+		return fmt.Errorf("%v (%s)", err, ls.endpoint)
+	}
+	ls.conn = conn
+	ls.kvc = pb.NewKVClient(conn)
+	ls.lc = pb.NewLeaseClient(conn)
 	ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
 	ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
 
@@ -255,42 +254,6 @@ func (ls *leaseStresser) randomlyDropLeases() {
 	wg.Wait()
 }
 
-func (ls *leaseStresser) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
-	ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
-	return ls.lc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
-}
-
-func (ls *leaseStresser) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
-	// keep retrying until lease's state is known or ctx is being canceled
-	for ctx.Err() == nil {
-		resp, err := ls.getLeaseByID(ctx, leaseID)
-		if err == nil {
-			return false, nil
-		}
-		if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
-			return true, nil
-		}
-		plog.Warningf("hasLeaseExpired %v resp %v error (%v)", leaseID, resp, err)
-	}
-	return false, ctx.Err()
-}
-
-// The keys attached to the lease has the format of "<leaseID>_<idx>" where idx is the ordering key creation
-// Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
-// determines whether the attached keys for a given leaseID has been deleted or not
-func (ls *leaseStresser) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
-	resp, err := ls.kvc.Range(ctx, &pb.RangeRequest{
-		Key:      []byte(fmt.Sprintf("%d", leaseID)),
-		RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
-	}, grpc.FailFast(false))
-	plog.Debugf("hasKeysAttachedToLeaseExpired %v resp %v error (%v)", leaseID, resp, err)
-	if err != nil {
-		plog.Errorf("retriving keys attached to lease %v error: (%v)", leaseID, err)
-		return false, err
-	}
-	return len(resp.Kvs) == 0, nil
-}
-
 func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
 	resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl})
 	if err != nil {
@@ -402,6 +365,7 @@ func (ls *leaseStresser) Cancel() {
 	ls.cancel()
 	ls.runWg.Wait()
 	ls.aliveWg.Wait()
+	ls.conn.Close()
 	plog.Infof("lease stresser %q is canceled", ls.endpoint)
 }
 
@@ -409,4 +373,4 @@ func (ls *leaseStresser) ModifiedKeys() int64 {
 	return atomic.LoadInt64(&ls.atomicModifiedKey)
 }
 
-func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ls} }
+func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ls: ls} }

+ 25 - 20
tools/functional-tester/etcd-tester/tester.go

@@ -75,7 +75,7 @@ func (tt *tester) runLoop() {
 		preModifiedKey = currentModifiedKey
 		timeout := 10 * time.Second
 		timeout += time.Duration(modifiedKey/compactQPS) * time.Second
-		plog.Printf("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout)
+		plog.Infof("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout)
 		if err := tt.compact(revToCompact, timeout); err != nil {
 			plog.Warningf("%s functional-tester compact got error (%v)", tt.logPrefix(), err)
 			if tt.cleanup() != nil {
@@ -92,7 +92,7 @@ func (tt *tester) runLoop() {
 		}
 	}
 
-	plog.Printf("%s functional-tester is finished", tt.logPrefix())
+	plog.Infof("%s functional-tester is finished", tt.logPrefix())
 }
 
 func (tt *tester) doRound(round int) error {
@@ -103,26 +103,31 @@ func (tt *tester) doRound(round int) error {
 		if err := tt.cluster.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}
-		plog.Printf("%s injecting failure %q", tt.logPrefix(), f.Desc())
+		plog.Infof("%s injecting failure %q", tt.logPrefix(), f.Desc())
 		if err := f.Inject(tt.cluster, round); err != nil {
 			return fmt.Errorf("injection error: %v", err)
 		}
-		plog.Printf("%s injected failure", tt.logPrefix())
+		plog.Infof("%s injected failure", tt.logPrefix())
 
-		plog.Printf("%s recovering failure %q", tt.logPrefix(), f.Desc())
+		plog.Infof("%s recovering failure %q", tt.logPrefix(), f.Desc())
 		if err := f.Recover(tt.cluster, round); err != nil {
 			return fmt.Errorf("recovery error: %v", err)
 		}
-		plog.Printf("%s recovered failure", tt.logPrefix())
+		plog.Infof("%s recovered failure", tt.logPrefix())
 		tt.cancelStresser()
-		plog.Printf("%s wait until cluster is healthy", tt.logPrefix())
+		plog.Infof("%s wait until cluster is healthy", tt.logPrefix())
 		if err := tt.cluster.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}
+		plog.Infof("%s cluster is healthy", tt.logPrefix())
+
+		plog.Infof("%s checking consistency and invariant of cluster", tt.logPrefix())
 		if err := tt.checkConsistency(); err != nil {
 			return fmt.Errorf("tt.checkConsistency error (%v)", err)
 		}
-		plog.Printf("%s succeed!", tt.logPrefix())
+		plog.Infof("%s checking consistency and invariant of cluster done", tt.logPrefix())
+
+		plog.Infof("%s succeed!", tt.logPrefix())
 	}
 	return nil
 }
@@ -134,7 +139,7 @@ func (tt *tester) updateRevision() error {
 		break // just need get one of the current revisions
 	}
 
-	plog.Printf("%s updated current revision to %d", tt.logPrefix(), tt.currentRevision)
+	plog.Infof("%s updated current revision to %d", tt.logPrefix(), tt.currentRevision)
 	return err
 }
 
@@ -150,7 +155,7 @@ func (tt *tester) checkConsistency() (err error) {
 		err = tt.startStresser()
 	}()
 	if err = tt.checker.Check(); err != nil {
-		plog.Printf("%s %v", tt.logPrefix(), err)
+		plog.Infof("%s %v", tt.logPrefix(), err)
 	}
 	return err
 }
@@ -163,24 +168,24 @@ func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
 		}
 	}()
 
-	plog.Printf("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev)
+	plog.Infof("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev)
 	if err = tt.cluster.compactKV(rev, timeout); err != nil {
 		return err
 	}
-	plog.Printf("%s compacted storage (compact revision %d)", tt.logPrefix(), rev)
+	plog.Infof("%s compacted storage (compact revision %d)", tt.logPrefix(), rev)
 
-	plog.Printf("%s checking compaction (compact revision %d)", tt.logPrefix(), rev)
+	plog.Infof("%s checking compaction (compact revision %d)", tt.logPrefix(), rev)
 	if err = tt.cluster.checkCompact(rev); err != nil {
 		plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err)
 		return err
 	}
 
-	plog.Printf("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev)
+	plog.Infof("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev)
 	return nil
 }
 
 func (tt *tester) defrag() error {
-	plog.Printf("%s defragmenting...", tt.logPrefix())
+	plog.Infof("%s defragmenting...", tt.logPrefix())
 	if err := tt.cluster.defrag(); err != nil {
 		plog.Warningf("%s defrag error (%v)", tt.logPrefix(), err)
 		if cerr := tt.cleanup(); cerr != nil {
@@ -188,7 +193,7 @@ func (tt *tester) defrag() error {
 		}
 		return err
 	}
-	plog.Printf("%s defragmented...", tt.logPrefix())
+	plog.Infof("%s defragmented...", tt.logPrefix())
 	return nil
 }
 
@@ -225,15 +230,15 @@ func (tt *tester) cleanup() error {
 }
 
 func (tt *tester) cancelStresser() {
-	plog.Printf("%s canceling the stressers...", tt.logPrefix())
+	plog.Infof("%s canceling the stressers...", tt.logPrefix())
 	tt.stresser.Cancel()
-	plog.Printf("%s canceled stressers", tt.logPrefix())
+	plog.Infof("%s canceled stressers", tt.logPrefix())
 }
 
 func (tt *tester) startStresser() (err error) {
-	plog.Printf("%s starting the stressers...", tt.logPrefix())
+	plog.Infof("%s starting the stressers...", tt.logPrefix())
 	err = tt.stresser.Stress()
-	plog.Printf("%s started stressers", tt.logPrefix())
+	plog.Infof("%s started stressers", tt.logPrefix())
 	return err
 }