adaptiveshedder_test.go 5.4 KB


  1. package load
  2. import (
  3. "math/rand"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/tal-tech/go-zero/core/collection"
  10. "github.com/tal-tech/go-zero/core/logx"
  11. "github.com/tal-tech/go-zero/core/mathx"
  12. "github.com/tal-tech/go-zero/core/stat"
  13. "github.com/tal-tech/go-zero/core/syncx"
  14. )
  15. const (
  16. buckets = 10
  17. bucketDuration = time.Millisecond * 50
  18. )
  19. func init() {
  20. stat.SetReporter(nil)
  21. }
  22. func TestAdaptiveShedder(t *testing.T) {
  23. shedder := NewAdaptiveShedder(WithWindow(bucketDuration), WithBuckets(buckets), WithCpuThreshold(100))
  24. var wg sync.WaitGroup
  25. var drop int64
  26. proba := mathx.NewProba()
  27. for i := 0; i < 100; i++ {
  28. wg.Add(1)
  29. go func() {
  30. defer wg.Done()
  31. for i := 0; i < 30; i++ {
  32. promise, err := shedder.Allow()
  33. if err != nil {
  34. atomic.AddInt64(&drop, 1)
  35. } else {
  36. count := rand.Intn(5)
  37. time.Sleep(time.Millisecond * time.Duration(count))
  38. if proba.TrueOnProba(0.01) {
  39. promise.Fail()
  40. } else {
  41. promise.Pass()
  42. }
  43. }
  44. }
  45. }()
  46. }
  47. wg.Wait()
  48. }
  49. func TestAdaptiveShedderMaxPass(t *testing.T) {
  50. passCounter := newRollingWindow()
  51. for i := 1; i <= 10; i++ {
  52. passCounter.Add(float64(i * 100))
  53. time.Sleep(bucketDuration)
  54. }
  55. shedder := &adaptiveShedder{
  56. passCounter: passCounter,
  57. droppedRecently: syncx.NewAtomicBool(),
  58. }
  59. assert.Equal(t, int64(1000), shedder.maxPass())
  60. // default max pass is equal to 1.
  61. passCounter = newRollingWindow()
  62. shedder = &adaptiveShedder{
  63. passCounter: passCounter,
  64. droppedRecently: syncx.NewAtomicBool(),
  65. }
  66. assert.Equal(t, int64(1), shedder.maxPass())
  67. }
  68. func TestAdaptiveShedderMinRt(t *testing.T) {
  69. rtCounter := newRollingWindow()
  70. for i := 0; i < 10; i++ {
  71. if i > 0 {
  72. time.Sleep(bucketDuration)
  73. }
  74. for j := i*10 + 1; j <= i*10+10; j++ {
  75. rtCounter.Add(float64(j))
  76. }
  77. }
  78. shedder := &adaptiveShedder{
  79. rtCounter: rtCounter,
  80. }
  81. assert.Equal(t, float64(6), shedder.minRt())
  82. // default max min rt is equal to maxFloat64.
  83. rtCounter = newRollingWindow()
  84. shedder = &adaptiveShedder{
  85. rtCounter: rtCounter,
  86. droppedRecently: syncx.NewAtomicBool(),
  87. }
  88. assert.Equal(t, defaultMinRt, shedder.minRt())
  89. }
  90. func TestAdaptiveShedderMaxFlight(t *testing.T) {
  91. passCounter := newRollingWindow()
  92. rtCounter := newRollingWindow()
  93. for i := 0; i < 10; i++ {
  94. if i > 0 {
  95. time.Sleep(bucketDuration)
  96. }
  97. passCounter.Add(float64((i + 1) * 100))
  98. for j := i*10 + 1; j <= i*10+10; j++ {
  99. rtCounter.Add(float64(j))
  100. }
  101. }
  102. shedder := &adaptiveShedder{
  103. passCounter: passCounter,
  104. rtCounter: rtCounter,
  105. windows: buckets,
  106. droppedRecently: syncx.NewAtomicBool(),
  107. }
  108. assert.Equal(t, int64(54), shedder.maxFlight())
  109. }
  110. func TestAdaptiveShedderShouldDrop(t *testing.T) {
  111. logx.Disable()
  112. passCounter := newRollingWindow()
  113. rtCounter := newRollingWindow()
  114. for i := 0; i < 10; i++ {
  115. if i > 0 {
  116. time.Sleep(bucketDuration)
  117. }
  118. passCounter.Add(float64((i + 1) * 100))
  119. for j := i*10 + 1; j <= i*10+10; j++ {
  120. rtCounter.Add(float64(j))
  121. }
  122. }
  123. shedder := &adaptiveShedder{
  124. passCounter: passCounter,
  125. rtCounter: rtCounter,
  126. windows: buckets,
  127. dropTime: syncx.NewAtomicDuration(),
  128. droppedRecently: syncx.NewAtomicBool(),
  129. }
  130. // cpu >= 800, inflight < maxPass
  131. systemOverloadChecker = func(int64) bool {
  132. return true
  133. }
  134. shedder.avgFlying = 50
  135. assert.False(t, shedder.shouldDrop())
  136. // cpu >= 800, inflight > maxPass
  137. shedder.avgFlying = 80
  138. shedder.flying = 50
  139. assert.False(t, shedder.shouldDrop())
  140. // cpu >= 800, inflight > maxPass
  141. shedder.avgFlying = 80
  142. shedder.flying = 80
  143. assert.True(t, shedder.shouldDrop())
  144. // cpu < 800, inflight > maxPass
  145. systemOverloadChecker = func(int64) bool {
  146. return false
  147. }
  148. shedder.avgFlying = 80
  149. assert.False(t, shedder.shouldDrop())
  150. // cpu >= 800, inflight < maxPass
  151. systemOverloadChecker = func(int64) bool {
  152. return true
  153. }
  154. shedder.avgFlying = 80
  155. shedder.flying = 80
  156. _, err := shedder.Allow()
  157. assert.NotNil(t, err)
  158. }
  159. func TestAdaptiveShedderStillHot(t *testing.T) {
  160. logx.Disable()
  161. passCounter := newRollingWindow()
  162. rtCounter := newRollingWindow()
  163. for i := 0; i < 10; i++ {
  164. if i > 0 {
  165. time.Sleep(bucketDuration)
  166. }
  167. passCounter.Add(float64((i + 1) * 100))
  168. for j := i*10 + 1; j <= i*10+10; j++ {
  169. rtCounter.Add(float64(j))
  170. }
  171. }
  172. shedder := &adaptiveShedder{
  173. passCounter: passCounter,
  174. rtCounter: rtCounter,
  175. windows: buckets,
  176. dropTime: syncx.NewAtomicDuration(),
  177. droppedRecently: syncx.ForAtomicBool(true),
  178. }
  179. assert.False(t, shedder.stillHot())
  180. shedder.dropTime.Set(-coolOffDuration * 2)
  181. assert.False(t, shedder.stillHot())
  182. }
  183. func BenchmarkAdaptiveShedder_Allow(b *testing.B) {
  184. logx.Disable()
  185. bench := func(b *testing.B) {
  186. var shedder = NewAdaptiveShedder()
  187. proba := mathx.NewProba()
  188. for i := 0; i < 6000; i++ {
  189. p, err := shedder.Allow()
  190. if err == nil {
  191. time.Sleep(time.Millisecond)
  192. if proba.TrueOnProba(0.01) {
  193. p.Fail()
  194. } else {
  195. p.Pass()
  196. }
  197. }
  198. }
  199. b.ResetTimer()
  200. for i := 0; i < b.N; i++ {
  201. p, err := shedder.Allow()
  202. if err == nil {
  203. p.Pass()
  204. }
  205. }
  206. }
  207. systemOverloadChecker = func(int64) bool {
  208. return true
  209. }
  210. b.Run("high load", bench)
  211. systemOverloadChecker = func(int64) bool {
  212. return false
  213. }
  214. b.Run("low load", bench)
  215. }
  216. func newRollingWindow() *collection.RollingWindow {
  217. return collection.NewRollingWindow(buckets, bucketDuration, collection.IgnoreCurrentBucket())
  218. }