Browse Source

etcd-tester: fix, clean up multiple things (#5462)

* etcd-tester: more logging, fix typo

* etcd-tester: fix prevCompactRev scope

Fix https://github.com/coreos/etcd/issues/5440.

* etcd-tester: move utils to bottom, clean up logs

And remove stresser operation inside defrag

* etcd-tester: separate update revision call

* etcd-tester: fix cleanup when case is -1
Gyu-Ho Lee 9 years ago
parent
commit
3ed5d28e2e

+ 2 - 1
tools/functional-tester/etcd-tester/cluster.go

@@ -175,6 +175,7 @@ func (c *cluster) WaitHealth() error {
 		if err == nil {
 			return nil
 		}
+		plog.Warningf("#%d setHealthKey error (%v)", i, err)
 		time.Sleep(time.Second)
 	}
 	return err
@@ -271,7 +272,7 @@ func setHealthKey(us []string) error {
 		cancel()
 		conn.Close()
 		if err != nil {
-			return err
+			return fmt.Errorf("%v (%s)", err, u)
 		}
 	}
 	return nil

+ 1 - 1
tools/functional-tester/etcd-tester/failure.go

@@ -26,7 +26,7 @@ const (
 	randomVariation    = 50
 
 	// Wait more when it recovers from slow network, because network layer
-	// needs extra time to propogate traffic control (tc command) change.
+	// needs extra time to propagate traffic control (tc command) change.
 	// Otherwise, we get different hash values from the previous revision.
 	// For more detail, please see https://github.com/coreos/etcd/issues/5121.
 	waitRecover = 5 * time.Second

+ 94 - 87
tools/functional-tester/etcd-tester/tester.go

@@ -40,16 +40,17 @@ func (tt *tester) runLoop() {
 	for _, f := range tt.failures {
 		tt.status.Failures = append(tt.status.Failures, f.Desc())
 	}
-	round := 0
+
+	var (
+		round          int
+		prevCompactRev int64
+	)
 	for {
 		tt.status.setRound(round)
 		tt.status.setCase(-1) // -1 so that logPrefix doesn't print out 'case'
 		roundTotalCounter.Inc()
 
-		var (
-			prevCompactRev int64
-			failed         bool
-		)
+		var failed bool
 		for j, f := range tt.failures {
 			caseTotalCounter.WithLabelValues(f.Desc()).Inc()
 			tt.status.setCase(j)
@@ -63,9 +64,7 @@ func (tt *tester) runLoop() {
 				break
 			}
 
-			plog.Printf("%s starting failure %s", tt.logPrefix(), f.Desc())
-
-			plog.Printf("%s injecting failure...", tt.logPrefix())
+			plog.Printf("%s injecting failure %q", tt.logPrefix(), f.Desc())
 			if err := f.Inject(tt.cluster, round); err != nil {
 				plog.Printf("%s injection error: %v", tt.logPrefix(), err)
 				if err := tt.cleanup(); err != nil {
@@ -76,7 +75,7 @@ func (tt *tester) runLoop() {
 			}
 			plog.Printf("%s injected failure", tt.logPrefix())
 
-			plog.Printf("%s recovering failure...", tt.logPrefix())
+			plog.Printf("%s recovering failure %q", tt.logPrefix(), f.Desc())
 			if err := f.Recover(tt.cluster, round); err != nil {
 				plog.Printf("%s recovery error: %v", tt.logPrefix(), err)
 				if err := tt.cleanup(); err != nil {
@@ -92,10 +91,18 @@ func (tt *tester) runLoop() {
 				continue
 			}
 
+			if !tt.consistencyCheck {
+				if err := tt.updateRevision(); err != nil {
+					plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err)
+					return
+				}
+				continue
+			}
+
 			var err error
-			failed, err = tt.updateCurrentRevisionHash(tt.consistencyCheck)
+			failed, err = tt.checkConsistency()
 			if err != nil {
-				plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
+				plog.Warningf("%s functional-tester returning with tt.checkConsistency error (%v)", tt.logPrefix(), err)
 				return
 			}
 			if failed {
@@ -139,44 +146,59 @@ func (tt *tester) runLoop() {
 	}
 }
 
-func (tt *tester) logPrefix() string {
-	var (
-		rd     = tt.status.getRound()
-		cs     = tt.status.getCase()
-		prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs)
-	)
-	if cs == -1 {
-		prefix = fmt.Sprintf("[round#%d]", rd)
+func (tt *tester) updateRevision() error {
+	revs, _, err := tt.cluster.getRevisionHash()
+	for _, rev := range revs {
+		tt.currentRevision = rev
+		break // just need get one of the current revisions
 	}
-	return prefix
+	return err
 }
 
-func (tt *tester) cleanup() error {
-	roundFailedTotalCounter.Inc()
-	caseFailedTotalCounter.WithLabelValues(tt.failures[tt.status.Case].Desc()).Inc()
+func (tt *tester) checkConsistency() (failed bool, err error) {
+	tt.cancelStressers()
+	defer tt.startStressers()
 
-	plog.Printf("%s cleaning up...", tt.logPrefix())
-	if err := tt.cluster.Cleanup(); err != nil {
-		plog.Printf("%s cleanup error: %v", tt.logPrefix(), err)
-		return err
+	plog.Printf("%s updating current revisions...", tt.logPrefix())
+	var (
+		revs   map[string]int64
+		hashes map[string]int64
+		rerr   error
+		ok     bool
+	)
+	for i := 0; i < 7; i++ {
+		time.Sleep(time.Second)
+
+		revs, hashes, rerr = tt.cluster.getRevisionHash()
+		if rerr != nil {
+			plog.Printf("%s #%d failed to get current revisions (%v)", tt.logPrefix(), i, rerr)
+			continue
+		}
+		if tt.currentRevision, ok = getSameValue(revs); ok {
+			break
+		}
+
+		plog.Printf("%s #%d inconsistent current revisions %+v", tt.logPrefix(), i, revs)
 	}
-	return tt.cluster.Bootstrap()
-}
+	plog.Printf("%s updated current revisions with %d", tt.logPrefix(), tt.currentRevision)
 
-func (tt *tester) cancelStressers() {
-	plog.Printf("%s canceling the stressers...", tt.logPrefix())
-	for _, s := range tt.cluster.Stressers {
-		s.Cancel()
+	if !ok || rerr != nil {
+		plog.Printf("%s checking current revisions failed [revisions: %v]", tt.logPrefix(), revs)
+		failed = true
+		err = tt.cleanup()
+		return
 	}
-	plog.Printf("%s canceled stressers", tt.logPrefix())
-}
+	plog.Printf("%s all members are consistent with current revisions [revisions: %v]", tt.logPrefix(), revs)
 
-func (tt *tester) startStressers() {
-	plog.Printf("%s starting the stressers...", tt.logPrefix())
-	for _, s := range tt.cluster.Stressers {
-		go s.Stress()
+	plog.Printf("%s checking current storage hashes...", tt.logPrefix())
+	if _, ok = getSameValue(hashes); !ok {
+		plog.Printf("%s checking current storage hashes failed [hashes: %v]", tt.logPrefix(), hashes)
+		failed = true
+		err = tt.cleanup()
+		return
 	}
-	plog.Printf("%s started stressers", tt.logPrefix())
+	plog.Printf("%s all members are consistent with storage hashes", tt.logPrefix())
+	return
 }
 
 func (tt *tester) compact(rev int64, timeout time.Duration) error {
@@ -204,9 +226,6 @@ func (tt *tester) compact(rev int64, timeout time.Duration) error {
 }
 
 func (tt *tester) defrag() error {
-	tt.cancelStressers()
-	defer tt.startStressers()
-
 	plog.Printf("%s defragmenting...", tt.logPrefix())
 	if err := tt.cluster.defrag(); err != nil {
 		plog.Printf("%s defrag error (%v)", tt.logPrefix(), err)
@@ -220,58 +239,46 @@ func (tt *tester) defrag() error {
 	return nil
 }
 
-func (tt *tester) updateCurrentRevisionHash(check bool) (failed bool, err error) {
-	if check {
-		tt.cancelStressers()
-		defer tt.startStressers()
-	}
-
-	plog.Printf("%s updating current revisions...", tt.logPrefix())
+func (tt *tester) logPrefix() string {
 	var (
-		revs   map[string]int64
-		hashes map[string]int64
-		rerr   error
-		ok     bool
+		rd     = tt.status.getRound()
+		cs     = tt.status.getCase()
+		prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs)
 	)
-	for i := 0; i < 7; i++ {
-		revs, hashes, rerr = tt.cluster.getRevisionHash()
-		if rerr != nil {
-			plog.Printf("%s #%d failed to get current revisions (%v)", tt.logPrefix(), i, rerr)
-			continue
-		}
-		if tt.currentRevision, ok = getSameValue(revs); ok {
-			break
-		}
-
-		plog.Printf("%s #%d inconsistent current revisions %+v", tt.logPrefix(), i, revs)
-		if !check {
-			break // if consistency check is false, just try once
-		}
+	if cs == -1 {
+		prefix = fmt.Sprintf("[round#%d]", rd)
+	}
+	return prefix
+}
 
-		time.Sleep(time.Second)
+func (tt *tester) cleanup() error {
+	roundFailedTotalCounter.Inc()
+	desc := "compact/defrag"
+	if tt.status.Case != -1 {
+		desc = tt.failures[tt.status.Case].Desc()
 	}
-	plog.Printf("%s updated current revisions with %d", tt.logPrefix(), tt.currentRevision)
+	caseFailedTotalCounter.WithLabelValues(desc).Inc()
 
-	if !check {
-		failed = false
-		return
+	plog.Printf("%s cleaning up...", tt.logPrefix())
+	if err := tt.cluster.Cleanup(); err != nil {
+		plog.Printf("%s cleanup error: %v", tt.logPrefix(), err)
+		return err
 	}
+	return tt.cluster.Bootstrap()
+}
 
-	if !ok || rerr != nil {
-		plog.Printf("%s checking current revisions failed [revisions: %v]", tt.logPrefix(), revs)
-		failed = true
-		err = tt.cleanup()
-		return
+func (tt *tester) cancelStressers() {
+	plog.Printf("%s canceling the stressers...", tt.logPrefix())
+	for _, s := range tt.cluster.Stressers {
+		s.Cancel()
 	}
-	plog.Printf("%s all members are consistent with current revisions [revisions: %v]", tt.logPrefix(), revs)
+	plog.Printf("%s canceled stressers", tt.logPrefix())
+}
 
-	plog.Printf("%s checking current storage hashes...", tt.logPrefix())
-	if _, ok = getSameValue(hashes); !ok {
-		plog.Printf("%s checking current storage hashes failed [hashes: %v]", tt.logPrefix(), hashes)
-		failed = true
-		err = tt.cleanup()
-		return
+func (tt *tester) startStressers() {
+	plog.Printf("%s starting the stressers...", tt.logPrefix())
+	for _, s := range tt.cluster.Stressers {
+		go s.Stress()
 	}
-	plog.Printf("%s all members are consistent with storage hashes", tt.logPrefix())
-	return
+	plog.Printf("%s started stressers", tt.logPrefix())
 }