googlebreaker.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package breaker
  2. import (
  3. "math"
  4. "time"
  5. "github.com/tal-tech/go-zero/core/collection"
  6. "github.com/tal-tech/go-zero/core/mathx"
  7. )
  8. const (
  9. // 250ms for bucket duration
  10. window = time.Second * 10
  11. buckets = 40
  12. k = 1.5
  13. protection = 5
  14. )
  15. // googleBreaker is a netflixBreaker pattern from google.
  16. // see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
  17. type googleBreaker struct {
  18. k float64
  19. stat *collection.RollingWindow
  20. proba *mathx.Proba
  21. }
  22. func newGoogleBreaker() *googleBreaker {
  23. bucketDuration := time.Duration(int64(window) / int64(buckets))
  24. st := collection.NewRollingWindow(buckets, bucketDuration)
  25. return &googleBreaker{
  26. stat: st,
  27. k: k,
  28. proba: mathx.NewProba(),
  29. }
  30. }
  31. func (b *googleBreaker) accept() error {
  32. accepts, total := b.history()
  33. weightedAccepts := b.k * float64(accepts)
  34. // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
  35. dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
  36. if dropRatio <= 0 {
  37. return nil
  38. }
  39. if b.proba.TrueOnProba(dropRatio) {
  40. return ErrServiceUnavailable
  41. }
  42. return nil
  43. }
  44. func (b *googleBreaker) allow() (internalPromise, error) {
  45. if err := b.accept(); err != nil {
  46. return nil, err
  47. }
  48. return googlePromise{
  49. b: b,
  50. }, nil
  51. }
  52. func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
  53. if err := b.accept(); err != nil {
  54. if fallback != nil {
  55. return fallback(err)
  56. }
  57. return err
  58. }
  59. defer func() {
  60. if e := recover(); e != nil {
  61. b.markFailure()
  62. panic(e)
  63. }
  64. }()
  65. err := req()
  66. if acceptable(err) {
  67. b.markSuccess()
  68. } else {
  69. b.markFailure()
  70. }
  71. return err
  72. }
  73. func (b *googleBreaker) markSuccess() {
  74. b.stat.Add(1)
  75. }
  76. func (b *googleBreaker) markFailure() {
  77. b.stat.Add(0)
  78. }
  79. func (b *googleBreaker) history() (accepts int64, total int64) {
  80. b.stat.Reduce(func(b *collection.Bucket) {
  81. accepts += int64(b.Sum)
  82. total += b.Count
  83. })
  84. return
  85. }
  86. type googlePromise struct {
  87. b *googleBreaker
  88. }
  89. func (p googlePromise) Accept() {
  90. p.b.markSuccess()
  91. }
  92. func (p googlePromise) Reject() {
  93. p.b.markFailure()
  94. }