breaker.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package breaker
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "github.com/tal-tech/go-zero/core/mathx"
  8. "github.com/tal-tech/go-zero/core/proc"
  9. "github.com/tal-tech/go-zero/core/stat"
  10. "github.com/tal-tech/go-zero/core/stringx"
  11. "github.com/tal-tech/go-zero/core/timex"
  12. )
  13. const (
  14. numHistoryReasons = 5
  15. timeFormat = "15:04:05"
  16. )
  17. // ErrServiceUnavailable is returned when the CB state is open
  18. var ErrServiceUnavailable = errors.New("circuit breaker is open")
  19. type (
  20. Acceptable func(err error) bool
  21. Breaker interface {
  22. // Name returns the name of the Breaker.
  23. Name() string
  24. // Allow checks if the request is allowed.
  25. // If allowed, a promise will be returned, the caller needs to call promise.Accept()
  26. // on success, or call promise.Reject() on failure.
  27. // If not allow, ErrServiceUnavailable will be returned.
  28. Allow() (Promise, error)
  29. // Do runs the given request if the Breaker accepts it.
  30. // Do returns an error instantly if the Breaker rejects the request.
  31. // If a panic occurs in the request, the Breaker handles it as an error
  32. // and causes the same panic again.
  33. Do(req func() error) error
  34. // DoWithAcceptable runs the given request if the Breaker accepts it.
  35. // DoWithAcceptable returns an error instantly if the Breaker rejects the request.
  36. // If a panic occurs in the request, the Breaker handles it as an error
  37. // and causes the same panic again.
  38. // acceptable checks if it's a successful call, even if the err is not nil.
  39. DoWithAcceptable(req func() error, acceptable Acceptable) error
  40. // DoWithFallback runs the given request if the Breaker accepts it.
  41. // DoWithFallback runs the fallback if the Breaker rejects the request.
  42. // If a panic occurs in the request, the Breaker handles it as an error
  43. // and causes the same panic again.
  44. DoWithFallback(req func() error, fallback func(err error) error) error
  45. // DoWithFallbackAcceptable runs the given request if the Breaker accepts it.
  46. // DoWithFallbackAcceptable runs the fallback if the Breaker rejects the request.
  47. // If a panic occurs in the request, the Breaker handles it as an error
  48. // and causes the same panic again.
  49. // acceptable checks if it's a successful call, even if the err is not nil.
  50. DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
  51. }
  52. Option func(breaker *circuitBreaker)
  53. Promise interface {
  54. Accept()
  55. Reject(reason string)
  56. }
  57. internalPromise interface {
  58. Accept()
  59. Reject()
  60. }
  61. circuitBreaker struct {
  62. name string
  63. throttle
  64. }
  65. internalThrottle interface {
  66. allow() (internalPromise, error)
  67. doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
  68. }
  69. throttle interface {
  70. allow() (Promise, error)
  71. doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
  72. }
  73. )
  74. func NewBreaker(opts ...Option) Breaker {
  75. var b circuitBreaker
  76. for _, opt := range opts {
  77. opt(&b)
  78. }
  79. if len(b.name) == 0 {
  80. b.name = stringx.Rand()
  81. }
  82. b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())
  83. return &b
  84. }
  85. func (cb *circuitBreaker) Allow() (Promise, error) {
  86. return cb.throttle.allow()
  87. }
  88. func (cb *circuitBreaker) Do(req func() error) error {
  89. return cb.throttle.doReq(req, nil, defaultAcceptable)
  90. }
  91. func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {
  92. return cb.throttle.doReq(req, nil, acceptable)
  93. }
  94. func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
  95. return cb.throttle.doReq(req, fallback, defaultAcceptable)
  96. }
  97. func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error,
  98. acceptable Acceptable) error {
  99. return cb.throttle.doReq(req, fallback, acceptable)
  100. }
  101. func (cb *circuitBreaker) Name() string {
  102. return cb.name
  103. }
  104. func WithName(name string) Option {
  105. return func(b *circuitBreaker) {
  106. b.name = name
  107. }
  108. }
  109. func defaultAcceptable(err error) bool {
  110. return err == nil
  111. }
  112. type loggedThrottle struct {
  113. name string
  114. internalThrottle
  115. errWin *errorWindow
  116. }
  117. func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
  118. return loggedThrottle{
  119. name: name,
  120. internalThrottle: t,
  121. errWin: new(errorWindow),
  122. }
  123. }
  124. func (lt loggedThrottle) allow() (Promise, error) {
  125. promise, err := lt.internalThrottle.allow()
  126. return promiseWithReason{
  127. promise: promise,
  128. errWin: lt.errWin,
  129. }, lt.logError(err)
  130. }
  131. func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
  132. return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
  133. accept := acceptable(err)
  134. if !accept {
  135. lt.errWin.add(err.Error())
  136. }
  137. return accept
  138. }))
  139. }
  140. func (lt loggedThrottle) logError(err error) error {
  141. if err == ErrServiceUnavailable {
  142. // if circuit open, not possible to have empty error window
  143. stat.Report(fmt.Sprintf(
  144. "proc(%s/%d), callee: %s, breaker is open and requests dropped\nlast errors:\n%s",
  145. proc.ProcessName(), proc.Pid(), lt.name, lt.errWin))
  146. }
  147. return err
  148. }
  149. type errorWindow struct {
  150. reasons [numHistoryReasons]string
  151. index int
  152. count int
  153. lock sync.Mutex
  154. }
  155. func (ew *errorWindow) add(reason string) {
  156. ew.lock.Lock()
  157. ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason)
  158. ew.index = (ew.index + 1) % numHistoryReasons
  159. ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
  160. ew.lock.Unlock()
  161. }
  162. func (ew *errorWindow) String() string {
  163. var reasons []string
  164. ew.lock.Lock()
  165. // reverse order
  166. for i := ew.index - 1; i >= ew.index-ew.count; i-- {
  167. reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
  168. }
  169. ew.lock.Unlock()
  170. return strings.Join(reasons, "\n")
  171. }
  172. type promiseWithReason struct {
  173. promise internalPromise
  174. errWin *errorWindow
  175. }
  176. func (p promiseWithReason) Accept() {
  177. p.promise.Accept()
  178. }
  179. func (p promiseWithReason) Reject(reason string) {
  180. p.errWin.add(reason)
  181. p.promise.Reject()
  182. }