adaptiveshedder.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package load
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "sync/atomic"
  7. "time"
  8. "github.com/tal-tech/go-zero/core/collection"
  9. "github.com/tal-tech/go-zero/core/logx"
  10. "github.com/tal-tech/go-zero/core/stat"
  11. "github.com/tal-tech/go-zero/core/syncx"
  12. "github.com/tal-tech/go-zero/core/timex"
  13. )
  const (
  	// defaultBuckets is the number of buckets each rolling window is split into.
  	defaultBuckets = 50
  	// defaultWindow is the total time span covered by the rolling windows.
  	defaultWindow = time.Second * 5
  	// defaultCpuThreshold uses the 1000m notation that is compared against
  	// stat.CpuUsage(): 900m means 90% CPU usage. It is a constant; a single
  	// shedder can override it with WithCpuThreshold.
  	defaultCpuThreshold = 900
  	// defaultMinRt is the fallback minimum response time in milliseconds
  	// (1000ms). minRt returns it when no bucket has recorded a request.
  	defaultMinRt = float64(time.Second / time.Millisecond)
  	// flyingBeta is the weight kept from the previous value in the exponential
  	// moving average of in-flight requests (avgFlying).
  	flyingBeta = 0.9
  	// coolOffDuration is how long the shedder stays "hot" after its last drop.
  	coolOffDuration = time.Second
  )
  24. var (
  25. ErrServiceOverloaded = errors.New("service overloaded")
  26. // default to be enabled
  27. enabled = syncx.ForAtomicBool(true)
  28. // make it a variable for unit test
  29. systemOverloadChecker = func(cpuThreshold int64) bool {
  30. return stat.CpuUsage() >= cpuThreshold
  31. }
  32. )
  33. type (
  34. Promise interface {
  35. Pass()
  36. Fail()
  37. }
  38. Shedder interface {
  39. Allow() (Promise, error)
  40. }
  41. ShedderOption func(opts *shedderOptions)
  42. shedderOptions struct {
  43. window time.Duration
  44. buckets int
  45. cpuThreshold int64
  46. }
  47. adaptiveShedder struct {
  48. cpuThreshold int64
  49. windows int64
  50. flying int64
  51. avgFlying float64
  52. avgFlyingLock syncx.SpinLock
  53. dropTime *syncx.AtomicDuration
  54. droppedRecently *syncx.AtomicBool
  55. passCounter *collection.RollingWindow
  56. rtCounter *collection.RollingWindow
  57. }
  58. )
  59. func Disable() {
  60. enabled.Set(false)
  61. }
  62. func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
  63. if !enabled.True() {
  64. return newNopShedder()
  65. }
  66. options := shedderOptions{
  67. window: defaultWindow,
  68. buckets: defaultBuckets,
  69. cpuThreshold: defaultCpuThreshold,
  70. }
  71. for _, opt := range opts {
  72. opt(&options)
  73. }
  74. bucketDuration := options.window / time.Duration(options.buckets)
  75. return &adaptiveShedder{
  76. cpuThreshold: options.cpuThreshold,
  77. windows: int64(time.Second / bucketDuration),
  78. dropTime: syncx.NewAtomicDuration(),
  79. droppedRecently: syncx.NewAtomicBool(),
  80. passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
  81. collection.IgnoreCurrentBucket()),
  82. rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
  83. collection.IgnoreCurrentBucket()),
  84. }
  85. }
  86. func (as *adaptiveShedder) Allow() (Promise, error) {
  87. if as.shouldDrop() {
  88. as.dropTime.Set(timex.Now())
  89. as.droppedRecently.Set(true)
  90. return nil, ErrServiceOverloaded
  91. }
  92. as.addFlying(1)
  93. return &promise{
  94. start: timex.Now(),
  95. shedder: as,
  96. }, nil
  97. }
  98. func (as *adaptiveShedder) addFlying(delta int64) {
  99. flying := atomic.AddInt64(&as.flying, delta)
  100. // update avgFlying when the request is finished.
  101. // this strategy makes avgFlying have a little bit lag against flying, and smoother.
  102. // when the flying requests increase rapidly, avgFlying increase slower, accept more requests.
  103. // when the flying requests drop rapidly, avgFlying drop slower, accept less requests.
  104. // it makes the service to serve as more requests as possible.
  105. if delta < 0 {
  106. as.avgFlyingLock.Lock()
  107. as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta)
  108. as.avgFlyingLock.Unlock()
  109. }
  110. }
  111. func (as *adaptiveShedder) highThru() bool {
  112. as.avgFlyingLock.Lock()
  113. avgFlying := as.avgFlying
  114. as.avgFlyingLock.Unlock()
  115. maxFlight := as.maxFlight()
  116. return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight
  117. }
  118. func (as *adaptiveShedder) maxFlight() int64 {
  119. // windows = buckets per second
  120. // maxQPS = maxPASS * windows
  121. // minRT = min average response time in milliseconds
  122. // maxQPS * minRT / milliseconds_per_second
  123. return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
  124. }
  125. func (as *adaptiveShedder) maxPass() int64 {
  126. var result float64 = 1
  127. as.passCounter.Reduce(func(b *collection.Bucket) {
  128. if b.Sum > result {
  129. result = b.Sum
  130. }
  131. })
  132. return int64(result)
  133. }
  134. func (as *adaptiveShedder) minRt() float64 {
  135. var result = defaultMinRt
  136. as.rtCounter.Reduce(func(b *collection.Bucket) {
  137. if b.Count <= 0 {
  138. return
  139. }
  140. avg := math.Round(b.Sum / float64(b.Count))
  141. if avg < result {
  142. result = avg
  143. }
  144. })
  145. return result
  146. }
  147. func (as *adaptiveShedder) shouldDrop() bool {
  148. if as.systemOverloaded() || as.stillHot() {
  149. if as.highThru() {
  150. flying := atomic.LoadInt64(&as.flying)
  151. as.avgFlyingLock.Lock()
  152. avgFlying := as.avgFlying
  153. as.avgFlyingLock.Unlock()
  154. msg := fmt.Sprintf(
  155. "dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",
  156. stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)
  157. logx.Error(msg)
  158. stat.Report(msg)
  159. return true
  160. }
  161. }
  162. return false
  163. }
  164. func (as *adaptiveShedder) stillHot() bool {
  165. if !as.droppedRecently.True() {
  166. return false
  167. }
  168. dropTime := as.dropTime.Load()
  169. if dropTime == 0 {
  170. return false
  171. }
  172. hot := timex.Since(dropTime) < coolOffDuration
  173. if !hot {
  174. as.droppedRecently.Set(false)
  175. }
  176. return hot
  177. }
  178. func (as *adaptiveShedder) systemOverloaded() bool {
  179. return systemOverloadChecker(as.cpuThreshold)
  180. }
  181. func WithBuckets(buckets int) ShedderOption {
  182. return func(opts *shedderOptions) {
  183. opts.buckets = buckets
  184. }
  185. }
  186. func WithCpuThreshold(threshold int64) ShedderOption {
  187. return func(opts *shedderOptions) {
  188. opts.cpuThreshold = threshold
  189. }
  190. }
  191. func WithWindow(window time.Duration) ShedderOption {
  192. return func(opts *shedderOptions) {
  193. opts.window = window
  194. }
  195. }
  196. type promise struct {
  197. start time.Duration
  198. shedder *adaptiveShedder
  199. }
  200. func (p *promise) Fail() {
  201. p.shedder.addFlying(-1)
  202. }
  203. func (p *promise) Pass() {
  204. rt := float64(timex.Since(p.start)) / float64(time.Millisecond)
  205. p.shedder.addFlying(-1)
  206. p.shedder.rtCounter.Add(math.Ceil(rt))
  207. p.shedder.passCounter.Add(1)
  208. }