googlebreaker.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package breaker
  2. import (
  3. "math"
  4. "time"
  5. "github.com/tal-tech/go-zero/core/collection"
  6. "github.com/tal-tech/go-zero/core/mathx"
  7. )
  8. const (
  9. // 250ms for bucket duration
  10. window = time.Second * 10
  11. buckets = 40
  12. k = 1.5
  13. protection = 5
  14. )
  15. // googleBreaker is a netflixBreaker pattern from google.
  16. // see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
  17. type googleBreaker struct {
  18. k float64
  19. stat *collection.RollingWindow
  20. proba *mathx.Proba
  21. }
  22. func newGoogleBreaker() *googleBreaker {
  23. bucketDuration := time.Duration(int64(window) / int64(buckets))
  24. st := collection.NewRollingWindow(buckets, bucketDuration)
  25. return &googleBreaker{
  26. stat: st,
  27. k: k,
  28. proba: mathx.NewProba(),
  29. }
  30. }
  31. func (b *googleBreaker) accept() error {
  32. accepts, total := b.history()
  33. weightedAccepts := b.k * float64(accepts)
  34. // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
  35. dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
  36. if dropRatio <= 0 {
  37. return nil
  38. }
  39. if b.proba.TrueOnProba(dropRatio) {
  40. return ErrServiceUnavailable
  41. }
  42. return nil
  43. }
  44. func (b *googleBreaker) allow() (internalPromise, error) {
  45. if err := b.accept(); err != nil {
  46. return nil, err
  47. }
  48. return googlePromise{
  49. b: b,
  50. }, nil
  51. }
  52. func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
  53. if err := b.accept(); err != nil {
  54. if fallback != nil {
  55. return fallback(err)
  56. } else {
  57. return err
  58. }
  59. }
  60. defer func() {
  61. if e := recover(); e != nil {
  62. b.markFailure()
  63. panic(e)
  64. }
  65. }()
  66. err := req()
  67. if acceptable(err) {
  68. b.markSuccess()
  69. } else {
  70. b.markFailure()
  71. }
  72. return err
  73. }
  74. func (b *googleBreaker) markSuccess() {
  75. b.stat.Add(1)
  76. }
  77. func (b *googleBreaker) markFailure() {
  78. b.stat.Add(0)
  79. }
  80. func (b *googleBreaker) history() (accepts int64, total int64) {
  81. b.stat.Reduce(func(b *collection.Bucket) {
  82. accepts += int64(b.Sum)
  83. total += b.Count
  84. })
  85. return
  86. }
  87. type googlePromise struct {
  88. b *googleBreaker
  89. }
  90. func (p googlePromise) Accept() {
  91. p.b.markSuccess()
  92. }
  93. func (p googlePromise) Reject() {
  94. p.b.markFailure()
  95. }