Browse Source

report: add NewWeightedReport

Reports with weighted results.
Anthony Romano 8 years ago
parent
commit
c09f0ca9d4
3 changed files with 184 additions and 62 deletions
  1. 52 62
      pkg/report/report.go
  2. 31 0
      pkg/report/report_test.go
  3. 101 0
      pkg/report/weighted.go

+ 52 - 62
pkg/report/report.go

@@ -30,9 +30,10 @@ const (
 
 // Result describes the timings for an operation.
 type Result struct {
-	Start time.Time
-	End   time.Time
-	Err   error
+	Start  time.Time
+	End    time.Time
+	Err    error
+	Weight float64
 }
 
 func (res *Result) Duration() time.Duration { return res.End.Sub(res.Start) }
@@ -41,18 +42,8 @@ type report struct {
 	results   chan Result
 	precision string
 
-	avgTotal float64
-	fastest  float64
-	slowest  float64
-	average  float64
-	stddev   float64
-	rps      float64
-	total    time.Duration
-
-	errorDist map[string]int
-	lats      []float64
-
-	sps *secondPoints
+	stats Stats
+	sps   *secondPoints
 }
 
 // Stats exposes results raw data.
@@ -69,6 +60,13 @@ type Stats struct {
 	TimeSeries TimeSeries
 }
 
+func (s *Stats) copy() Stats {
+	ss := *s
+	ss.ErrorDist = copyMap(ss.ErrorDist)
+	ss.Lats = copyFloats(ss.Lats)
+	return ss
+}
+
 // Report processes a result stream until it is closed, then produces a
 // string with information about the consumed result data.
 type Report interface {
@@ -81,12 +79,15 @@ type Report interface {
 	Stats() <-chan Stats
 }
 
-func NewReport(precision string) Report {
-	return &report{
+func NewReport(precision string) Report { return newReport(precision) }
+
+func newReport(precision string) *report {
+	r := &report{
 		results:   make(chan Result, 16),
 		precision: precision,
-		errorDist: make(map[string]int),
 	}
+	r.stats.ErrorDist = make(map[string]int)
+	return r
 }
 
 func NewReportSample(precision string) Report {
@@ -112,22 +113,11 @@ func (r *report) Stats() <-chan Stats {
 	go func() {
 		defer close(donec)
 		r.processResults()
-		var ts TimeSeries
+		s := r.stats.copy()
 		if r.sps != nil {
-			ts = r.sps.getTimeSeries()
-		}
-		donec <- Stats{
-			AvgTotal:   r.avgTotal,
-			Fastest:    r.fastest,
-			Slowest:    r.slowest,
-			Average:    r.average,
-			Stddev:     r.stddev,
-			RPS:        r.rps,
-			Total:      r.total,
-			ErrorDist:  copyMap(r.errorDist),
-			Lats:       copyFloats(r.lats),
-			TimeSeries: ts,
+			s.TimeSeries = r.sps.getTimeSeries()
 		}
+		donec <- s
 	}()
 	return donec
 }
@@ -147,21 +137,21 @@ func copyFloats(s []float64) (c []float64) {
 }
 
 func (r *report) String() (s string) {
-	if len(r.lats) > 0 {
+	if len(r.stats.Lats) > 0 {
 		s += fmt.Sprintf("\nSummary:\n")
-		s += fmt.Sprintf("  Total:\t%s.\n", r.sec2str(r.total.Seconds()))
-		s += fmt.Sprintf("  Slowest:\t%s.\n", r.sec2str(r.slowest))
-		s += fmt.Sprintf("  Fastest:\t%s.\n", r.sec2str(r.fastest))
-		s += fmt.Sprintf("  Average:\t%s.\n", r.sec2str(r.average))
-		s += fmt.Sprintf("  Stddev:\t%s.\n", r.sec2str(r.stddev))
-		s += fmt.Sprintf("  Requests/sec:\t"+r.precision+"\n", r.rps)
+		s += fmt.Sprintf("  Total:\t%s.\n", r.sec2str(r.stats.Total.Seconds()))
+		s += fmt.Sprintf("  Slowest:\t%s.\n", r.sec2str(r.stats.Slowest))
+		s += fmt.Sprintf("  Fastest:\t%s.\n", r.sec2str(r.stats.Fastest))
+		s += fmt.Sprintf("  Average:\t%s.\n", r.sec2str(r.stats.Average))
+		s += fmt.Sprintf("  Stddev:\t%s.\n", r.sec2str(r.stats.Stddev))
+		s += fmt.Sprintf("  Requests/sec:\t"+r.precision+"\n", r.stats.RPS)
 		s += r.histogram()
 		s += r.sprintLatencies()
 		if r.sps != nil {
 			s += fmt.Sprintf("%v\n", r.sps.getTimeSeries())
 		}
 	}
-	if len(r.errorDist) > 0 {
+	if len(r.stats.ErrorDist) > 0 {
 		s += r.errors()
 	}
 	return s
@@ -176,17 +166,17 @@ func NewReportRate(precision string) Report {
 }
 
 func (r *reportRate) String() string {
-	return fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.rps)
+	return fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.stats.RPS)
 }
 
 func (r *report) processResult(res *Result) {
 	if res.Err != nil {
-		r.errorDist[res.Err.Error()]++
+		r.stats.ErrorDist[res.Err.Error()]++
 		return
 	}
 	dur := res.Duration()
-	r.lats = append(r.lats, dur.Seconds())
-	r.avgTotal += dur.Seconds()
+	r.stats.Lats = append(r.stats.Lats, dur.Seconds())
+	r.stats.AvgTotal += dur.Seconds()
 	if r.sps != nil {
 		r.sps.Add(res.Start, dur)
 	}
@@ -197,19 +187,19 @@ func (r *report) processResults() {
 	for res := range r.results {
 		r.processResult(&res)
 	}
-	r.total = time.Since(st)
+	r.stats.Total = time.Since(st)
 
-	r.rps = float64(len(r.lats)) / r.total.Seconds()
-	r.average = r.avgTotal / float64(len(r.lats))
-	for i := range r.lats {
-		dev := r.lats[i] - r.average
-		r.stddev += dev * dev
+	r.stats.RPS = float64(len(r.stats.Lats)) / r.stats.Total.Seconds()
+	r.stats.Average = r.stats.AvgTotal / float64(len(r.stats.Lats))
+	for i := range r.stats.Lats {
+		dev := r.stats.Lats[i] - r.stats.Average
+		r.stats.Stddev += dev * dev
 	}
-	r.stddev = math.Sqrt(r.stddev / float64(len(r.lats)))
-	sort.Float64s(r.lats)
-	if len(r.lats) > 0 {
-		r.fastest = r.lats[0]
-		r.slowest = r.lats[len(r.lats)-1]
+	r.stats.Stddev = math.Sqrt(r.stats.Stddev / float64(len(r.stats.Lats)))
+	sort.Float64s(r.stats.Lats)
+	if len(r.stats.Lats) > 0 {
+		r.stats.Fastest = r.stats.Lats[0]
+		r.stats.Slowest = r.stats.Lats[len(r.stats.Lats)-1]
 	}
 }
 
@@ -235,7 +225,7 @@ func percentiles(nums []float64) (data []float64) {
 }
 
 func (r *report) sprintLatencies() string {
-	data := percentiles(r.lats)
+	data := percentiles(r.stats.Lats)
 	s := fmt.Sprintf("\nLatency distribution:\n")
 	for i := 0; i < len(pctls); i++ {
 		if data[i] > 0 {
@@ -249,15 +239,15 @@ func (r *report) histogram() string {
 	bc := 10
 	buckets := make([]float64, bc+1)
 	counts := make([]int, bc+1)
-	bs := (r.slowest - r.fastest) / float64(bc)
+	bs := (r.stats.Slowest - r.stats.Fastest) / float64(bc)
 	for i := 0; i < bc; i++ {
-		buckets[i] = r.fastest + bs*float64(i)
+		buckets[i] = r.stats.Fastest + bs*float64(i)
 	}
-	buckets[bc] = r.slowest
+	buckets[bc] = r.stats.Slowest
 	var bi int
 	var max int
-	for i := 0; i < len(r.lats); {
-		if r.lats[i] <= buckets[bi] {
+	for i := 0; i < len(r.stats.Lats); {
+		if r.stats.Lats[i] <= buckets[bi] {
 			i++
 			counts[bi]++
 			if max < counts[bi] {
@@ -281,7 +271,7 @@ func (r *report) histogram() string {
 
 func (r *report) errors() string {
 	s := fmt.Sprintf("\nError distribution:\n")
-	for err, num := range r.errorDist {
+	for err, num := range r.stats.ErrorDist {
 		s += fmt.Sprintf("  [%d]\t%s\n", num, err)
 	}
 	return s

+ 31 - 0
pkg/report/report_test.go

@@ -81,3 +81,34 @@ func TestReport(t *testing.T) {
 		}
 	}
 }
+
+func TestWeightedReport(t *testing.T) {
+	r := NewWeightedReport(NewReport("%f"), "%f")
+	go func() {
+		start := time.Now()
+		for i := 0; i < 5; i++ {
+			end := start.Add(time.Second)
+			r.Results() <- Result{Start: start, End: end, Weight: 2.0}
+			start = end
+		}
+		r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")}
+		close(r.Results())
+	}()
+
+	stats := <-r.Stats()
+	stats.TimeSeries = nil // ignore timeseries since it uses wall clock
+	wStats := Stats{
+		AvgTotal:  10.0,
+		Fastest:   0.5,
+		Slowest:   0.5,
+		Average:   0.5,
+		Stddev:    0.0,
+		Total:     stats.Total,
+		RPS:       10.0 / stats.Total.Seconds(),
+		ErrorDist: map[string]int{"oops": 1},
+		Lats:      []float64{0.5, 0.5, 0.5, 0.5, 0.5},
+	}
+	if !reflect.DeepEqual(stats, wStats) {
+		t.Fatalf("got %+v, want %+v", stats, wStats)
+	}
+}

+ 101 - 0
pkg/report/weighted.go

@@ -0,0 +1,101 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// the file is borrowed from github.com/rakyll/boom/boomer/print.go
+
+package report
+
+import (
+	"time"
+)
+
+type weightedReport struct {
+	baseReport Report
+
+	report      *report
+	results     chan Result
+	weightTotal float64
+}
+
+// NewWeightedReport returns a report that includes
+// both weighted and unweighted statistics.
+func NewWeightedReport(r Report, precision string) Report {
+	return &weightedReport{
+		baseReport: r,
+		report:     newReport(precision),
+		results:    make(chan Result, 16),
+	}
+}
+
+func (wr *weightedReport) Results() chan<- Result { return wr.results }
+
+func (wr *weightedReport) Run() <-chan string {
+	donec := make(chan string, 2)
+	go func() {
+		defer close(donec)
+		basec, rc := make(chan string, 1), make(chan Stats, 1)
+		go func() { basec <- (<-wr.baseReport.Run()) }()
+		go func() { rc <- (<-wr.report.Stats()) }()
+		go wr.processResults()
+		wr.report.stats = wr.reweighStat(<-rc)
+		donec <- wr.report.String()
+		donec <- (<-basec)
+	}()
+	return donec
+}
+
+func (wr *weightedReport) Stats() <-chan Stats {
+	donec := make(chan Stats, 2)
+	go func() {
+		defer close(donec)
+		basec, rc := make(chan Stats, 1), make(chan Stats, 1)
+		go func() { basec <- (<-wr.baseReport.Stats()) }()
+		go func() { rc <- (<-wr.report.Stats()) }()
+		go wr.processResults()
+		donec <- wr.reweighStat(<-rc)
+		donec <- (<-basec)
+	}()
+	return donec
+}
+
+func (wr *weightedReport) processResults() {
+	defer close(wr.report.results)
+	defer close(wr.baseReport.Results())
+	for res := range wr.results {
+		wr.processResult(res)
+		wr.baseReport.Results() <- res
+	}
+}
+
+func (wr *weightedReport) processResult(res Result) {
+	if res.Err != nil {
+		wr.report.results <- res
+		return
+	}
+	if res.Weight == 0 {
+		res.Weight = 1.0
+	}
+	wr.weightTotal += res.Weight
+	res.End = res.Start.Add(time.Duration(float64(res.End.Sub(res.Start)) / res.Weight))
+	res.Weight = 1.0
+	wr.report.results <- res
+}
+
+func (wr *weightedReport) reweighStat(s Stats) Stats {
+	weightCoef := wr.weightTotal / float64(len(s.Lats))
+	// weight > 1 => processing more than one request
+	s.RPS *= weightCoef
+	s.AvgTotal *= weightCoef * weightCoef
+	return s
+}