weighted.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // the file is borrowed from github.com/rakyll/boom/boomer/print.go
  15. package report
  16. import (
  17. "time"
  18. )
  19. type weightedReport struct {
  20. baseReport Report
  21. report *report
  22. results chan Result
  23. weightTotal float64
  24. }
  25. // NewWeightedReport returns a report that includes
  26. // both weighted and unweighted statistics.
  27. func NewWeightedReport(r Report, precision string) Report {
  28. return &weightedReport{
  29. baseReport: r,
  30. report: newReport(precision),
  31. results: make(chan Result, 16),
  32. }
  33. }
  34. func (wr *weightedReport) Results() chan<- Result { return wr.results }
  35. func (wr *weightedReport) Run() <-chan string {
  36. donec := make(chan string, 2)
  37. go func() {
  38. defer close(donec)
  39. basec, rc := make(chan string, 1), make(chan Stats, 1)
  40. go func() { basec <- (<-wr.baseReport.Run()) }()
  41. go func() { rc <- (<-wr.report.Stats()) }()
  42. go wr.processResults()
  43. wr.report.stats = wr.reweighStat(<-rc)
  44. donec <- wr.report.String()
  45. donec <- (<-basec)
  46. }()
  47. return donec
  48. }
  49. func (wr *weightedReport) Stats() <-chan Stats {
  50. donec := make(chan Stats, 2)
  51. go func() {
  52. defer close(donec)
  53. basec, rc := make(chan Stats, 1), make(chan Stats, 1)
  54. go func() { basec <- (<-wr.baseReport.Stats()) }()
  55. go func() { rc <- (<-wr.report.Stats()) }()
  56. go wr.processResults()
  57. donec <- wr.reweighStat(<-rc)
  58. donec <- (<-basec)
  59. }()
  60. return donec
  61. }
  62. func (wr *weightedReport) processResults() {
  63. defer close(wr.report.results)
  64. defer close(wr.baseReport.Results())
  65. for res := range wr.results {
  66. wr.processResult(res)
  67. wr.baseReport.Results() <- res
  68. }
  69. }
  70. func (wr *weightedReport) processResult(res Result) {
  71. if res.Err != nil {
  72. wr.report.results <- res
  73. return
  74. }
  75. if res.Weight == 0 {
  76. res.Weight = 1.0
  77. }
  78. wr.weightTotal += res.Weight
  79. res.End = res.Start.Add(time.Duration(float64(res.End.Sub(res.Start)) / res.Weight))
  80. res.Weight = 1.0
  81. wr.report.results <- res
  82. }
  83. func (wr *weightedReport) reweighStat(s Stats) Stats {
  84. weightCoef := wr.weightTotal / float64(len(s.Lats))
  85. // weight > 1 => processing more than one request
  86. s.RPS *= weightCoef
  87. s.AvgTotal *= weightCoef * weightCoef
  88. return s
  89. }