breaker.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package breaker
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "github.com/tal-tech/go-zero/core/mathx"
  8. "github.com/tal-tech/go-zero/core/proc"
  9. "github.com/tal-tech/go-zero/core/stat"
  10. "github.com/tal-tech/go-zero/core/stringx"
  11. "github.com/tal-tech/go-zero/core/timex"
  12. )
  13. const (
  14. StateClosed State = iota
  15. StateOpen
  16. )
  // numHistoryReasons is the capacity of the error window, i.e. how many of
  // the most recent rejection reasons are retained.
  const numHistoryReasons = 5

  // timeFormat is the time layout prefixed to every recorded reason.
  const timeFormat = "15:04:05"
  // ErrServiceUnavailable is the sentinel error returned while the circuit
  // breaker is open and the request is therefore not allowed.
  var (
  	ErrServiceUnavailable = errors.New("circuit breaker is open")
  )
  type (
  	// State is the state of a circuit breaker, see StateClosed and StateOpen.
  	// It is an alias of int32.
  	State = int32

  	// Acceptable reports whether the outcome of a call counts as successful,
  	// even if err is not nil.
  	Acceptable func(err error) bool

  	// Breaker is the circuit breaker exposed by this package.
  	Breaker interface {
  		// Name returns the name of the Breaker.
  		Name() string

  		// Allow checks if the request is allowed.
  		// If allowed, a promise will be returned, the caller needs to call promise.Accept()
  		// on success, or call promise.Reject() on failure.
  		// If not allowed, ErrServiceUnavailable will be returned.
  		Allow() (Promise, error)

  		// Do runs the given request if the Breaker accepts it.
  		// Do returns an error instantly if the Breaker rejects the request.
  		// If a panic occurs in the request, the Breaker handles it as an error
  		// and causes the same panic again.
  		Do(req func() error) error

  		// DoWithAcceptable runs the given request if the Breaker accepts it.
  		// DoWithAcceptable returns an error instantly if the Breaker rejects the request.
  		// If a panic occurs in the request, the Breaker handles it as an error
  		// and causes the same panic again.
  		// acceptable checks if it's a successful call, even if the err is not nil.
  		DoWithAcceptable(req func() error, acceptable Acceptable) error

  		// DoWithFallback runs the given request if the Breaker accepts it.
  		// DoWithFallback runs the fallback if the Breaker rejects the request.
  		// If a panic occurs in the request, the Breaker handles it as an error
  		// and causes the same panic again.
  		DoWithFallback(req func() error, fallback func(err error) error) error

  		// DoWithFallbackAcceptable runs the given request if the Breaker accepts it.
  		// DoWithFallbackAcceptable runs the fallback if the Breaker rejects the request.
  		// If a panic occurs in the request, the Breaker handles it as an error
  		// and causes the same panic again.
  		// acceptable checks if it's a successful call, even if the err is not nil.
  		DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
  	}

  	// BreakerOption customizes a circuitBreaker while it is being constructed.
  	BreakerOption func(breaker *circuitBreaker)

  	// Promise is handed out by Breaker.Allow; the caller reports the outcome
  	// of the allowed request through it.
  	Promise interface {
  		// Accept reports that the request succeeded.
  		Accept()
  		// Reject reports that the request failed, reason describes the failure.
  		Reject(reason string)
  	}

  	// internalPromise is the promise returned by an internalThrottle;
  	// unlike Promise, its Reject takes no reason.
  	internalPromise interface {
  		Accept()
  		Reject()
  	}

  	// circuitBreaker is the Breaker implementation: a name plus the embedded
  	// throttle every call is delegated to.
  	circuitBreaker struct {
  		name string
  		throttle
  	}

  	// internalThrottle is the throttling contract whose allow returns an
  	// internalPromise.
  	internalThrottle interface {
  		allow() (internalPromise, error)
  		doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
  	}

  	// throttle is the throttling contract used by circuitBreaker; its allow
  	// returns the public Promise.
  	throttle interface {
  		allow() (Promise, error)
  		doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
  	}
  )
  79. func NewBreaker(opts ...BreakerOption) Breaker {
  80. var b circuitBreaker
  81. for _, opt := range opts {
  82. opt(&b)
  83. }
  84. if len(b.name) == 0 {
  85. b.name = stringx.Rand()
  86. }
  87. b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())
  88. return &b
  89. }
  90. func (cb *circuitBreaker) Allow() (Promise, error) {
  91. return cb.throttle.allow()
  92. }
  93. func (cb *circuitBreaker) Do(req func() error) error {
  94. return cb.throttle.doReq(req, nil, defaultAcceptable)
  95. }
  96. func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {
  97. return cb.throttle.doReq(req, nil, acceptable)
  98. }
  99. func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
  100. return cb.throttle.doReq(req, fallback, defaultAcceptable)
  101. }
  102. func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error,
  103. acceptable Acceptable) error {
  104. return cb.throttle.doReq(req, fallback, acceptable)
  105. }
  106. func (cb *circuitBreaker) Name() string {
  107. return cb.name
  108. }
  109. func WithName(name string) BreakerOption {
  110. return func(b *circuitBreaker) {
  111. b.name = name
  112. }
  113. }
  // defaultAcceptable treats a call as successful only when err is nil.
  func defaultAcceptable(err error) bool {
  	if err != nil {
  		return false
  	}

  	return true
  }
  117. type loggedThrottle struct {
  118. name string
  119. internalThrottle
  120. errWin *errorWindow
  121. }
  122. func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
  123. return loggedThrottle{
  124. name: name,
  125. internalThrottle: t,
  126. errWin: new(errorWindow),
  127. }
  128. }
  129. func (lt loggedThrottle) allow() (Promise, error) {
  130. promise, err := lt.internalThrottle.allow()
  131. return promiseWithReason{
  132. promise: promise,
  133. errWin: lt.errWin,
  134. }, lt.logError(err)
  135. }
  136. func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
  137. return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
  138. accept := acceptable(err)
  139. if !accept {
  140. lt.errWin.add(err.Error())
  141. }
  142. return accept
  143. }))
  144. }
  145. func (lt loggedThrottle) logError(err error) error {
  146. if err == ErrServiceUnavailable {
  147. // if circuit open, not possible to have empty error window
  148. stat.Report(fmt.Sprintf(
  149. "proc(%s/%d), callee: %s, breaker is open and requests dropped\nlast errors:\n%s",
  150. proc.ProcessName(), proc.Pid(), lt.name, lt.errWin))
  151. }
  152. return err
  153. }
  154. type errorWindow struct {
  155. reasons [numHistoryReasons]string
  156. index int
  157. count int
  158. lock sync.Mutex
  159. }
  160. func (ew *errorWindow) add(reason string) {
  161. ew.lock.Lock()
  162. ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason)
  163. ew.index = (ew.index + 1) % numHistoryReasons
  164. ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
  165. ew.lock.Unlock()
  166. }
  167. func (ew *errorWindow) String() string {
  168. var reasons []string
  169. ew.lock.Lock()
  170. // reverse order
  171. for i := ew.index - 1; i >= ew.index-ew.count; i-- {
  172. reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
  173. }
  174. ew.lock.Unlock()
  175. return strings.Join(reasons, "\n")
  176. }
  177. type promiseWithReason struct {
  178. promise internalPromise
  179. errWin *errorWindow
  180. }
  181. func (p promiseWithReason) Accept() {
  182. p.promise.Accept()
  183. }
  184. func (p promiseWithReason) Reject(reason string) {
  185. p.errWin.add(reason)
  186. p.promise.Reject()
  187. }