Procházet zdrojové kódy

Merge pull request #4763 from gyuho/real_latency

benchmark: printSecondSample with time series
Gyu-Ho Lee před 9 roky
rodič
revize
323687e1f1

+ 4 - 1
tools/benchmark/cmd/put.go

@@ -46,6 +46,8 @@ var (
 
 	compactInterval   time.Duration
 	compactIndexDelta int64
+
+	sample bool
 )
 
 func init() {
@@ -57,6 +59,7 @@ func init() {
 	putCmd.Flags().BoolVar(&seqKeys, "sequential-keys", false, "Use sequential keys")
 	putCmd.Flags().DurationVar(&compactInterval, "compact-interval", 0, `Interval to compact database (do not duplicate this with etcd's 'auto-compaction-retention' flag) (e.g. --compact-interval=5m compacts every 5-minute)`)
 	putCmd.Flags().Int64Var(&compactIndexDelta, "compact-index-delta", 1000, "Delta between current revision and compact revision (e.g. current revision 10000, compact at 9000)")
+	putCmd.Flags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second")
 }
 
 func putFunc(cmd *cobra.Command, args []string) {
@@ -123,7 +126,7 @@ func doPut(ctx context.Context, client v3.KV, requests <-chan v3.Op) {
 		if err != nil {
 			errStr = err.Error()
 		}
-		results <- result{errStr: errStr, duration: time.Since(st)}
+		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
 		bar.Increment()
 	}
 }

+ 2 - 1
tools/benchmark/cmd/range.go

@@ -42,6 +42,7 @@ func init() {
 	RootCmd.AddCommand(rangeCmd)
 	rangeCmd.Flags().IntVar(&rangeTotal, "total", 10000, "Total number of range requests")
 	rangeCmd.Flags().StringVar(&rangeConsistency, "consistency", "l", "Linearizable(l) or Serializable(s)")
+	rangeCmd.Flags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second")
 }
 
 func rangeFunc(cmd *cobra.Command, args []string) {
@@ -112,7 +113,7 @@ func doRange(client v3.KV, requests <-chan v3.Op) {
 		if err != nil {
 			errStr = err.Error()
 		}
-		results <- result{errStr: errStr, duration: time.Since(st)}
+		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
 		bar.Increment()
 	}
 }

+ 13 - 0
tools/benchmark/cmd/report.go

@@ -31,6 +31,7 @@ const (
 type result struct {
 	errStr   string
 	duration time.Duration
+	happened time.Time
 }
 
 type report struct {
@@ -46,6 +47,8 @@ type report struct {
 
 	errorDist map[string]int
 	lats      []float64
+
+	sps *secondPoints
 }
 
 func printReport(results chan result) <-chan struct{} {
@@ -53,6 +56,7 @@ func printReport(results chan result) <-chan struct{} {
 		r := &report{
 			results:   results,
 			errorDist: make(map[string]int),
+			sps:       newSecondPoints(),
 		}
 		r.finalize()
 		r.print()
@@ -64,6 +68,7 @@ func printRate(results chan result) <-chan struct{} {
 		r := &report{
 			results:   results,
 			errorDist: make(map[string]int),
+			sps:       newSecondPoints(),
 		}
 		r.finalize()
 		fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps)
@@ -85,6 +90,7 @@ func (r *report) finalize() {
 		if res.errStr != "" {
 			r.errorDist[res.errStr]++
 		} else {
+			r.sps.Add(res.happened, res.duration)
 			r.lats = append(r.lats, res.duration.Seconds())
 			r.avgTotal += res.duration.Seconds()
 		}
@@ -115,6 +121,9 @@ func (r *report) print() {
 		fmt.Printf("  Requests/sec:\t%4.4f\n", r.rps)
 		r.printHistogram()
 		r.printLatencies()
+		if sample {
+			r.printSecondSample()
+		}
 	}
 
 	if len(r.errorDist) > 0 {
@@ -142,6 +151,10 @@ func (r *report) printLatencies() {
 	}
 }
 
+func (r *report) printSecondSample() {
+	fmt.Println(r.sps.getTimeSeries())
+}
+
 func (r *report) printHistogram() {
 	bc := 10
 	buckets := make([]float64, bc+1)

+ 91 - 0
tools/benchmark/cmd/timeseries.go

@@ -0,0 +1,91 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"bytes"
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+)
+
+type timeSeries struct {
+	timestamp  int64
+	avgLatency time.Duration
+	throughPut int64
+}
+
+type TimeSeries []timeSeries
+
+func (t TimeSeries) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
+func (t TimeSeries) Len() int           { return len(t) }
+func (t TimeSeries) Less(i, j int) bool { return t[i].timestamp < t[j].timestamp }
+
+type secondPoint struct {
+	totalLatency time.Duration
+	count        int64
+}
+
+type secondPoints struct {
+	mu sync.Mutex
+	tm map[int64]secondPoint
+}
+
+func newSecondPoints() *secondPoints {
+	return &secondPoints{tm: make(map[int64]secondPoint)}
+}
+
+func (sp *secondPoints) Add(ts time.Time, lat time.Duration) {
+	sp.mu.Lock()
+	defer sp.mu.Unlock()
+
+	tk := ts.Unix()
+	if v, ok := sp.tm[tk]; !ok {
+		sp.tm[tk] = secondPoint{totalLatency: lat, count: 1}
+	} else {
+		v.totalLatency += lat
+		v.count += 1
+		sp.tm[tk] = v
+	}
+}
+
+func (sp *secondPoints) getTimeSeries() TimeSeries {
+	sp.mu.Lock()
+	defer sp.mu.Unlock()
+
+	tslice := make(TimeSeries, len(sp.tm))
+	i := 0
+	for k, v := range sp.tm {
+		tslice[i] = timeSeries{
+			timestamp:  k,
+			avgLatency: time.Duration(v.totalLatency) / time.Duration(v.count),
+			throughPut: v.count,
+		}
+		i++
+	}
+	sort.Sort(tslice)
+	return tslice
+}
+
+func (ts TimeSeries) String() string {
+	buf := new(bytes.Buffer)
+	buf.WriteString("Sample in one second (unix latency throughput):\n")
+	for i := range ts {
+		buf.WriteString(fmt.Sprintf("%7d   %10s   %5d\n", ts[i].timestamp, ts[i].avgLatency, ts[i].throughPut))
+	}
+	buf.WriteString("\n")
+	return buf.String()
+}

+ 3 - 2
tools/benchmark/cmd/watch.go

@@ -80,6 +80,7 @@ func init() {
 	watchCmd.Flags().IntVar(&watchKeySize, "key-size", 32, "Key size of watch request")
 	watchCmd.Flags().IntVar(&watchKeySpaceSize, "key-space-size", 1, "Maximum possible keys")
 	watchCmd.Flags().BoolVar(&watchSeqKeys, "sequential-keys", false, "Use sequential keys")
+	watchCmd.Flags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second")
 }
 
 func watchFunc(cmd *cobra.Command, args []string) {
@@ -184,7 +185,7 @@ func doWatch(stream v3.Watcher, requests <-chan string) {
 		if wch == nil {
 			errStr = "could not open watch channel"
 		}
-		results <- result{errStr: errStr, duration: time.Since(st)}
+		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
 		bar.Increment()
 		go recvWatchChan(wch)
 	}
@@ -204,7 +205,7 @@ func recvWatchChan(wch v3.WatchChan) {
 		}
 
 		st := time.Now()
-		results <- result{duration: time.Since(st)}
+		results <- result{duration: time.Since(st), happened: time.Now()}
 		bar.Increment()
 
 		atomic.AddInt32(&nrRecvCompleted, 1)