main.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "io"
  6. "math"
  7. "math/rand"
  8. "os"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/tal-tech/go-zero/core/collection"
  13. "github.com/tal-tech/go-zero/core/executors"
  14. "github.com/tal-tech/go-zero/core/logx"
  15. "github.com/tal-tech/go-zero/core/syncx"
  16. "gopkg.in/cheggaaa/pb.v1"
  17. )
  18. const (
  19. beta = 0.9
  20. total = 400
  21. interval = time.Second
  22. factor = 5
  23. )
  24. var (
  25. seconds = flag.Int("d", 400, "duration to go")
  26. flying uint64
  27. avgFlyingAggressive float64
  28. aggressiveLock syncx.SpinLock
  29. avgFlyingLazy float64
  30. lazyLock syncx.SpinLock
  31. avgFlyingBoth float64
  32. bothLock syncx.SpinLock
  33. lessWriter *executors.LessExecutor
  34. passCounter = collection.NewRollingWindow(50, time.Millisecond*100)
  35. rtCounter = collection.NewRollingWindow(50, time.Millisecond*100)
  36. index int32
  37. )
  38. func main() {
  39. flag.Parse()
  40. // only log 100 records
  41. lessWriter = executors.NewLessExecutor(interval * total / 100)
  42. fp, err := os.Create("result.csv")
  43. logx.Must(err)
  44. defer fp.Close()
  45. fmt.Fprintln(fp, "second,maxFlight,flying,agressiveAvgFlying,lazyAvgFlying,bothAvgFlying")
  46. ticker := time.NewTicker(interval)
  47. defer ticker.Stop()
  48. bar := pb.New(*seconds * 2).Start()
  49. var waitGroup sync.WaitGroup
  50. batchRequests := func(i int) {
  51. <-ticker.C
  52. requests := (i + 1) * factor
  53. func() {
  54. it := time.NewTicker(interval / time.Duration(requests))
  55. defer it.Stop()
  56. for j := 0; j < requests; j++ {
  57. <-it.C
  58. waitGroup.Add(1)
  59. go func() {
  60. issueRequest(fp, atomic.AddInt32(&index, 1))
  61. waitGroup.Done()
  62. }()
  63. }
  64. bar.Increment()
  65. }()
  66. }
  67. for i := 0; i < *seconds; i++ {
  68. batchRequests(i)
  69. }
  70. for i := *seconds; i > 0; i-- {
  71. batchRequests(i)
  72. }
  73. bar.Finish()
  74. waitGroup.Wait()
  75. }
  76. func issueRequest(writer io.Writer, idx int32) {
  77. v := atomic.AddUint64(&flying, 1)
  78. aggressiveLock.Lock()
  79. af := avgFlyingAggressive*beta + float64(v)*(1-beta)
  80. avgFlyingAggressive = af
  81. aggressiveLock.Unlock()
  82. bothLock.Lock()
  83. bf := avgFlyingBoth*beta + float64(v)*(1-beta)
  84. avgFlyingBoth = bf
  85. bothLock.Unlock()
  86. duration := time.Millisecond * time.Duration(rand.Int63n(10)+1)
  87. job(duration)
  88. passCounter.Add(1)
  89. rtCounter.Add(float64(duration) / float64(time.Millisecond))
  90. v1 := atomic.AddUint64(&flying, ^uint64(0))
  91. lazyLock.Lock()
  92. lf := avgFlyingLazy*beta + float64(v1)*(1-beta)
  93. avgFlyingLazy = lf
  94. lazyLock.Unlock()
  95. bothLock.Lock()
  96. bf = avgFlyingBoth*beta + float64(v1)*(1-beta)
  97. avgFlyingBoth = bf
  98. bothLock.Unlock()
  99. lessWriter.DoOrDiscard(func() {
  100. fmt.Fprintf(writer, "%d,%d,%d,%.2f,%.2f,%.2f\n", idx, maxFlight(), v, af, lf, bf)
  101. })
  102. }
  103. func job(duration time.Duration) {
  104. time.Sleep(duration)
  105. }
  106. func maxFlight() int64 {
  107. return int64(math.Max(1, float64(maxPass()*10)*(minRt()/1e3)))
  108. }
  109. func maxPass() int64 {
  110. var result float64 = 1
  111. passCounter.Reduce(func(b *collection.Bucket) {
  112. if b.Sum > result {
  113. result = b.Sum
  114. }
  115. })
  116. return int64(result)
  117. }
  118. func minRt() float64 {
  119. var result float64 = 1000
  120. rtCounter.Reduce(func(b *collection.Bucket) {
  121. if b.Count <= 0 {
  122. return
  123. }
  124. avg := math.Round(b.Sum / float64(b.Count))
  125. if avg < result {
  126. result = avg
  127. }
  128. })
  129. return result
  130. }