浏览代码

Merge pull request #6779 from xiang90/watch_clean

etcd-runner: clean up watcher runner
Xiang Li 9 年之前
父节点
当前提交
0a8e28524b
共有 1 个文件被更改,包括 72 次插入59 次删除
  1. 72 59
      tools/functional-tester/etcd-runner/watcher.go

+ 72 - 59
tools/functional-tester/etcd-runner/watcher.go

@@ -29,6 +29,7 @@ import (
 func runWatcher(getClient getClientFunc, limit int) {
 func runWatcher(getClient getClientFunc, limit int) {
 	ctx := context.Background()
 	ctx := context.Background()
 	for round := 0; round < limit; round++ {
 	for round := 0; round < limit; round++ {
+		fmt.Println("round", round)
 		performWatchOnPrefixes(ctx, getClient, round)
 		performWatchOnPrefixes(ctx, getClient, round)
 	}
 	}
 }
 }
@@ -55,8 +56,10 @@ func performWatchOnPrefixes(ctx context.Context, getClient getClientFunc, round
 	client := getClient()
 	client := getClient()
 	defer client.Close()
 	defer client.Close()
 
 
-	// get revision using get request
-	gr = getWithRetry(client, ctx, "non-existent")
+	gr, err = getKey(ctx, client, "non-existent")
+	if err != nil {
+		log.Fatalf("failed to get the initial revision: %v", err)
+	}
 	revision = gr.Header.Revision
 	revision = gr.Header.Revision
 
 
 	ctxt, cancel := context.WithDeadline(ctx, time.Now().Add(runningTime))
 	ctxt, cancel := context.WithDeadline(ctx, time.Now().Add(runningTime))
@@ -66,34 +69,14 @@ func performWatchOnPrefixes(ctx context.Context, getClient getClientFunc, round
 	limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate)
 	limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate)
 
 
 	go func() {
 	go func() {
-		var modrevision int64
 		for _, key := range keys {
 		for _, key := range keys {
 			for _, prefix := range prefixes {
 			for _, prefix := range prefixes {
-				key := roundPrefix + "-" + prefix + "-" + key
-
-				// limit key put as per reqRate
 				if err = limiter.Wait(ctxt); err != nil {
 				if err = limiter.Wait(ctxt); err != nil {
-					break
-				}
-
-				modrevision = 0
-				gr = getWithRetry(client, ctxt, key)
-				kvs := gr.Kvs
-				if len(kvs) > 0 {
-					modrevision = gr.Kvs[0].ModRevision
+					return
 				}
 				}
-
-				for {
-					txn := client.Txn(ctxt)
-					_, err = txn.If(clientv3.Compare(clientv3.ModRevision(key), "=", modrevision)).Then(clientv3.OpPut(key, key)).Commit()
-
-					if err == nil {
-						break
-					}
-
-					if err == context.DeadlineExceeded {
-						return
-					}
+				if err = putKeyAtMostOnce(ctxt, client, roundPrefix+"-"+prefix+"-"+key); err != nil {
+					log.Fatalf("failed to put key: %v", err)
+					return
 				}
 				}
 			}
 			}
 		}
 		}
@@ -104,38 +87,25 @@ func performWatchOnPrefixes(ctx context.Context, getClient getClientFunc, round
 	wcs := make([]clientv3.WatchChan, 0)
 	wcs := make([]clientv3.WatchChan, 0)
 	rcs := make([]*clientv3.Client, 0)
 	rcs := make([]*clientv3.Client, 0)
 
 
-	wg.Add(noOfPrefixes * watchPerPrefix)
 	for _, prefix := range prefixes {
 	for _, prefix := range prefixes {
 		for j := 0; j < watchPerPrefix; j++ {
 		for j := 0; j < watchPerPrefix; j++ {
-			go func(prefix string) {
-				defer wg.Done()
+			rc := getClient()
+			rcs = append(rcs, rc)
 
 
-				rc := getClient()
-				rcs = append(rcs, rc)
-
-				wc := rc.Watch(ctxc, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
-				wcs = append(wcs, wc)
-				for n := 0; n < len(keys); {
-					select {
-					case watchChan := <-wc:
-						for _, event := range watchChan.Events {
-							expectedKey := prefix + "-" + keys[n]
-							receivedKey := string(event.Kv.Key)
-							if expectedKey != receivedKey {
-								log.Fatalf("expected key %q, got %q for prefix : %q\n", expectedKey, receivedKey, prefix)
-							}
-							n++
-						}
-					case <-ctxt.Done():
-						return
-					}
-				}
-			}(roundPrefix + "-" + prefix)
+			watchPrefix := roundPrefix + "-" + prefix
+
+			wc := rc.Watch(ctxc, watchPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
+			wcs = append(wcs, wc)
+
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				checkWatchResponse(wc, watchPrefix, keys)
+			}()
 		}
 		}
 	}
 	}
 	wg.Wait()
 	wg.Wait()
 
 
-	// cancel all watch channels
 	cancelc()
 	cancelc()
 
 
 	// verify all watch channels are closed
 	// verify all watch channels are closed
@@ -149,21 +119,64 @@ func performWatchOnPrefixes(ctx context.Context, getClient getClientFunc, round
 		rc.Close()
 		rc.Close()
 	}
 	}
 
 
-	deletePrefixWithRety(client, ctx, roundPrefix)
+	if err = deletePrefix(ctx, client, roundPrefix); err != nil {
+		log.Fatalf("failed to clean up keys after test: %v", err)
+	}
+}
+
+func checkWatchResponse(wc clientv3.WatchChan, prefix string, keys []string) {
+	for n := 0; n < len(keys); {
+		wr, more := <-wc
+		if !more {
+			log.Fatalf("expect more keys (received %d/%d) for %s", len(keys), n, prefix)
+		}
+		for _, event := range wr.Events {
+			expectedKey := prefix + "-" + keys[n]
+			receivedKey := string(event.Kv.Key)
+			if expectedKey != receivedKey {
+				log.Fatalf("expected key %q, got %q for prefix : %q\n", expectedKey, receivedKey, prefix)
+			}
+			n++
+		}
+	}
+}
+
+func putKeyAtMostOnce(ctx context.Context, client *clientv3.Client, key string) error {
+	gr, err := getKey(ctx, client, key)
+	if err != nil {
+		return err
+	}
+
+	var modrev int64
+	if len(gr.Kvs) > 0 {
+		modrev = gr.Kvs[0].ModRevision
+	}
+
+	for ctx.Err() == nil {
+		_, err := client.Txn(ctx).If(clientv3.Compare(clientv3.ModRevision(key), "=", modrev)).Then(clientv3.OpPut(key, key)).Commit()
+
+		if err == nil {
+			return nil
+		}
+	}
+
+	return ctx.Err()
 }
 }
 
 
-func deletePrefixWithRety(client *clientv3.Client, ctx context.Context, key string) {
-	for {
-		if _, err := client.Delete(ctx, key, clientv3.WithRange(key+"z")); err == nil {
-			return
+func deletePrefix(ctx context.Context, client *clientv3.Client, key string) error {
+	for ctx.Err() == nil {
+		if _, err := client.Delete(ctx, key, clientv3.WithPrefix()); err == nil {
+			return nil
 		}
 		}
 	}
 	}
+	return ctx.Err()
 }
 }
 
 
-func getWithRetry(client *clientv3.Client, ctx context.Context, key string) *clientv3.GetResponse {
-	for {
+func getKey(ctx context.Context, client *clientv3.Client, key string) (*clientv3.GetResponse, error) {
+	for ctx.Err() == nil {
 		if gr, err := client.Get(ctx, key); err == nil {
 		if gr, err := client.Get(ctx, key); err == nil {
-			return gr
+			return gr, nil
 		}
 		}
 	}
 	}
+	return nil, ctx.Err()
 }
 }