Browse Source

etcd-tester: changed compaction timeout calculation

functional tester sometime experiences timeout during compaction phase. I changed the timeout calculation base on number of entries created and deleted.

FIX #6805
fanmin shi 9 years ago
parent
commit
107d7b663c

+ 23 - 28
tools/functional-tester/etcd-tester/key_stresser.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"math/rand"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"golang.org/x/net/context" // grpc does a comparison on context.Cancel; can't use "context" package
@@ -46,9 +47,8 @@ type keyStresser struct {
 
 	cancel func()
 	conn   *grpc.ClientConn
-
-	success int
-	failure int
+	// atomicModifiedKeys records the number of keys created and deleted by the stresser.
+	atomicModifiedKeys int64
 
 	stressTable *stressTable
 }
@@ -100,18 +100,13 @@ func (s *keyStresser) run(ctx context.Context) {
 		// and immediate leader election. Find out what other cases this
 		// could be timed out.
 		sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
-		err := s.stressTable.choose()(sctx)
+		err, modifiedKeys := s.stressTable.choose()(sctx)
 		scancel()
 		if err == nil {
-			s.mu.Lock()
-			s.success++
-			s.mu.Unlock()
+			atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys)
 			continue
 		}
 
-		s.mu.Lock()
-		s.failure++
-		s.mu.Unlock()
 		switch grpc.ErrorDesc(err) {
 		case context.DeadlineExceeded.Error():
 			// This retries when request is triggered at the same time as
@@ -140,8 +135,7 @@ func (s *keyStresser) run(ctx context.Context) {
 			// from stresser.Cancel method:
 			return
 		default:
-			su, fa := s.Report()
-			plog.Errorf("keyStresser %v (success %d, failure %d) exited with error (%v)", s.Endpoint, su, fa, err)
+			plog.Errorf("keyStresser %v exited with error (%v)", s.Endpoint, err)
 			return
 		}
 	}
@@ -154,15 +148,13 @@ func (s *keyStresser) Cancel() {
 	plog.Infof("keyStresser %q is canceled", s.Endpoint)
 }
 
-func (s *keyStresser) Report() (int, int) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.success, s.failure
+func (s *keyStresser) ModifiedKeys() int64 {
+	return atomic.LoadInt64(&s.atomicModifiedKeys)
 }
 
 func (s *keyStresser) Checker() Checker { return nil }
 
-type stressFunc func(ctx context.Context) error
+type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
 
 type stressEntry struct {
 	weight float32
@@ -197,53 +189,56 @@ func (st *stressTable) choose() stressFunc {
 }
 
 func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.Put(ctx, &pb.PutRequest{
 			Key:   []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 			Value: randBytes(keySize),
 		}, grpc.FailFast(false))
-		return err
+		return err, 1
 	}
 }
 
 func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.Range(ctx, &pb.RangeRequest{
 			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 		}, grpc.FailFast(false))
-		return err
+		return err, 0
 	}
 }
 
 func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
 		_, err := kvc.Range(ctx, &pb.RangeRequest{
 			Key:      []byte(fmt.Sprintf("foo%016x", start)),
 			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
 		}, grpc.FailFast(false))
-		return err
+		return err, 0
 	}
 }
 
 func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
 			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 		}, grpc.FailFast(false))
-		return err
+		return err, 1
 	}
 }
 
 func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
-		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
+		resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
 			Key:      []byte(fmt.Sprintf("foo%016x", start)),
 			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
 		}, grpc.FailFast(false))
-		return err
+		if err == nil {
+			return nil, resp.Deleted
+		}
+		return err, 0
 	}
 }

+ 13 - 8
tools/functional-tester/etcd-tester/lease_stresser.go

@@ -18,6 +18,8 @@ import (
 	"fmt"
 	"math/rand"
 	"sync"
+	"sync/atomic"
+
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
@@ -43,11 +45,10 @@ type leaseStresser struct {
 	ctx      context.Context
 
 	rateLimiter *rate.Limiter
-
-	success      int
-	failure      int
-	numLeases    int
-	keysPerLease int
+	// atomicModifiedKey records the number of keys created and deleted during a test case
+	atomicModifiedKey int64
+	numLeases         int
+	keysPerLease      int
 
 	aliveLeases      *atomicLeases
 	revokedLeases    *atomicLeases
@@ -147,7 +148,9 @@ func (ls *leaseStresser) run() {
 	defer ls.runWg.Done()
 	ls.restartKeepAlives()
 	for {
-		err := ls.rateLimiter.WaitN(ls.ctx, ls.numLeases*ls.keysPerLease)
+		// the number of keys created and deleted is roughly 2x the number of created keys for an iteration.
+		// the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens where each token represents a create/delete operation for key.
+		err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
 		if err == context.Canceled {
 			return
 		}
@@ -366,6 +369,8 @@ func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
 		txn := &pb.TxnRequest{Success: txnPuts}
 		_, err := ls.kvc.Txn(ls.ctx, txn)
 		if err == nil {
+			// since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys
+			atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
 			return nil
 		}
 		if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
@@ -400,8 +405,8 @@ func (ls *leaseStresser) Cancel() {
 	plog.Infof("lease stresser %q is canceled", ls.endpoint)
 }
 
-func (ls *leaseStresser) Report() (int, int) {
-	return ls.success, ls.failure
+func (ls *leaseStresser) ModifiedKeys() int64 {
+	return atomic.LoadInt64(&ls.atomicModifiedKey)
 }
 
 func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ls} }

+ 7 - 9
tools/functional-tester/etcd-tester/stresser.go

@@ -30,8 +30,8 @@ type Stresser interface {
 	Stress() error
 	// Cancel cancels the stress test on the etcd cluster
 	Cancel()
-	// Report reports the success and failure of the stress test
-	Report() (success int, failure int)
+	// ModifiedKeys reports the number of keys created and deleted by stresser
+	ModifiedKeys() int64
 	// Checker returns an invariant checker for after the stresser is canceled.
 	Checker() Checker
 }
@@ -44,8 +44,8 @@ type nopStresser struct {
 
 func (s *nopStresser) Stress() error { return nil }
 func (s *nopStresser) Cancel()       {}
-func (s *nopStresser) Report() (int, int) {
-	return int(time.Since(s.start).Seconds()) * s.qps, 0
+func (s *nopStresser) ModifiedKeys() int64 {
+	return 0
 }
 func (s *nopStresser) Checker() Checker { return nil }
 
@@ -79,13 +79,11 @@ func (cs *compositeStresser) Cancel() {
 	wg.Wait()
 }
 
-func (cs *compositeStresser) Report() (succ int, fail int) {
+func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
 	for _, stress := range cs.stressers {
-		s, f := stress.Report()
-		succ += s
-		fail += f
+		modifiedKey += stress.ModifiedKeys()
 	}
-	return succ, fail
+	return modifiedKey
 }
 
 func (cs *compositeStresser) Checker() Checker {

+ 12 - 12
tools/functional-tester/etcd-tester/tester.go

@@ -48,11 +48,11 @@ func (tt *tester) runLoop() {
 	}
 
 	if err := tt.resetStressCheck(); err != nil {
-		plog.Errorf("%s failed to start stresser (%v)", err)
+		plog.Errorf("%s failed to start stresser (%v)", tt.logPrefix(), err)
 		return
 	}
 
-	var prevCompactRev int64
+	var preModifiedKey int64
 	for round := 0; round < tt.limit || tt.limit == -1; round++ {
 		tt.status.setRound(round)
 		roundTotalCounter.Inc()
@@ -62,27 +62,27 @@ func (tt *tester) runLoop() {
 			if tt.cleanup() != nil {
 				return
 			}
-			prevCompactRev = 0 // reset after clean up
+			// reset preModifiedKey after clean up
+			preModifiedKey = 0
 			continue
 		}
 		// -1 so that logPrefix doesn't print out 'case'
 		tt.status.setCase(-1)
 
 		revToCompact := max(0, tt.currentRevision-10000)
-		compactN := revToCompact - prevCompactRev
+		currentModifiedKey := tt.stresser.ModifiedKeys()
+		modifiedKey := currentModifiedKey - preModifiedKey
+		preModifiedKey = currentModifiedKey
 		timeout := 10 * time.Second
-		if compactN > 0 {
-			timeout += time.Duration(compactN/compactQPS) * time.Second
-		}
-		prevCompactRev = revToCompact
-
-		plog.Printf("%s compacting %d entries (timeout %v)", tt.logPrefix(), compactN, timeout)
+		timeout += time.Duration(modifiedKey/compactQPS) * time.Second
+		plog.Printf("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout)
 		if err := tt.compact(revToCompact, timeout); err != nil {
 			plog.Warningf("%s functional-tester compact got error (%v)", tt.logPrefix(), err)
 			if tt.cleanup() != nil {
 				return
 			}
-			prevCompactRev = 0 // reset after clean up
+			// reset preModifiedKey after clean up
+			preModifiedKey = 0
 		}
 		if round > 0 && round%500 == 0 { // every 500 rounds
 			if err := tt.defrag(); err != nil {
@@ -257,4 +257,4 @@ func (tt *tester) resetStressCheck() error {
 	return tt.startStresser()
 }
 
-func (tt *tester) Report() (success, failure int) { return tt.stresser.Report() }
+func (tt *tester) Report() int64 { return tt.stresser.ModifiedKeys() }

+ 8 - 14
tools/functional-tester/etcd-tester/v2_stresser.go

@@ -21,6 +21,7 @@ import (
 	"net"
 	"net/http"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"golang.org/x/time/rate"
@@ -40,9 +41,8 @@ type v2Stresser struct {
 
 	wg sync.WaitGroup
 
-	mu      sync.Mutex
-	failure int
-	success int
+	mu                sync.Mutex
+	atomicModifiedKey int64
 
 	cancel func()
 }
@@ -84,17 +84,13 @@ func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) {
 		setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
 		key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange))
 		_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
+		if err == nil {
+			atomic.AddInt64(&s.atomicModifiedKey, 1)
+		}
 		setcancel()
 		if err == context.Canceled {
 			return
 		}
-		s.mu.Lock()
-		if err != nil {
-			s.failure++
-		} else {
-			s.success++
-		}
-		s.mu.Unlock()
 	}
 }
 
@@ -103,10 +99,8 @@ func (s *v2Stresser) Cancel() {
 	s.wg.Wait()
 }
 
-func (s *v2Stresser) Report() (success int, failure int) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.success, s.failure
+func (s *v2Stresser) ModifiedKeys() int64 {
+	return atomic.LoadInt64(&s.atomicModifiedKey)
 }
 
 func (s *v2Stresser) Checker() Checker { return nil }