Browse Source

etcd-runner: watcher runner respect rounds

sharat 9 years ago
parent
commit
90146d863c
1 changed files with 48 additions and 10 deletions
  1. 48 10
      tools/functional-tester/etcd-runner/main.go

+ 48 - 10
tools/functional-tester/etcd-runner/main.go

@@ -54,7 +54,7 @@ func main() {
 	case "lease-renewer":
 		runLeaseRenewer(eps)
 	case "watcher":
-		runWatcher(eps)
+		runWatcher(eps, *round)
 	default:
 		fmt.Fprintf(os.Stderr, "unsupported mode %v\n", *mode)
 	}
@@ -219,7 +219,14 @@ func runRacer(eps []string, round int) {
 	doRounds(rcs, round)
 }
 
-func runWatcher(eps []string) {
+func runWatcher(eps []string, limit int) {
+	ctx := context.Background()
+	for round := 0; round < limit; round++ {
+		performWatchOnPrefixes(ctx, eps, round)
+	}
+}
+
+func performWatchOnPrefixes(ctx context.Context, eps []string, round int) {
 	runningTime := 60 * time.Second // time for which operation should be performed
 	noOfPrefixes := 36              // total number of prefixes which will be watched upon
 	watchPerPrefix := 10            // number of watchers per prefix
@@ -229,6 +236,8 @@ func runWatcher(eps []string) {
 	prefixes := generateUniqueKeys(5, noOfPrefixes)
 	keys := generateRandomKeys(10, keyPrePrefix)
 
+	roundPrefix := fmt.Sprint("%16x", round)
+
 	var (
 		revision int64
 		wg       sync.WaitGroup
@@ -236,7 +245,6 @@ func runWatcher(eps []string) {
 		err      error
 	)
 
-	ctx := context.Background()
 	// create client for performing get and put operations
 	client := randClient(eps)
 	defer client.Close()
@@ -253,9 +261,9 @@ func runWatcher(eps []string) {
 
 	go func() {
 		var modrevision int64
-		for i := 0; i < len(keys); i++ {
-			for j := 0; j < len(prefixes); j++ {
-				key := prefixes[j] + "-" + keys[i]
+		for _, key := range keys {
+			for _, prefix := range prefixes {
+				key := roundPrefix + "-" + prefix + "-" + key
 
 				// limit key put as per reqRate
 				if err = limiter.Wait(ctxt); err != nil {
@@ -285,16 +293,22 @@ func runWatcher(eps []string) {
 		}
 	}()
 
+	ctxc, cancelc := context.WithCancel(ctx)
+
+	wcs := make([]clientv3.WatchChan, 0)
+	rcs := make([]*clientv3.Client, 0)
+
 	wg.Add(noOfPrefixes * watchPerPrefix)
-	for i := 0; i < noOfPrefixes; i++ {
+	for _, prefix := range prefixes {
 		for j := 0; j < watchPerPrefix; j++ {
 			go func(prefix string) {
 				defer wg.Done()
 
 				rc := randClient(eps)
-				defer rc.Close()
+				rcs = append(rcs, rc)
 
-				wc := rc.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
+				wc := rc.Watch(ctxc, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
+				wcs = append(wcs, wc)
 				for n := 0; n < len(keys); {
 					select {
 					case watchChan := <-wc:
@@ -310,10 +324,34 @@ func runWatcher(eps []string) {
 						return
 					}
 				}
-			}(prefixes[i])
+			}(roundPrefix + "-" + prefix)
 		}
 	}
 	wg.Wait()
+
+	// cancel all watch channels
+	cancelc()
+
+	// verify all watch channels are closed
+	for e, wc := range wcs {
+		if _, ok := <-wc; ok {
+			log.Fatalf("expected wc to be closed, but received %v", e)
+		}
+	}
+
+	for _, rc := range rcs {
+		rc.Close()
+	}
+
+	deletePrefixWithRety(client, ctx, roundPrefix)
+}
+
+func deletePrefixWithRety(client *clientv3.Client, ctx context.Context, key string) {
+	for {
+		if _, err := client.Delete(ctx, key, clientv3.WithRange(key+"z")); err == nil {
+			return
+		}
+	}
 }
 
 func getWithRetry(client *clientv3.Client, ctx context.Context, key string) *clientv3.GetResponse {