adaptiveshedder_test.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package load
  2. import (
  3. "math/rand"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/tal-tech/go-zero/core/collection"
  10. "github.com/tal-tech/go-zero/core/logx"
  11. "github.com/tal-tech/go-zero/core/mathx"
  12. "github.com/tal-tech/go-zero/core/stat"
  13. "github.com/tal-tech/go-zero/core/syncx"
  14. )
  15. const (
  16. buckets = 10
  17. bucketDuration = time.Millisecond * 50
  18. )
  19. func init() {
  20. stat.SetReporter(nil)
  21. }
  22. func TestAdaptiveShedder(t *testing.T) {
  23. shedder := NewAdaptiveShedder(WithWindow(bucketDuration), WithBuckets(buckets), WithCpuThreshold(100))
  24. var wg sync.WaitGroup
  25. var drop int64
  26. proba := mathx.NewProba()
  27. for i := 0; i < 100; i++ {
  28. wg.Add(1)
  29. go func() {
  30. defer wg.Done()
  31. for i := 0; i < 30; i++ {
  32. promise, err := shedder.Allow()
  33. if err != nil {
  34. atomic.AddInt64(&drop, 1)
  35. } else {
  36. count := rand.Intn(5)
  37. time.Sleep(time.Millisecond * time.Duration(count))
  38. if proba.TrueOnProba(0.01) {
  39. promise.Fail()
  40. } else {
  41. promise.Pass()
  42. }
  43. }
  44. }
  45. }()
  46. }
  47. wg.Wait()
  48. }
  49. func TestAdaptiveShedderMaxPass(t *testing.T) {
  50. passCounter := newRollingWindow()
  51. for i := 1; i <= 10; i++ {
  52. passCounter.Add(float64(i * 100))
  53. time.Sleep(bucketDuration)
  54. }
  55. shedder := &adaptiveShedder{
  56. passCounter: passCounter,
  57. droppedRecently: syncx.NewAtomicBool(),
  58. }
  59. assert.Equal(t, int64(1000), shedder.maxPass())
  60. // default max pass is equal to 1.
  61. passCounter = newRollingWindow()
  62. shedder = &adaptiveShedder{
  63. passCounter: passCounter,
  64. droppedRecently: syncx.NewAtomicBool(),
  65. }
  66. assert.Equal(t, int64(1), shedder.maxPass())
  67. }
  68. func TestAdaptiveShedderMinRt(t *testing.T) {
  69. rtCounter := newRollingWindow()
  70. for i := 0; i < 10; i++ {
  71. if i > 0 {
  72. time.Sleep(bucketDuration)
  73. }
  74. for j := i*10 + 1; j <= i*10+10; j++ {
  75. rtCounter.Add(float64(j))
  76. }
  77. }
  78. shedder := &adaptiveShedder{
  79. rtCounter: rtCounter,
  80. }
  81. assert.Equal(t, float64(6), shedder.minRt())
  82. // default max min rt is equal to maxFloat64.
  83. rtCounter = newRollingWindow()
  84. shedder = &adaptiveShedder{
  85. rtCounter: rtCounter,
  86. droppedRecently: syncx.NewAtomicBool(),
  87. }
  88. assert.Equal(t, defaultMinRt, shedder.minRt())
  89. }
  90. func TestAdaptiveShedderMaxFlight(t *testing.T) {
  91. passCounter := newRollingWindow()
  92. rtCounter := newRollingWindow()
  93. for i := 0; i < 10; i++ {
  94. if i > 0 {
  95. time.Sleep(bucketDuration)
  96. }
  97. passCounter.Add(float64((i + 1) * 100))
  98. for j := i*10 + 1; j <= i*10+10; j++ {
  99. rtCounter.Add(float64(j))
  100. }
  101. }
  102. shedder := &adaptiveShedder{
  103. passCounter: passCounter,
  104. rtCounter: rtCounter,
  105. windows: buckets,
  106. droppedRecently: syncx.NewAtomicBool(),
  107. }
  108. assert.Equal(t, int64(54), shedder.maxFlight())
  109. }
  110. func TestAdaptiveShedderShouldDrop(t *testing.T) {
  111. logx.Disable()
  112. passCounter := newRollingWindow()
  113. rtCounter := newRollingWindow()
  114. for i := 0; i < 10; i++ {
  115. if i > 0 {
  116. time.Sleep(bucketDuration)
  117. }
  118. passCounter.Add(float64((i + 1) * 100))
  119. for j := i*10 + 1; j <= i*10+10; j++ {
  120. rtCounter.Add(float64(j))
  121. }
  122. }
  123. shedder := &adaptiveShedder{
  124. passCounter: passCounter,
  125. rtCounter: rtCounter,
  126. windows: buckets,
  127. droppedRecently: syncx.NewAtomicBool(),
  128. }
  129. // cpu >= 800, inflight < maxPass
  130. systemOverloadChecker = func(int64) bool {
  131. return true
  132. }
  133. shedder.avgFlying = 50
  134. assert.False(t, shedder.shouldDrop())
  135. // cpu >= 800, inflight > maxPass
  136. shedder.avgFlying = 80
  137. shedder.flying = 50
  138. assert.False(t, shedder.shouldDrop())
  139. // cpu >= 800, inflight > maxPass
  140. shedder.avgFlying = 80
  141. shedder.flying = 80
  142. assert.True(t, shedder.shouldDrop())
  143. // cpu < 800, inflight > maxPass
  144. systemOverloadChecker = func(int64) bool {
  145. return false
  146. }
  147. shedder.avgFlying = 80
  148. assert.False(t, shedder.shouldDrop())
  149. }
  150. func BenchmarkAdaptiveShedder_Allow(b *testing.B) {
  151. logx.Disable()
  152. bench := func(b *testing.B) {
  153. var shedder = NewAdaptiveShedder()
  154. proba := mathx.NewProba()
  155. for i := 0; i < 6000; i++ {
  156. p, err := shedder.Allow()
  157. if err == nil {
  158. time.Sleep(time.Millisecond)
  159. if proba.TrueOnProba(0.01) {
  160. p.Fail()
  161. } else {
  162. p.Pass()
  163. }
  164. }
  165. }
  166. b.ResetTimer()
  167. for i := 0; i < b.N; i++ {
  168. p, err := shedder.Allow()
  169. if err == nil {
  170. p.Pass()
  171. }
  172. }
  173. }
  174. systemOverloadChecker = func(int64) bool {
  175. return true
  176. }
  177. b.Run("high load", bench)
  178. systemOverloadChecker = func(int64) bool {
  179. return false
  180. }
  181. b.Run("low load", bench)
  182. }
  183. func newRollingWindow() *collection.RollingWindow {
  184. return collection.NewRollingWindow(buckets, bucketDuration, collection.IgnoreCurrentBucket())
  185. }