Selaa lähdekoodia

Merge pull request #5340 from heyitsanthony/etcd-runner-election

etcd-runner: election mode
Anthony Romano 9 vuotta sitten
vanhempi
commit
4612e2d59a
1 muutettua tiedostoa jossa 141 lisäystä ja 62 poistoa
  1. 141 62
      tools/functional-tester/etcd-runner/main.go

+ 141 - 62
tools/functional-tester/etcd-runner/main.go

@@ -36,12 +36,14 @@ func main() {
 	log.SetFlags(log.Lmicroseconds)
 
 	endpointStr := flag.String("endpoints", "localhost:2379", "endpoints of etcd cluster")
-	mode := flag.String("mode", "lock-racer", "test mode (lock-racer)")
+	mode := flag.String("mode", "lock-racer", "test mode (election, lock-racer, lease-renewer)")
 	round := flag.Int("rounds", 100, "number of rounds to run")
 	flag.Parse()
 	eps := strings.Split(*endpointStr, ",")
 
 	switch *mode {
+	case "election":
+		runElection(eps, *round)
 	case "lock-racer":
 		runRacer(eps, *round)
 	case "lease-renewer":
@@ -51,6 +53,78 @@ func main() {
 	}
 }
 
+func runElection(eps []string, rounds int) {
+	rcs := make([]roundClient, 15)
+	validatec, releasec := make(chan struct{}, len(rcs)), make(chan struct{}, len(rcs))
+	for range rcs {
+		releasec <- struct{}{}
+	}
+
+	for i := range rcs {
+		v := fmt.Sprintf("%d", i)
+		observedLeader := ""
+		validateWaiters := 0
+
+		rcs[i].c = randClient(eps)
+		e := concurrency.NewElection(rcs[i].c, "electors")
+
+		rcs[i].acquire = func() error {
+			<-releasec
+			ctx, cancel := context.WithCancel(context.Background())
+			go func() {
+				if ol, ok := <-e.Observe(ctx); ok {
+					observedLeader = string(ol.Kvs[0].Value)
+					if observedLeader != v {
+						cancel()
+					}
+				}
+			}()
+			err := e.Campaign(ctx, v)
+			if err == nil {
+				observedLeader = v
+			}
+			if observedLeader == v {
+				validateWaiters = len(rcs)
+			}
+			select {
+			case <-ctx.Done():
+				return nil
+			default:
+				cancel()
+				return err
+			}
+		}
+		rcs[i].validate = func() error {
+			if l, err := e.Leader(); err == nil && l != observedLeader {
+				return fmt.Errorf("expected leader %q, got %q", observedLeader, l)
+			}
+			validatec <- struct{}{}
+			return nil
+		}
+		rcs[i].release = func() error {
+			for validateWaiters > 0 {
+				select {
+				case <-validatec:
+					validateWaiters--
+				default:
+					return fmt.Errorf("waiting on followers")
+				}
+			}
+			if err := e.Resign(); err != nil {
+				return err
+			}
+			if observedLeader == v {
+				for range rcs {
+					releasec <- struct{}{}
+				}
+			}
+			observedLeader = ""
+			return nil
+		}
+	}
+	doRounds(rcs, rounds)
+}
+
 func runLeaseRenewer(eps []string) {
 	c := randClient(eps)
 	ctx := context.Background()
@@ -94,67 +168,92 @@ func runLeaseRenewer(eps []string) {
 }
 
 func runRacer(eps []string, round int) {
-	nrace := 15
-	prefix := "racers"
-	racers := make([]*concurrency.Mutex, nrace)
-	clis := make([]*clientv3.Client, nrace)
-	progress := make([]int, nrace)
-	finished := make(chan struct{}, 0)
-
-	var (
-		mu  sync.Mutex
-		cnt int
-	)
+	rcs := make([]roundClient, 15)
 	ctx := context.Background()
+	cnt := 0
+	for i := range rcs {
+		rcs[i].c = randClient(eps)
+		m := concurrency.NewMutex(rcs[i].c, "racers")
+		rcs[i].acquire = func() error { return m.Lock(ctx) }
+		rcs[i].validate = func() error {
+			if cnt++; cnt != 1 {
+				return fmt.Errorf("bad lock; count: %d", cnt)
+			}
+			return nil
+		}
+		rcs[i].release = func() error {
+			if err := m.Unlock(); err != nil {
+				return err
+			}
+			cnt = 0
+			return nil
+		}
+	}
+	doRounds(rcs, round)
+}
 
-	var wg sync.WaitGroup
+func randClient(eps []string) *clientv3.Client {
+	neps := make([]string, len(eps))
+	copy(neps, eps)
+
+	for i := range neps {
+		j := rand.Intn(i + 1)
+		neps[i], neps[j] = neps[j], neps[i]
+	}
 
-	for i := range racers {
-		clis[i] = randClient(eps)
-		racers[i] = concurrency.NewMutex(clis[i], prefix)
-		wg.Add(1)
+	c, err := clientv3.New(clientv3.Config{
+		Endpoints:   eps,
+		DialTimeout: 5 * time.Second,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	return c
+}
 
-		go func(i int) {
-			defer wg.Done()
+type roundClient struct {
+	c        *clientv3.Client
+	progress int
+	acquire  func() error
+	validate func() error
+	release  func() error
+}
 
-			for {
-				if progress[i] >= round {
-					return
-				}
+func doRounds(rcs []roundClient, rounds int) {
+	var mu sync.Mutex
+	var wg sync.WaitGroup
 
-				for {
-					err := racers[i].Lock(ctx)
-					if err == nil {
-						break
-					}
+	wg.Add(len(rcs))
+	finished := make(chan struct{}, 0)
+	for i := range rcs {
+		go func(rc *roundClient) {
+			defer wg.Done()
+			for rc.progress < rounds {
+				for rc.acquire() != nil { /* spin */
 				}
 
 				mu.Lock()
-				if cnt > 0 {
-					log.Fatalf("bad lock")
+				if err := rc.validate(); err != nil {
+					log.Fatal(err)
 				}
-				cnt = 1
 				mu.Unlock()
 
 				time.Sleep(10 * time.Millisecond)
-				progress[i]++
+				rc.progress++
 				finished <- struct{}{}
 
 				mu.Lock()
-				for {
-					err := racers[i].Unlock()
-					if err == nil {
-						break
-					}
+				for rc.release() != nil {
+					mu.Unlock()
+					mu.Lock()
 				}
-				cnt = 0
 				mu.Unlock()
 			}
-		}(i)
+		}(&rcs[i])
 	}
 
 	start := time.Now()
-	for i := 1; i < nrace*round+1; i++ {
+	for i := 1; i < len(rcs)*rounds+1; i++ {
 		select {
 		case <-finished:
 			if i%100 == 0 {
@@ -167,27 +266,7 @@ func runRacer(eps []string, round int) {
 	}
 
 	wg.Wait()
-
-	for _, cli := range clis {
-		cli.Close()
+	for _, rc := range rcs {
+		rc.c.Close()
 	}
 }
-
-func randClient(eps []string) *clientv3.Client {
-	neps := make([]string, len(eps))
-	copy(neps, eps)
-
-	for i := range neps {
-		j := rand.Intn(i + 1)
-		neps[i], neps[j] = neps[j], neps[i]
-	}
-
-	c, err := clientv3.New(clientv3.Config{
-		Endpoints:   eps,
-		DialTimeout: 5 * time.Second,
-	})
-	if err != nil {
-		log.Fatal(err)
-	}
-	return c
-}