Browse Source

pkg/report, tools/benchmark: refactor report out of tools/benchmark

Only tracks time series when requested. Can configure output precision.
Anthony Romano 9 years ago
parent
commit
3d28faa3eb

+ 16 - 0
pkg/report/doc.go

@@ -0,0 +1,16 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package report generates human-readable benchmark reports.
+package report

+ 219 - 0
pkg/report/report.go

@@ -0,0 +1,219 @@
+// Copyright 2014 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// the file is borrowed from github.com/rakyll/boom/boomer/print.go
+
+package report
+
+import (
+	"fmt"
+	"math"
+	"sort"
+	"strings"
+	"time"
+)
+
+const (
+	barChar = "∎"
+)
+
+// Result describes the timings for an operation.
+type Result struct {
+	Start time.Time
+	End   time.Time
+	Err   error
+}
+
+func (res *Result) Duration() time.Duration { return res.End.Sub(res.Start) }
+
+type report struct {
+	results   chan Result
+	precision string
+
+	avgTotal float64
+	fastest  float64
+	slowest  float64
+	average  float64
+	stddev   float64
+	rps      float64
+	total    time.Duration
+
+	errorDist map[string]int
+	lats      []float64
+
+	sps *secondPoints
+}
+
+// Report processes a result stream until it is closed, then produces a
+// string with information about the consumed result data.
+type Report interface {
+	Results() chan<- Result
+	Run() <-chan string
+	String() string
+}
+
+func NewReport(precision string) Report {
+	return &report{
+		results:   make(chan Result, 16),
+		precision: precision,
+		errorDist: make(map[string]int),
+	}
+}
+
+func NewReportSample(precision string) Report {
+	r := NewReport(precision).(*report)
+	r.sps = newSecondPoints()
+	return r
+}
+
+func (r *report) Results() chan<- Result { return r.results }
+
+func (r *report) Run() <-chan string {
+	donec := make(chan string, 1)
+	go func() {
+		defer close(donec)
+		r.processResults()
+		donec <- r.String()
+	}()
+	return donec
+}
+
+func (r *report) String() (s string) {
+	if len(r.lats) > 0 {
+		s += fmt.Sprintf("\nSummary:\n")
+		s += fmt.Sprintf("  Total:\t%s.\n", r.sec2str(r.total.Seconds()))
+		s += fmt.Sprintf("  Slowest:\t%s.\n", r.sec2str(r.slowest))
+		s += fmt.Sprintf("  Fastest:\t%s.\n", r.sec2str(r.fastest))
+		s += fmt.Sprintf("  Average:\t%s.\n", r.sec2str(r.average))
+		s += fmt.Sprintf("  Stddev:\t%s.\n", r.sec2str(r.stddev))
+		s += fmt.Sprintf("  Requests/sec:\t"+r.precision+"\n", r.rps)
+		s += r.histogram()
+		s += r.latencies()
+		if r.sps != nil {
+			s += fmt.Sprintf("%v\n", r.sps.getTimeSeries())
+		}
+	}
+	if len(r.errorDist) > 0 {
+		s += r.errors()
+	}
+	return s
+}
+
+func (r *report) sec2str(sec float64) string { return fmt.Sprintf(r.precision+" secs", sec) }
+
+type reportRate struct{ *report }
+
+func NewReportRate(precision string) Report {
+	return &reportRate{NewReport(precision).(*report)}
+}
+
+func (r *reportRate) String() string {
+	return fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.rps)
+}
+
+func (r *report) processResult(res *Result) {
+	if res.Err != nil {
+		r.errorDist[res.Err.Error()]++
+		return
+	}
+	dur := res.Duration()
+	r.lats = append(r.lats, dur.Seconds())
+	r.avgTotal += dur.Seconds()
+	if r.sps != nil {
+		r.sps.Add(res.Start, dur)
+	}
+}
+
+func (r *report) processResults() {
+	st := time.Now()
+	for res := range r.results {
+		r.processResult(&res)
+	}
+	r.total = time.Since(st)
+
+	r.rps = float64(len(r.lats)) / r.total.Seconds()
+	r.average = r.avgTotal / float64(len(r.lats))
+	for i := range r.lats {
+		dev := r.lats[i] - r.average
+		r.stddev += dev * dev
+	}
+	r.stddev = math.Sqrt(r.stddev / float64(len(r.lats)))
+	sort.Float64s(r.lats)
+	if len(r.lats) > 0 {
+		r.fastest = r.lats[0]
+		r.slowest = r.lats[len(r.lats)-1]
+	}
+}
+
+func (r *report) latencies() string {
+	pctls := []int{10, 25, 50, 75, 90, 95, 99}
+	data := make([]float64, len(pctls))
+	j := 0
+	for i := 0; i < len(r.lats) && j < len(pctls); i++ {
+		current := i * 100 / len(r.lats)
+		if current >= pctls[j] {
+			data[j] = r.lats[i]
+			j++
+		}
+	}
+	s := fmt.Sprintf("\nLatency distribution:\n")
+	for i := 0; i < len(pctls); i++ {
+		if data[i] > 0 {
+			s += fmt.Sprintf("  %v%% in %s.\n", pctls[i], r.sec2str(data[i]))
+		}
+	}
+	return s
+}
+
+func (r *report) histogram() string {
+	bc := 10
+	buckets := make([]float64, bc+1)
+	counts := make([]int, bc+1)
+	bs := (r.slowest - r.fastest) / float64(bc)
+	for i := 0; i < bc; i++ {
+		buckets[i] = r.fastest + bs*float64(i)
+	}
+	buckets[bc] = r.slowest
+	var bi int
+	var max int
+	for i := 0; i < len(r.lats); {
+		if r.lats[i] <= buckets[bi] {
+			i++
+			counts[bi]++
+			if max < counts[bi] {
+				max = counts[bi]
+			}
+		} else if bi < len(buckets)-1 {
+			bi++
+		}
+	}
+	s := fmt.Sprintf("\nResponse time histogram:\n")
+	for i := 0; i < len(buckets); i++ {
+		// Normalize bar lengths.
+		var barLen int
+		if max > 0 {
+			barLen = counts[i] * 40 / max
+		}
+		s += fmt.Sprintf("  "+r.precision+" [%v]\t|%v\n", buckets[i], counts[i], strings.Repeat(barChar, barLen))
+	}
+	return s
+}
+
+func (r *report) errors() string {
+	s := fmt.Sprintf("\nError distribution:\n")
+	for err, num := range r.errorDist {
+		s += fmt.Sprintf("  [%d]\t%s\n", num, err)
+	}
+	return s
+}

+ 1 - 1
tools/benchmark/cmd/timeseries.go → pkg/report/timeseries.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package cmd
+package report
 
 import (
 	"bytes"

+ 1 - 1
tools/benchmark/cmd/timeseries_test.go → pkg/report/timeseries_test.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package cmd
+package report
 
 import (
 	"testing"

+ 21 - 32
tools/benchmark/cmd/lease.go

@@ -15,9 +15,12 @@
 package cmd
 
 import (
+	"fmt"
 	"time"
 
 	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/pkg/report"
+
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
 	"gopkg.in/cheggaaa/pb.v1"
@@ -40,53 +43,39 @@ func init() {
 }
 
 func leaseKeepaliveFunc(cmd *cobra.Command, args []string) {
-	results = make(chan result)
 	requests := make(chan struct{})
-	bar = pb.New(leaseKeepaliveTotal)
-
 	clients := mustCreateClients(totalClients, totalConns)
 
+	bar = pb.New(leaseKeepaliveTotal)
 	bar.Format("Bom !")
 	bar.Start()
 
+	r := newReport()
 	for i := range clients {
 		wg.Add(1)
-		go doLeaseKeepalive(context.Background(), clients[i].Lease, requests)
+		go func(c v3.Lease) {
+			defer wg.Done()
+			resp, err := c.Grant(context.Background(), 100)
+			if err != nil {
+				panic(err)
+			}
+			for _ = range requests {
+				st := time.Now()
+				_, err := c.KeepAliveOnce(context.TODO(), resp.ID)
+				r.Results() <- report.Result{Err: err, Start: st, End: time.Now()}
+				bar.Increment()
+			}
+		}(clients[i])
 	}
 
-	pdoneC := printReport(results)
-
 	for i := 0; i < leaseKeepaliveTotal; i++ {
 		requests <- struct{}{}
 	}
 	close(requests)
 
+	rc := r.Run()
 	wg.Wait()
-
+	close(r.Results())
 	bar.Finish()
-
-	close(results)
-	<-pdoneC
-}
-
-func doLeaseKeepalive(ctx context.Context, client v3.Lease, requests <-chan struct{}) {
-	defer wg.Done()
-
-	resp, err := client.Grant(ctx, 100)
-	if err != nil {
-		panic(err)
-	}
-
-	for _ = range requests {
-		st := time.Now()
-
-		_, err := client.KeepAliveOnce(ctx, resp.ID)
-
-		var errStr string
-		if err != nil {
-			errStr = err.Error()
-		}
-		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
-		bar.Increment()
-	}
+	fmt.Printf("%s", <-rc)
 }

+ 9 - 33
tools/benchmark/cmd/mvcc-put.go

@@ -22,6 +22,8 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/pkg/report"
+
 	"github.com/spf13/cobra"
 )
 
@@ -100,14 +102,12 @@ func mvccPutFunc(cmd *cobra.Command, args []string) {
 	keys := createBytesSlice(storageKeySize, totalNrKeys)
 	vals := createBytesSlice(valueSize, totalNrKeys)
 
-	latencies := make([]time.Duration, totalNrKeys)
-
-	minLat := time.Duration(1<<63 - 1)
-	maxLat := time.Duration(0)
+	r := newReport()
+	rrc := r.Results()
 
+	rc := r.Run()
 	for i := 0; i < totalNrKeys; i++ {
-		begin := time.Now()
-
+		st := time.Now()
 		if txn {
 			id := s.TxnBegin()
 			if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
@@ -118,33 +118,9 @@ func mvccPutFunc(cmd *cobra.Command, args []string) {
 		} else {
 			s.Put(keys[i], vals[i], lease.NoLease)
 		}
-
-		end := time.Now()
-
-		lat := end.Sub(begin)
-		latencies[i] = lat
-		if maxLat < lat {
-			maxLat = lat
-		}
-		if lat < minLat {
-			minLat = lat
-		}
+		rrc <- report.Result{Start: st, End: time.Now()}
 	}
 
-	total := time.Duration(0)
-
-	for _, lat := range latencies {
-		total += lat
-	}
-
-	fmt.Printf("total: %v\n", total)
-	fmt.Printf("average: %v\n", total/time.Duration(totalNrKeys))
-	fmt.Printf("rate: %4.4f\n", float64(totalNrKeys)/total.Seconds())
-	fmt.Printf("minimum latency: %v\n", minLat)
-	fmt.Printf("maximum latency: %v\n", maxLat)
-
-	// TODO: Currently this benchmark doesn't use the common histogram infrastructure.
-	// This is because an accuracy of the infrastructure isn't suitable for measuring
-	// performance of kv storage:
-	// https://github.com/coreos/etcd/pull/4070#issuecomment-167954149
+	close(r.Results())
+	fmt.Printf("%s", <-rc)
 }

+ 17 - 28
tools/benchmark/cmd/put.go

@@ -22,6 +22,8 @@ import (
 	"time"
 
 	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/pkg/report"
+
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
 	"gopkg.in/cheggaaa/pb.v1"
@@ -65,24 +67,28 @@ func putFunc(cmd *cobra.Command, args []string) {
 		os.Exit(1)
 	}
 
-	results = make(chan result)
 	requests := make(chan v3.Op, totalClients)
-	bar = pb.New(putTotal)
-
-	k, v := make([]byte, keySize), string(mustRandBytes(valSize))
-
 	clients := mustCreateClients(totalClients, totalConns)
+	k, v := make([]byte, keySize), string(mustRandBytes(valSize))
 
+	bar = pb.New(putTotal)
 	bar.Format("Bom !")
 	bar.Start()
 
+	r := newReport()
 	for i := range clients {
 		wg.Add(1)
-		go doPut(context.Background(), clients[i], requests)
+		go func(c *v3.Client) {
+			defer wg.Done()
+			for op := range requests {
+				st := time.Now()
+				_, err := c.Do(context.Background(), op)
+				r.Results() <- report.Result{Err: err, Start: st, End: time.Now()}
+				bar.Increment()
+			}
+		}(clients[i])
 	}
 
-	pdoneC := printReport(results)
-
 	go func() {
 		for i := 0; i < putTotal; i++ {
 			if seqKeys {
@@ -104,28 +110,11 @@ func putFunc(cmd *cobra.Command, args []string) {
 		}()
 	}
 
+	rc := r.Run()
 	wg.Wait()
-
+	close(r.Results())
 	bar.Finish()
-
-	close(results)
-	<-pdoneC
-}
-
-func doPut(ctx context.Context, client v3.KV, requests <-chan v3.Op) {
-	defer wg.Done()
-
-	for op := range requests {
-		st := time.Now()
-		_, err := client.Do(ctx, op)
-
-		var errStr string
-		if err != nil {
-			errStr = err.Error()
-		}
-		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
-		bar.Increment()
-	}
+	fmt.Println(<-rc)
 }
 
 func compactKV(clients []*v3.Client) {

+ 16 - 26
tools/benchmark/cmd/range.go

@@ -20,6 +20,8 @@ import (
 	"time"
 
 	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/pkg/report"
+
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
 	"gopkg.in/cheggaaa/pb.v1"
@@ -65,22 +67,27 @@ func rangeFunc(cmd *cobra.Command, args []string) {
 		os.Exit(1)
 	}
 
-	results = make(chan result)
 	requests := make(chan v3.Op, totalClients)
-	bar = pb.New(rangeTotal)
-
 	clients := mustCreateClients(totalClients, totalConns)
 
+	bar = pb.New(rangeTotal)
 	bar.Format("Bom !")
 	bar.Start()
 
+	r := newReport()
 	for i := range clients {
 		wg.Add(1)
-		go doRange(clients[i].KV, requests)
+		go func(c *v3.Client) {
+			defer wg.Done()
+			for op := range requests {
+				st := time.Now()
+				_, err := c.Do(context.Background(), op)
+				r.Results() <- report.Result{Err: err, Start: st, End: time.Now()}
+				bar.Increment()
+			}
+		}(clients[i])
 	}
 
-	pdoneC := printReport(results)
-
 	go func() {
 		for i := 0; i < rangeTotal; i++ {
 			opts := []v3.OpOption{v3.WithRange(end)}
@@ -93,26 +100,9 @@ func rangeFunc(cmd *cobra.Command, args []string) {
 		close(requests)
 	}()
 
+	rc := r.Run()
 	wg.Wait()
-
+	close(r.Results())
 	bar.Finish()
-
-	close(results)
-	<-pdoneC
-}
-
-func doRange(client v3.KV, requests <-chan v3.Op) {
-	defer wg.Done()
-
-	for op := range requests {
-		st := time.Now()
-		_, err := client.Do(context.Background(), op)
-
-		var errStr string
-		if err != nil {
-			errStr = err.Error()
-		}
-		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
-		bar.Increment()
-	}
+	fmt.Printf("%s", <-rc)
 }

+ 0 - 196
tools/benchmark/cmd/report.go

@@ -1,196 +0,0 @@
-// Copyright 2014 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// the file is borrowed from github.com/rakyll/boom/boomer/print.go
-
-package cmd
-
-import (
-	"fmt"
-	"math"
-	"sort"
-	"strings"
-	"time"
-)
-
-const (
-	barChar = "∎"
-)
-
-type result struct {
-	errStr   string
-	duration time.Duration
-	happened time.Time
-}
-
-type report struct {
-	avgTotal float64
-	fastest  float64
-	slowest  float64
-	average  float64
-	stddev   float64
-	rps      float64
-
-	results chan result
-	total   time.Duration
-
-	errorDist map[string]int
-	lats      []float64
-
-	sps *secondPoints
-}
-
-func printReport(results chan result) <-chan struct{} {
-	return wrapReport(func() {
-		r := &report{
-			results:   results,
-			errorDist: make(map[string]int),
-			sps:       newSecondPoints(),
-		}
-		r.finalize()
-		r.print()
-	})
-}
-
-func printRate(results chan result) <-chan struct{} {
-	return wrapReport(func() {
-		r := &report{
-			results:   results,
-			errorDist: make(map[string]int),
-			sps:       newSecondPoints(),
-		}
-		r.finalize()
-		fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps)
-	})
-}
-
-func wrapReport(f func()) <-chan struct{} {
-	donec := make(chan struct{})
-	go func() {
-		defer close(donec)
-		f()
-	}()
-	return donec
-}
-
-func (r *report) finalize() {
-	st := time.Now()
-	for res := range r.results {
-		if res.errStr != "" {
-			r.errorDist[res.errStr]++
-		} else {
-			r.sps.Add(res.happened, res.duration)
-			r.lats = append(r.lats, res.duration.Seconds())
-			r.avgTotal += res.duration.Seconds()
-		}
-	}
-	r.total = time.Since(st)
-
-	r.rps = float64(len(r.lats)) / r.total.Seconds()
-	r.average = r.avgTotal / float64(len(r.lats))
-	for i := range r.lats {
-		dev := r.lats[i] - r.average
-		r.stddev += dev * dev
-	}
-	r.stddev = math.Sqrt(r.stddev / float64(len(r.lats)))
-}
-
-func (r *report) print() {
-	sort.Float64s(r.lats)
-
-	if len(r.lats) > 0 {
-		r.fastest = r.lats[0]
-		r.slowest = r.lats[len(r.lats)-1]
-		fmt.Printf("\nSummary:\n")
-		fmt.Printf("  Total:\t%4.4f secs.\n", r.total.Seconds())
-		fmt.Printf("  Slowest:\t%4.4f secs.\n", r.slowest)
-		fmt.Printf("  Fastest:\t%4.4f secs.\n", r.fastest)
-		fmt.Printf("  Average:\t%4.4f secs.\n", r.average)
-		fmt.Printf("  Stddev:\t%4.4f secs.\n", r.stddev)
-		fmt.Printf("  Requests/sec:\t%4.4f\n", r.rps)
-		r.printHistogram()
-		r.printLatencies()
-		if sample {
-			r.printSecondSample()
-		}
-	}
-
-	if len(r.errorDist) > 0 {
-		r.printErrors()
-	}
-}
-
-// Prints percentile latencies.
-func (r *report) printLatencies() {
-	pctls := []int{10, 25, 50, 75, 90, 95, 99}
-	data := make([]float64, len(pctls))
-	j := 0
-	for i := 0; i < len(r.lats) && j < len(pctls); i++ {
-		current := i * 100 / len(r.lats)
-		if current >= pctls[j] {
-			data[j] = r.lats[i]
-			j++
-		}
-	}
-	fmt.Printf("\nLatency distribution:\n")
-	for i := 0; i < len(pctls); i++ {
-		if data[i] > 0 {
-			fmt.Printf("  %v%% in %4.4f secs.\n", pctls[i], data[i])
-		}
-	}
-}
-
-func (r *report) printSecondSample() {
-	fmt.Println(r.sps.getTimeSeries())
-}
-
-func (r *report) printHistogram() {
-	bc := 10
-	buckets := make([]float64, bc+1)
-	counts := make([]int, bc+1)
-	bs := (r.slowest - r.fastest) / float64(bc)
-	for i := 0; i < bc; i++ {
-		buckets[i] = r.fastest + bs*float64(i)
-	}
-	buckets[bc] = r.slowest
-	var bi int
-	var max int
-	for i := 0; i < len(r.lats); {
-		if r.lats[i] <= buckets[bi] {
-			i++
-			counts[bi]++
-			if max < counts[bi] {
-				max = counts[bi]
-			}
-		} else if bi < len(buckets)-1 {
-			bi++
-		}
-	}
-	fmt.Printf("\nResponse time histogram:\n")
-	for i := 0; i < len(buckets); i++ {
-		// Normalize bar lengths.
-		var barLen int
-		if max > 0 {
-			barLen = counts[i] * 40 / max
-		}
-		fmt.Printf("  %4.3f [%v]\t|%v\n", buckets[i], counts[i], strings.Repeat(barChar, barLen))
-	}
-}
-
-func (r *report) printErrors() {
-	fmt.Printf("\nError distribution:\n")
-	for err, num := range r.errorDist {
-		fmt.Printf("  [%d]\t%s\n", num, err)
-	}
-}

+ 3 - 3
tools/benchmark/cmd/root.go

@@ -18,6 +18,7 @@ import (
 	"sync"
 
 	"github.com/coreos/etcd/pkg/transport"
+
 	"github.com/spf13/cobra"
 	"gopkg.in/cheggaaa/pb.v1"
 )
@@ -38,9 +39,8 @@ var (
 	totalClients uint
 	sample       bool
 
-	bar     *pb.ProgressBar
-	results chan result
-	wg      sync.WaitGroup
+	bar *pb.ProgressBar
+	wg  sync.WaitGroup
 
 	tls transport.TLSInfo
 

+ 10 - 16
tools/benchmark/cmd/stm.go

@@ -23,6 +23,8 @@ import (
 
 	v3 "github.com/coreos/etcd/clientv3"
 	v3sync "github.com/coreos/etcd/clientv3/concurrency"
+	"github.com/coreos/etcd/pkg/report"
+
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
 	"gopkg.in/cheggaaa/pb.v1"
@@ -89,22 +91,19 @@ func stmFunc(cmd *cobra.Command, args []string) {
 		os.Exit(1)
 	}
 
-	results = make(chan result)
 	requests := make(chan stmApply, totalClients)
-	bar = pb.New(stmTotal)
-
 	clients := mustCreateClients(totalClients, totalConns)
 
+	bar = pb.New(stmTotal)
 	bar.Format("Bom !")
 	bar.Start()
 
+	r := newReport()
 	for i := range clients {
 		wg.Add(1)
-		go doSTM(context.Background(), clients[i], requests)
+		go doSTM(clients[i], requests, r.Results())
 	}
 
-	pdoneC := printReport(results)
-
 	go func() {
 		for i := 0; i < stmTotal; i++ {
 			kset := make(map[string]struct{})
@@ -132,15 +131,14 @@ func stmFunc(cmd *cobra.Command, args []string) {
 		close(requests)
 	}()
 
+	rc := r.Run()
 	wg.Wait()
-
+	close(r.Results())
 	bar.Finish()
-
-	close(results)
-	<-pdoneC
+	fmt.Printf("%s", <-rc)
 }
 
-func doSTM(ctx context.Context, client *v3.Client, requests <-chan stmApply) {
+func doSTM(client *v3.Client, requests <-chan stmApply, results chan<- report.Result) {
 	defer wg.Done()
 
 	var m *v3sync.Mutex
@@ -161,11 +159,7 @@ func doSTM(ctx context.Context, client *v3.Client, requests <-chan stmApply) {
 		if m != nil {
 			m.Unlock(context.TODO())
 		}
-		var errStr string
-		if err != nil {
-			errStr = err.Error()
-		}
-		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
+		results <- report.Result{Err: err, Start: st, End: time.Now()}
 		bar.Increment()
 	}
 }

+ 8 - 0
tools/benchmark/cmd/util.go

@@ -21,6 +21,7 @@ import (
 	"strings"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/pkg/report"
 )
 
 var (
@@ -83,3 +84,10 @@ func mustRandBytes(n int) []byte {
 	}
 	return rb
 }
+
+func newReport() report.Report {
+	if sample {
+		return report.NewReportSample("%4.4f")
+	}
+	return report.NewReport("%4.4f")
+}

+ 25 - 37
tools/benchmark/cmd/watch.go

@@ -23,6 +23,7 @@ import (
 	"time"
 
 	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/pkg/report"
 
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
@@ -110,18 +111,16 @@ func watchFunc(cmd *cobra.Command, args []string) {
 	}
 
 	// watching phase
-	results = make(chan result)
 	bar = pb.New(watchTotal)
-
 	bar.Format("Bom !")
 	bar.Start()
 
-	pdoneC := printRate(results)
-
 	atomic.StoreInt32(&nrWatchCompleted, int32(0))
 	watchCompletedNotifier = make(chan struct{})
+
+	r := report.NewReportRate("%4.4f")
 	for i := range streams {
-		go doWatch(streams[i], requests)
+		go doWatch(streams[i], requests, r.Results())
 	}
 
 	go func() {
@@ -133,35 +132,38 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		close(requests)
 	}()
 
+	rc := r.Run()
 	<-watchCompletedNotifier
 	bar.Finish()
-
-	fmt.Printf("Watch creation summary:\n")
-	close(results)
-	<-pdoneC
+	close(r.Results())
+	fmt.Printf("Watch creation summary:\n%s", <-rc)
 
 	// put phase
 	eventsTotal = 0
 	for i := 0; i < watchPutTotal; i++ {
 		eventsTotal += numWatchers[watched[i%len(watched)]]
 	}
-	results = make(chan result)
-	bar = pb.New(eventsTotal)
 
+	bar = pb.New(eventsTotal)
 	bar.Format("Bom !")
 	bar.Start()
 
 	atomic.StoreInt32(&nrRecvCompleted, 0)
 	recvCompletedNotifier = make(chan struct{})
-
 	putreqc := make(chan v3.Op)
 
+	r = report.NewReportRate("%4.4f")
 	for i := 0; i < watchPutTotal; i++ {
-		go doPutForWatch(context.TODO(), clients[i%len(clients)].KV, putreqc)
+		go func(c *v3.Client) {
+			for op := range putreqc {
+				if _, err := c.Do(context.TODO(), op); err != nil {
+					fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err)
+					os.Exit(1)
+				}
+			}
+		}(clients[i%len(clients)])
 	}
 
-	pdoneC = printRate(results)
-
 	go func() {
 		for i := 0; i < watchPutTotal; i++ {
 			putreqc <- v3.OpPut(watched[i%(len(watched))], "data")
@@ -171,24 +173,20 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		close(putreqc)
 	}()
 
+	rc = r.Run()
 	<-recvCompletedNotifier
 	bar.Finish()
-	fmt.Printf("Watch events received summary:\n")
-	close(results)
-	<-pdoneC
+	close(r.Results())
+	fmt.Printf("Watch events received summary:\n%s", <-rc)
 }
 
-func doWatch(stream v3.Watcher, requests <-chan string) {
+func doWatch(stream v3.Watcher, requests <-chan string, results chan<- report.Result) {
 	for r := range requests {
 		st := time.Now()
 		wch := stream.Watch(context.TODO(), r)
-		var errStr string
-		if wch == nil {
-			errStr = "could not open watch channel"
-		}
-		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
+		results <- report.Result{Start: st, End: time.Now()}
 		bar.Increment()
-		go recvWatchChan(wch)
+		go recvWatchChan(wch, results)
 	}
 	atomic.AddInt32(&nrWatchCompleted, 1)
 	if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) {
@@ -196,11 +194,11 @@ func doWatch(stream v3.Watcher, requests <-chan string) {
 	}
 }
 
-func recvWatchChan(wch v3.WatchChan) {
+func recvWatchChan(wch v3.WatchChan, results chan<- report.Result) {
 	for r := range wch {
 		st := time.Now()
 		for range r.Events {
-			results <- result{duration: time.Since(st), happened: time.Now()}
+			results <- report.Result{Start: st, End: time.Now()}
 			bar.Increment()
 			atomic.AddInt32(&nrRecvCompleted, 1)
 		}
@@ -211,13 +209,3 @@ func recvWatchChan(wch v3.WatchChan) {
 		}
 	}
 }
-
-func doPutForWatch(ctx context.Context, client v3.KV, requests <-chan v3.Op) {
-	for op := range requests {
-		_, err := client.Do(ctx, op)
-		if err != nil {
-			fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err)
-			os.Exit(1)
-		}
-	}
-}

+ 18 - 26
tools/benchmark/cmd/watch_get.go

@@ -20,6 +20,7 @@ import (
 	"time"
 
 	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/pkg/report"
 
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
@@ -70,47 +71,39 @@ func watchGetFunc(cmd *cobra.Command, args []string) {
 		streams[i] = v3.NewWatcher(clients[i%len(clients)])
 	}
 
-	// results from trying to do serialized gets with concurrent watchers
-	results = make(chan result)
-
 	bar = pb.New(watchGetTotalWatchers * watchEvents)
 	bar.Format("Bom !")
 	bar.Start()
 
-	pdoneC := printReport(results)
-	wg.Add(watchGetTotalWatchers)
+	// report from trying to do serialized gets with concurrent watchers
+	r := newReport()
 	ctx, cancel := context.WithCancel(context.TODO())
 	f := func() {
-		doSerializedGet(ctx, getClient[0], results)
+		defer close(r.Results())
+		for {
+			st := time.Now()
+			_, err := getClient[0].Get(ctx, "abc", v3.WithSerializable())
+			if ctx.Err() != nil {
+				break
+			}
+			r.Results() <- report.Result{Err: err, Start: st, End: time.Now()}
+		}
 	}
+
+	wg.Add(watchGetTotalWatchers)
 	for i := 0; i < watchGetTotalWatchers; i++ {
 		go doUnsyncWatch(streams[i%len(streams)], watchRev, f)
 	}
+
+	rc := r.Run()
 	wg.Wait()
 	cancel()
 	bar.Finish()
-	fmt.Printf("Get during watch summary:\n")
-	<-pdoneC
-}
-
-func doSerializedGet(ctx context.Context, client *v3.Client, results chan result) {
-	for {
-		st := time.Now()
-		_, err := client.Get(ctx, "abc", v3.WithSerializable())
-		if ctx.Err() != nil {
-			break
-		}
-		var errStr string
-		if err != nil {
-			errStr = err.Error()
-		}
-		res := result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
-		results <- res
-	}
-	close(results)
+	fmt.Printf("Get during watch summary:\n%s", <-rc)
 }
 
 func doUnsyncWatch(stream v3.Watcher, rev int64, f func()) {
+	defer wg.Done()
 	wch := stream.Watch(context.TODO(), "watchkey", v3.WithRev(rev))
 	if wch == nil {
 		panic("could not open watch channel")
@@ -122,5 +115,4 @@ func doUnsyncWatch(stream v3.Watcher, rev int64, f func()) {
 		i += len(wev.Events)
 		bar.Add(len(wev.Events))
 	}
-	wg.Done()
 }