Browse Source

Merge pull request #7902 from fanminshi/fix_runner

etcd-runner: remove mutex on validate() and release() in global.go
fanmin shi 8 years ago
parent
commit
066062a5e0

+ 5 - 3
tools/functional-tester/etcd-runner/command/election_command.go

@@ -89,14 +89,14 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
 				}
 				}
 			}()
 			}()
 			err = e.Campaign(ctx, v)
 			err = e.Campaign(ctx, v)
+			cancel()
+			<-donec
 			if err == nil {
 			if err == nil {
 				observedLeader = v
 				observedLeader = v
 			}
 			}
 			if observedLeader == v {
 			if observedLeader == v {
 				validateWaiters = len(rcs)
 				validateWaiters = len(rcs)
 			}
 			}
-			cancel()
-			<-donec
 			select {
 			select {
 			case <-ctx.Done():
 			case <-ctx.Done():
 				return nil
 				return nil
@@ -129,8 +129,10 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
 				return err
 				return err
 			}
 			}
 			if observedLeader == v {
 			if observedLeader == v {
-				close(nextc)
+				oldNextc := nextc
 				nextc = make(chan struct{})
 				nextc = make(chan struct{})
+				close(oldNextc)
+
 			}
 			}
 			<-rcNextc
 			<-rcNextc
 			observedLeader = ""
 			observedLeader = ""

+ 0 - 7
tools/functional-tester/etcd-runner/command/global.go

@@ -56,7 +56,6 @@ func newClient(eps []string, timeout time.Duration) *clientv3.Client {
 }
 }
 
 
 func doRounds(rcs []roundClient, rounds int, requests int) {
 func doRounds(rcs []roundClient, rounds int, requests int) {
-	var mu sync.Mutex
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 
 
 	wg.Add(len(rcs))
 	wg.Add(len(rcs))
@@ -73,22 +72,16 @@ func doRounds(rcs []roundClient, rounds int, requests int) {
 				for rc.acquire() != nil { /* spin */
 				for rc.acquire() != nil { /* spin */
 				}
 				}
 
 
-				mu.Lock()
 				if err := rc.validate(); err != nil {
 				if err := rc.validate(); err != nil {
 					log.Fatal(err)
 					log.Fatal(err)
 				}
 				}
-				mu.Unlock()
 
 
 				time.Sleep(10 * time.Millisecond)
 				time.Sleep(10 * time.Millisecond)
 				rc.progress++
 				rc.progress++
 				finished <- struct{}{}
 				finished <- struct{}{}
 
 
-				mu.Lock()
 				for rc.release() != nil { /* spin */
 				for rc.release() != nil { /* spin */
-					mu.Unlock()
-					mu.Lock()
 				}
 				}
-				mu.Unlock()
 			}
 			}
 		}(&rcs[i])
 		}(&rcs[i])
 	}
 	}

+ 7 - 0
tools/functional-tester/etcd-runner/command/lock_racer_command.go

@@ -18,6 +18,7 @@ import (
 	"context"
 	"context"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"sync"
 
 
 	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/clientv3/concurrency"
 
 
@@ -47,6 +48,8 @@ func runRacerFunc(cmd *cobra.Command, args []string) {
 
 
 	rcs := make([]roundClient, totalClientConnections)
 	rcs := make([]roundClient, totalClientConnections)
 	ctx := context.Background()
 	ctx := context.Background()
+	// mu ensures validate and release funcs are atomic.
+	var mu sync.Mutex
 	cnt := 0
 	cnt := 0
 
 
 	eps := endpointsFromFlag(cmd)
 	eps := endpointsFromFlag(cmd)
@@ -69,12 +72,16 @@ func runRacerFunc(cmd *cobra.Command, args []string) {
 		m := concurrency.NewMutex(s, racers)
 		m := concurrency.NewMutex(s, racers)
 		rcs[i].acquire = func() error { return m.Lock(ctx) }
 		rcs[i].acquire = func() error { return m.Lock(ctx) }
 		rcs[i].validate = func() error {
 		rcs[i].validate = func() error {
+			mu.Lock()
+			defer mu.Unlock()
 			if cnt++; cnt != 1 {
 			if cnt++; cnt != 1 {
 				return fmt.Errorf("bad lock; count: %d", cnt)
 				return fmt.Errorf("bad lock; count: %d", cnt)
 			}
 			}
 			return nil
 			return nil
 		}
 		}
 		rcs[i].release = func() error {
 		rcs[i].release = func() error {
+			mu.Lock()
+			defer mu.Unlock()
 			if err := m.Unlock(ctx); err != nil {
 			if err := m.Unlock(ctx); err != nil {
 				return err
 				return err
 			}
 			}