瀏覽代碼

Merge pull request #5101 from gyuho/watch_bench_fix

benchmark: ensure all watcher receivers to finish
Gyu-Ho Lee 9 年之前
父節點
當前提交
d4dae7e9e9
共有 1 個文件被更改,包括 3 次插入7 次删除
  1. 3 7
      tools/benchmark/cmd/watch.go

+ 3 - 7
tools/benchmark/cmd/watch.go

@@ -64,7 +64,6 @@ var (
 	nrWatchCompleted       int32
 	nrWatchCompleted       int32
 	nrRecvCompleted        int32
 	nrRecvCompleted        int32
 	watchCompletedNotifier chan struct{}
 	watchCompletedNotifier chan struct{}
-	putStartNotifier       chan struct{}
 	recvCompletedNotifier  chan struct{}
 	recvCompletedNotifier  chan struct{}
 )
 )
 
 
@@ -108,8 +107,6 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		streams[i] = v3.NewWatcher(clients[i%len(clients)])
 		streams[i] = v3.NewWatcher(clients[i%len(clients)])
 	}
 	}
 
 
-	putStartNotifier = make(chan struct{})
-
 	// watching phase
 	// watching phase
 	results = make(chan result)
 	results = make(chan result)
 	bar = pb.New(watchTotal)
 	bar = pb.New(watchTotal)
@@ -150,7 +147,6 @@ func watchFunc(cmd *cobra.Command, args []string) {
 
 
 	atomic.StoreInt32(&nrRecvCompleted, 0)
 	atomic.StoreInt32(&nrRecvCompleted, 0)
 	recvCompletedNotifier = make(chan struct{})
 	recvCompletedNotifier = make(chan struct{})
-	close(putStartNotifier)
 
 
 	putreqc := make(chan v3.Op)
 	putreqc := make(chan v3.Op)
 
 
@@ -169,7 +165,9 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		close(putreqc)
 		close(putreqc)
 	}()
 	}()
 
 
-	<-recvCompletedNotifier
+	for range streams {
+		<-recvCompletedNotifier
+	}
 	bar.Finish()
 	bar.Finish()
 	fmt.Printf("Watch events received summary:\n")
 	fmt.Printf("Watch events received summary:\n")
 	close(results)
 	close(results)
@@ -192,8 +190,6 @@ func doWatch(stream v3.Watcher, requests <-chan string) {
 	if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) {
 	if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) {
 		watchCompletedNotifier <- struct{}{}
 		watchCompletedNotifier <- struct{}{}
 	}
 	}
-
-	<-putStartNotifier
 }
 }
 
 
 func recvWatchChan(wch v3.WatchChan) {
 func recvWatchChan(wch v3.WatchChan) {