Bläddra i källkod

etcd-runner: election mode

Anthony Romano 9 år sedan
förälder
incheckning
4fe91ed1e2
1 ändrade filer med 75 tillägg och 1 borttagningar
  1. 75 1
      tools/functional-tester/etcd-runner/main.go

+ 75 - 1
tools/functional-tester/etcd-runner/main.go

@@ -36,12 +36,14 @@ func main() {
 	log.SetFlags(log.Lmicroseconds)
 
 	endpointStr := flag.String("endpoints", "localhost:2379", "endpoints of etcd cluster")
-	mode := flag.String("mode", "lock-racer", "test mode (lock-racer, lease-renewer)")
+	mode := flag.String("mode", "lock-racer", "test mode (election, lock-racer, lease-renewer)")
 	round := flag.Int("rounds", 100, "number of rounds to run")
 	flag.Parse()
 	eps := strings.Split(*endpointStr, ",")
 
 	switch *mode {
+	case "election":
+		runElection(eps, *round)
 	case "lock-racer":
 		runRacer(eps, *round)
 	case "lease-renewer":
@@ -51,6 +53,78 @@ func main() {
 	}
 }
 
+func runElection(eps []string, rounds int) {
+	rcs := make([]roundClient, 15)
+	validatec, releasec := make(chan struct{}, len(rcs)), make(chan struct{}, len(rcs))
+	for range rcs {
+		releasec <- struct{}{}
+	}
+
+	for i := range rcs {
+		v := fmt.Sprintf("%d", i)
+		observedLeader := ""
+		validateWaiters := 0
+
+		rcs[i].c = randClient(eps)
+		e := concurrency.NewElection(rcs[i].c, "electors")
+
+		rcs[i].acquire = func() error {
+			<-releasec
+			ctx, cancel := context.WithCancel(context.Background())
+			go func() {
+				if ol, ok := <-e.Observe(ctx); ok {
+					observedLeader = string(ol.Kvs[0].Value)
+					if observedLeader != v {
+						cancel()
+					}
+				}
+			}()
+			err := e.Campaign(ctx, v)
+			if err == nil {
+				observedLeader = v
+			}
+			if observedLeader == v {
+				validateWaiters = len(rcs)
+			}
+			select {
+			case <-ctx.Done():
+				return nil
+			default:
+				cancel()
+				return err
+			}
+		}
+		rcs[i].validate = func() error {
+			if l, err := e.Leader(); err == nil && l != observedLeader {
+				return fmt.Errorf("expected leader %q, got %q", observedLeader, l)
+			}
+			validatec <- struct{}{}
+			return nil
+		}
+		rcs[i].release = func() error {
+			for validateWaiters > 0 {
+				select {
+				case <-validatec:
+					validateWaiters--
+				default:
+					return fmt.Errorf("waiting on followers")
+				}
+			}
+			if err := e.Resign(); err != nil {
+				return err
+			}
+			if observedLeader == v {
+				for range rcs {
+					releasec <- struct{}{}
+				}
+			}
+			observedLeader = ""
+			return nil
+		}
+	}
+	doRounds(rcs, rounds)
+}
+
 func runLeaseRenewer(eps []string) {
 	c := randClient(eps)
 	ctx := context.Background()