Browse Source

etcd-runner: make run watcher fail safe

sharat 9 years ago
parent
commit
50523e22d8
1 changed files with 30 additions and 16 deletions
  1. 30 16
      tools/functional-tester/etcd-runner/main.go

+ 30 - 16
tools/functional-tester/etcd-runner/main.go

@@ -242,10 +242,7 @@ func runWatcher(eps []string) {
 	defer client.Close()
 	defer client.Close()
 
 
 	// get revision using get request
 	// get revision using get request
-	gr, err = client.Get(ctx, "non-existant")
-	if err != nil {
-		log.Fatal("Error occured while trying to get the revision.")
-	}
+	gr = getWithRetry(client, ctx, "non-existant")
 	revision = gr.Header.Revision
 	revision = gr.Header.Revision
 
 
 	ctxt, cancel := context.WithDeadline(ctx, time.Now().Add(runningTime))
 	ctxt, cancel := context.WithDeadline(ctx, time.Now().Add(runningTime))
@@ -255,25 +252,34 @@ func runWatcher(eps []string) {
 	limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate)
 	limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate)
 
 
 	go func() {
 	go func() {
-		count := 0
+		var modrevision int64
 		for i := 0; i < len(keys); i++ {
 		for i := 0; i < len(keys); i++ {
 			for j := 0; j < len(prefixes); j++ {
 			for j := 0; j < len(prefixes); j++ {
 				key := prefixes[j] + "-" + keys[i]
 				key := prefixes[j] + "-" + keys[i]
+
 				// limit key put as per reqRate
 				// limit key put as per reqRate
 				if err = limiter.Wait(ctxt); err != nil {
 				if err = limiter.Wait(ctxt); err != nil {
 					break
 					break
 				}
 				}
 
 
-				// perform the put operation
-				_, err = client.Put(ctxt, key, key)
-				count++
-				if err == context.DeadlineExceeded {
-					break
+				modrevision = 0
+				gr = getWithRetry(client, ctxt, key)
+				kvs := gr.Kvs
+				if len(kvs) > 0 {
+					modrevision = gr.Kvs[0].ModRevision
 				}
 				}
 
 
-				if err != nil {
-					log.Printf("Error: %v occured while trying to key: %v, value : %v to kv store.", err, key, key)
-					continue
+				for {
+					txn := client.Txn(ctxt)
+					_, err = txn.If(clientv3.Compare(clientv3.ModRevision(key), "=", modrevision)).Then(clientv3.OpPut(key, key)).Commit()
+
+					if err == nil {
+						break
+					}
+
+					if err == context.DeadlineExceeded {
+						return
+					}
 				}
 				}
 			}
 			}
 		}
 		}
@@ -289,14 +295,14 @@ func runWatcher(eps []string) {
 				defer rc.Close()
 				defer rc.Close()
 
 
 				wc := rc.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
 				wc := rc.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
-
 				for n := 0; n < len(keys); {
 				for n := 0; n < len(keys); {
 					select {
 					select {
 					case watchChan := <-wc:
 					case watchChan := <-wc:
 						for _, event := range watchChan.Events {
 						for _, event := range watchChan.Events {
 							expectedKey := prefix + "-" + keys[n]
 							expectedKey := prefix + "-" + keys[n]
-							if expectedKey != string(event.Kv.Key) {
-								log.Fatalf("expected key %q, got %q", expectedKey, string(event.Kv.Key))
+							receivedKey := string(event.Kv.Key)
+							if expectedKey != receivedKey {
+								log.Fatalf("expected key %q, got %q for prefix : %q\n", expectedKey, receivedKey, prefix)
 							}
 							}
 							n++
 							n++
 						}
 						}
@@ -310,6 +316,14 @@ func runWatcher(eps []string) {
 	wg.Wait()
 	wg.Wait()
 }
 }
 
 
+func getWithRetry(client *clientv3.Client, ctx context.Context, key string) *clientv3.GetResponse {
+	for {
+		if gr, err := client.Get(ctx, key); err == nil {
+			return gr
+		}
+	}
+}
+
 func generateUniqueKeys(maxstrlen uint, keynos int) []string {
 func generateUniqueKeys(maxstrlen uint, keynos int) []string {
 	keyMap := make(map[string]bool)
 	keyMap := make(map[string]bool)
 	keys := make([]string, 0)
 	keys := make([]string, 0)