adaptiveshedder.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package load
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "sync/atomic"
  7. "time"
  8. "github.com/tal-tech/go-zero/core/collection"
  9. "github.com/tal-tech/go-zero/core/logx"
  10. "github.com/tal-tech/go-zero/core/stat"
  11. "github.com/tal-tech/go-zero/core/syncx"
  12. "github.com/tal-tech/go-zero/core/timex"
  13. )
  14. const (
  15. defaultBuckets = 50
  16. defaultWindow = time.Second * 5
  17. // using 1000m notation, 900m is like 80%, keep it as var for unit test
  18. defaultCpuThreshold = 900
  19. defaultMinRt = float64(time.Second / time.Millisecond)
  20. // moving average hyperparameter beta for calculating requests on the fly
  21. flyingBeta = 0.9
  22. coolOffDuration = time.Second
  23. )
  24. var (
  25. // ErrServiceOverloaded is returned by Shedder.Allow when the service is overloaded.
  26. ErrServiceOverloaded = errors.New("service overloaded")
  27. // default to be enabled
  28. enabled = syncx.ForAtomicBool(true)
  29. // make it a variable for unit test
  30. systemOverloadChecker = func(cpuThreshold int64) bool {
  31. return stat.CpuUsage() >= cpuThreshold
  32. }
  33. )
  34. type (
  35. // A Promise interface is returned by Shedder.Allow to let callers tell
  36. // whether the processing request is successful or not.
  37. Promise interface {
  38. // Pass lets the caller tell that the call is successful.
  39. Pass()
  40. // Fail lets the caller tell that the call is failed.
  41. Fail()
  42. }
  43. // Shedder is the interface that wraps the Allow method.
  44. Shedder interface {
  45. // Allow returns the Promise if allowed, otherwise ErrServiceOverloaded.
  46. Allow() (Promise, error)
  47. }
  48. // ShedderOption lets caller customize the Shedder.
  49. ShedderOption func(opts *shedderOptions)
  50. shedderOptions struct {
  51. window time.Duration
  52. buckets int
  53. cpuThreshold int64
  54. }
  55. adaptiveShedder struct {
  56. cpuThreshold int64
  57. windows int64
  58. flying int64
  59. avgFlying float64
  60. avgFlyingLock syncx.SpinLock
  61. dropTime *syncx.AtomicDuration
  62. droppedRecently *syncx.AtomicBool
  63. passCounter *collection.RollingWindow
  64. rtCounter *collection.RollingWindow
  65. }
  66. )
  67. // Disable lets callers disable load shedding.
  68. func Disable() {
  69. enabled.Set(false)
  70. }
  71. // NewAdaptiveShedder returns an adaptive shedder.
  72. // opts can be used to customize the Shedder.
  73. func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
  74. if !enabled.True() {
  75. return newNopShedder()
  76. }
  77. options := shedderOptions{
  78. window: defaultWindow,
  79. buckets: defaultBuckets,
  80. cpuThreshold: defaultCpuThreshold,
  81. }
  82. for _, opt := range opts {
  83. opt(&options)
  84. }
  85. bucketDuration := options.window / time.Duration(options.buckets)
  86. return &adaptiveShedder{
  87. cpuThreshold: options.cpuThreshold,
  88. windows: int64(time.Second / bucketDuration),
  89. dropTime: syncx.NewAtomicDuration(),
  90. droppedRecently: syncx.NewAtomicBool(),
  91. passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
  92. collection.IgnoreCurrentBucket()),
  93. rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
  94. collection.IgnoreCurrentBucket()),
  95. }
  96. }
  97. // Allow implements Shedder.Allow.
  98. func (as *adaptiveShedder) Allow() (Promise, error) {
  99. if as.shouldDrop() {
  100. as.dropTime.Set(timex.Now())
  101. as.droppedRecently.Set(true)
  102. return nil, ErrServiceOverloaded
  103. }
  104. as.addFlying(1)
  105. return &promise{
  106. start: timex.Now(),
  107. shedder: as,
  108. }, nil
  109. }
  110. func (as *adaptiveShedder) addFlying(delta int64) {
  111. flying := atomic.AddInt64(&as.flying, delta)
  112. // update avgFlying when the request is finished.
  113. // this strategy makes avgFlying have a little bit lag against flying, and smoother.
  114. // when the flying requests increase rapidly, avgFlying increase slower, accept more requests.
  115. // when the flying requests drop rapidly, avgFlying drop slower, accept less requests.
  116. // it makes the service to serve as more requests as possible.
  117. if delta < 0 {
  118. as.avgFlyingLock.Lock()
  119. as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta)
  120. as.avgFlyingLock.Unlock()
  121. }
  122. }
  123. func (as *adaptiveShedder) highThru() bool {
  124. as.avgFlyingLock.Lock()
  125. avgFlying := as.avgFlying
  126. as.avgFlyingLock.Unlock()
  127. maxFlight := as.maxFlight()
  128. return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight
  129. }
  130. func (as *adaptiveShedder) maxFlight() int64 {
  131. // windows = buckets per second
  132. // maxQPS = maxPASS * windows
  133. // minRT = min average response time in milliseconds
  134. // maxQPS * minRT / milliseconds_per_second
  135. return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
  136. }
  137. func (as *adaptiveShedder) maxPass() int64 {
  138. var result float64 = 1
  139. as.passCounter.Reduce(func(b *collection.Bucket) {
  140. if b.Sum > result {
  141. result = b.Sum
  142. }
  143. })
  144. return int64(result)
  145. }
  146. func (as *adaptiveShedder) minRt() float64 {
  147. result := defaultMinRt
  148. as.rtCounter.Reduce(func(b *collection.Bucket) {
  149. if b.Count <= 0 {
  150. return
  151. }
  152. avg := math.Round(b.Sum / float64(b.Count))
  153. if avg < result {
  154. result = avg
  155. }
  156. })
  157. return result
  158. }
  159. func (as *adaptiveShedder) shouldDrop() bool {
  160. if as.systemOverloaded() || as.stillHot() {
  161. if as.highThru() {
  162. flying := atomic.LoadInt64(&as.flying)
  163. as.avgFlyingLock.Lock()
  164. avgFlying := as.avgFlying
  165. as.avgFlyingLock.Unlock()
  166. msg := fmt.Sprintf(
  167. "dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",
  168. stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)
  169. logx.Error(msg)
  170. stat.Report(msg)
  171. return true
  172. }
  173. }
  174. return false
  175. }
  176. func (as *adaptiveShedder) stillHot() bool {
  177. if !as.droppedRecently.True() {
  178. return false
  179. }
  180. dropTime := as.dropTime.Load()
  181. if dropTime == 0 {
  182. return false
  183. }
  184. hot := timex.Since(dropTime) < coolOffDuration
  185. if !hot {
  186. as.droppedRecently.Set(false)
  187. }
  188. return hot
  189. }
  190. func (as *adaptiveShedder) systemOverloaded() bool {
  191. return systemOverloadChecker(as.cpuThreshold)
  192. }
  193. // WithBuckets customizes the Shedder with given number of buckets.
  194. func WithBuckets(buckets int) ShedderOption {
  195. return func(opts *shedderOptions) {
  196. opts.buckets = buckets
  197. }
  198. }
  199. // WithCpuThreshold customizes the Shedder with given cpu threshold.
  200. func WithCpuThreshold(threshold int64) ShedderOption {
  201. return func(opts *shedderOptions) {
  202. opts.cpuThreshold = threshold
  203. }
  204. }
  205. // WithWindow customizes the Shedder with given
  206. func WithWindow(window time.Duration) ShedderOption {
  207. return func(opts *shedderOptions) {
  208. opts.window = window
  209. }
  210. }
  211. type promise struct {
  212. start time.Duration
  213. shedder *adaptiveShedder
  214. }
  215. func (p *promise) Fail() {
  216. p.shedder.addFlying(-1)
  217. }
  218. func (p *promise) Pass() {
  219. rt := float64(timex.Since(p.start)) / float64(time.Millisecond)
  220. p.shedder.addFlying(-1)
  221. p.shedder.rtCounter.Add(math.Ceil(rt))
  222. p.shedder.passCounter.Add(1)
  223. }