Browse Source

etcd-tester: stop stress before compact, fix races

fix race condition between stresser cancel and start
Gyu-Ho Lee 9 years ago
parent
commit
ceb9fe4822

+ 6 - 5
tools/functional-tester/etcd-tester/stresser.go

@@ -72,7 +72,6 @@ func (s *stresser) Stress() error {
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("%v (%s)", err, s.Endpoint)
 		return fmt.Errorf("%v (%s)", err, s.Endpoint)
 	}
 	}
-	defer conn.Close()
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 
 
 	wg := &sync.WaitGroup{}
 	wg := &sync.WaitGroup{}
@@ -91,7 +90,7 @@ func (s *stresser) Stress() error {
 		go s.run(ctx, kvc)
 		go s.run(ctx, kvc)
 	}
 	}
 
 
-	<-ctx.Done()
+	plog.Printf("stresser %q is started", s.Endpoint)
 	return nil
 	return nil
 }
 }
 
 
@@ -161,11 +160,13 @@ func (s *stresser) run(ctx context.Context, kvc pb.KVClient) {
 
 
 func (s *stresser) Cancel() {
 func (s *stresser) Cancel() {
 	s.mu.Lock()
 	s.mu.Lock()
-	cancel, conn, wg := s.cancel, s.conn, s.wg
+	s.cancel()
+	s.conn.Close()
+	wg := s.wg
 	s.mu.Unlock()
 	s.mu.Unlock()
-	cancel()
+
 	wg.Wait()
 	wg.Wait()
-	conn.Close()
+	plog.Printf("stresser %q is canceled", s.Endpoint)
 }
 }
 
 
 func (s *stresser) Report() (int, int) {
 func (s *stresser) Report() (int, int) {

+ 22 - 6
tools/functional-tester/etcd-tester/tester.go

@@ -143,7 +143,12 @@ func (tt *tester) updateRevision() error {
 
 
 func (tt *tester) checkConsistency() (failed bool, err error) {
 func (tt *tester) checkConsistency() (failed bool, err error) {
 	tt.cancelStressers()
 	tt.cancelStressers()
-	defer tt.startStressers()
+	defer func() {
+		serr := tt.startStressers()
+		if err == nil {
+			err = serr
+		}
+	}()
 
 
 	plog.Printf("%s updating current revisions...", tt.logPrefix())
 	plog.Printf("%s updating current revisions...", tt.logPrefix())
 	var (
 	var (
@@ -184,15 +189,23 @@ func (tt *tester) checkConsistency() (failed bool, err error) {
 	return
 	return
 }
 }
 
 
-func (tt *tester) compact(rev int64, timeout time.Duration) error {
+func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
+	tt.cancelStressers()
+	defer func() {
+		serr := tt.startStressers()
+		if err == nil {
+			err = serr
+		}
+	}()
+
 	plog.Printf("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev)
 	plog.Printf("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev)
-	if err := tt.cluster.compactKV(rev, timeout); err != nil {
+	if err = tt.cluster.compactKV(rev, timeout); err != nil {
 		return err
 		return err
 	}
 	}
 	plog.Printf("%s compacted storage (compact revision %d)", tt.logPrefix(), rev)
 	plog.Printf("%s compacted storage (compact revision %d)", tt.logPrefix(), rev)
 
 
 	plog.Printf("%s checking compaction (compact revision %d)", tt.logPrefix(), rev)
 	plog.Printf("%s checking compaction (compact revision %d)", tt.logPrefix(), rev)
-	if err := tt.cluster.checkCompact(rev); err != nil {
+	if err = tt.cluster.checkCompact(rev); err != nil {
 		plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err)
 		plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err)
 		return err
 		return err
 	}
 	}
@@ -257,10 +270,13 @@ func (tt *tester) cancelStressers() {
 	plog.Printf("%s canceled stressers", tt.logPrefix())
 	plog.Printf("%s canceled stressers", tt.logPrefix())
 }
 }
 
 
-func (tt *tester) startStressers() {
+func (tt *tester) startStressers() error {
 	plog.Printf("%s starting the stressers...", tt.logPrefix())
 	plog.Printf("%s starting the stressers...", tt.logPrefix())
 	for _, s := range tt.cluster.Stressers {
 	for _, s := range tt.cluster.Stressers {
-		go s.Stress()
+		if err := s.Stress(); err != nil {
+			return err
+		}
 	}
 	}
 	plog.Printf("%s started stressers", tt.logPrefix())
 	plog.Printf("%s started stressers", tt.logPrefix())
+	return nil
 }
 }