Browse Source

Merge pull request #5943 from gyuho/pause-before-compaction-2

etcd-tester: pause before compaction, fix races, cleanups
Gyu-Ho Lee 9 years ago
parent
commit
14d7dc940d

+ 1 - 1
tools/functional-tester/etcd-tester/main.go

@@ -32,7 +32,7 @@ func main() {
 	stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd.")
 	stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd.")
 	limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
-	stressQPS := flag.Int("stress-qps", 5000, "maximum number of stresser requests per second.")
+	stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
 	schedCases := flag.String("schedule-cases", "", "test case schedule")
 	consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
 	isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")

+ 6 - 5
tools/functional-tester/etcd-tester/stresser.go

@@ -72,7 +72,6 @@ func (s *stresser) Stress() error {
 	if err != nil {
 		return fmt.Errorf("%v (%s)", err, s.Endpoint)
 	}
-	defer conn.Close()
 	ctx, cancel := context.WithCancel(context.Background())
 
 	wg := &sync.WaitGroup{}
@@ -91,7 +90,7 @@ func (s *stresser) Stress() error {
 		go s.run(ctx, kvc)
 	}
 
-	<-ctx.Done()
+	plog.Printf("stresser %q is started", s.Endpoint)
 	return nil
 }
 
@@ -161,11 +160,13 @@ func (s *stresser) run(ctx context.Context, kvc pb.KVClient) {
 
 func (s *stresser) Cancel() {
 	s.mu.Lock()
-	cancel, conn, wg := s.cancel, s.conn, s.wg
+	s.cancel()
+	s.conn.Close()
+	wg := s.wg
 	s.mu.Unlock()
-	cancel()
+
 	wg.Wait()
-	conn.Close()
+	plog.Printf("stresser %q is canceled", s.Endpoint)
 }
 
 func (s *stresser) Report() (int, int) {

+ 26 - 13
tools/functional-tester/etcd-tester/tester.go

@@ -143,21 +143,25 @@ func (tt *tester) updateRevision() error {
 
 func (tt *tester) checkConsistency() (failed bool, err error) {
 	tt.cancelStressers()
-	defer tt.startStressers()
+	defer func() {
+		serr := tt.startStressers()
+		if err == nil {
+			err = serr
+		}
+	}()
 
 	plog.Printf("%s updating current revisions...", tt.logPrefix())
 	var (
 		revs   map[string]int64
 		hashes map[string]int64
-		rerr   error
 		ok     bool
 	)
 	for i := 0; i < 7; i++ {
 		time.Sleep(time.Second)
 
-		revs, hashes, rerr = tt.cluster.getRevisionHash()
-		if rerr != nil {
-			plog.Printf("%s #%d failed to get current revisions (%v)", tt.logPrefix(), i, rerr)
+		revs, hashes, err = tt.cluster.getRevisionHash()
+		if err != nil {
+			plog.Printf("%s #%d failed to get current revisions (%v)", tt.logPrefix(), i, err)
 			continue
 		}
 		if tt.currentRevision, ok = getSameValue(revs); ok {
@@ -168,10 +172,9 @@ func (tt *tester) checkConsistency() (failed bool, err error) {
 	}
 	plog.Printf("%s updated current revisions with %d", tt.logPrefix(), tt.currentRevision)
 
-	if !ok || rerr != nil {
+	if !ok || err != nil {
 		plog.Printf("%s checking current revisions failed [revisions: %v]", tt.logPrefix(), revs)
 		failed = true
-		err = tt.cleanup()
 		return
 	}
 	plog.Printf("%s all members are consistent with current revisions [revisions: %v]", tt.logPrefix(), revs)
@@ -180,22 +183,29 @@ func (tt *tester) checkConsistency() (failed bool, err error) {
 	if _, ok = getSameValue(hashes); !ok {
 		plog.Printf("%s checking current storage hashes failed [hashes: %v]", tt.logPrefix(), hashes)
 		failed = true
-		err = tt.cleanup()
 		return
 	}
 	plog.Printf("%s all members are consistent with storage hashes", tt.logPrefix())
 	return
 }
 
-func (tt *tester) compact(rev int64, timeout time.Duration) error {
+func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
+	tt.cancelStressers()
+	defer func() {
+		serr := tt.startStressers()
+		if err == nil {
+			err = serr
+		}
+	}()
+
 	plog.Printf("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev)
-	if err := tt.cluster.compactKV(rev, timeout); err != nil {
+	if err = tt.cluster.compactKV(rev, timeout); err != nil {
 		return err
 	}
 	plog.Printf("%s compacted storage (compact revision %d)", tt.logPrefix(), rev)
 
 	plog.Printf("%s checking compaction (compact revision %d)", tt.logPrefix(), rev)
-	if err := tt.cluster.checkCompact(rev); err != nil {
+	if err = tt.cluster.checkCompact(rev); err != nil {
 		plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err)
 		return err
 	}
@@ -260,10 +270,13 @@ func (tt *tester) cancelStressers() {
 	plog.Printf("%s canceled stressers", tt.logPrefix())
 }
 
-func (tt *tester) startStressers() {
+func (tt *tester) startStressers() error {
 	plog.Printf("%s starting the stressers...", tt.logPrefix())
 	for _, s := range tt.cluster.Stressers {
-		go s.Stress()
+		if err := s.Stress(); err != nil {
+			return err
+		}
 	}
 	plog.Printf("%s started stressers", tt.logPrefix())
+	return nil
 }