Browse Source

Merge pull request #5392 from gyuho/watch_bench

benchmark: fix watch command
Gyu-Ho Lee 9 years ago
parent
commit
61a7d3efb3
1 changed files with 12 additions and 11 deletions
  1. 12 11
      tools/benchmark/cmd/watch.go

+ 12 - 11
tools/benchmark/cmd/watch.go

@@ -88,6 +88,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 	}
 
 	watched := make([]string, watchedKeyTotal)
+	numWatchers := make(map[string]int)
 	for i := range watched {
 		k := make([]byte, watchKeySize)
 		if watchSeqKeys {
@@ -96,6 +97,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 			binary.PutVarint(k, int64(rand.Intn(watchKeySpaceSize)))
 		}
 		watched[i] = string(k)
+		numWatchers[watched[i]] = numWatchers[watched[i]] + 1
 	}
 
 	requests := make(chan string, totalClients)
@@ -137,8 +139,10 @@ func watchFunc(cmd *cobra.Command, args []string) {
 	<-pdoneC
 
 	// put phase
-	// total number of puts * number of watchers on each key
-	eventsTotal = watchPutTotal * (watchTotal / watchedKeyTotal)
+	eventsTotal = 0
+	for i := 0; i < watchPutTotal; i++ {
+		eventsTotal += numWatchers[watched[i%len(watched)]]
+	}
 	results = make(chan result)
 	bar = pb.New(eventsTotal)
 
@@ -157,7 +161,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 	pdoneC = printRate(results)
 
 	go func() {
-		for i := 0; i < eventsTotal; i++ {
+		for i := 0; i < watchPutTotal; i++ {
 			putreqc <- v3.OpPut(watched[i%(len(watched))], "data")
 			// TODO: use a real rate-limiter instead of sleep.
 			time.Sleep(time.Second / time.Duration(watchPutRate))
@@ -165,9 +169,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		close(putreqc)
 	}()
 
-	for range streams {
-		<-recvCompletedNotifier
-	}
+	<-recvCompletedNotifier
 	bar.Finish()
 	fmt.Printf("Watch events received summary:\n")
 	close(results)
@@ -194,16 +196,15 @@ func doWatch(stream v3.Watcher, requests <-chan string) {
 
 func recvWatchChan(wch v3.WatchChan) {
 	for range wch {
-		if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) {
-			recvCompletedNotifier <- struct{}{}
-			break
-		}
-
 		st := time.Now()
 		results <- result{duration: time.Since(st), happened: time.Now()}
 		bar.Increment()
 
 		atomic.AddInt32(&nrRecvCompleted, 1)
+		if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) {
+			recvCompletedNotifier <- struct{}{}
+			break
+		}
 	}
 }