breaker.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package breaker
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "github.com/tal-tech/go-zero/core/mathx"
  8. "github.com/tal-tech/go-zero/core/proc"
  9. "github.com/tal-tech/go-zero/core/stat"
  10. "github.com/tal-tech/go-zero/core/stringx"
  11. "github.com/tal-tech/go-zero/core/timex"
  12. )
  13. const (
  14. numHistoryReasons = 5
  15. timeFormat = "15:04:05"
  16. )
  17. // ErrServiceUnavailable is returned when the Breaker state is open.
  18. var ErrServiceUnavailable = errors.New("circuit breaker is open")
  19. type (
  20. // Acceptable is the func to check if the error can be accepted.
  21. Acceptable func(err error) bool
  22. // A Breaker represents a circuit breaker.
  23. Breaker interface {
  24. // Name returns the name of the Breaker.
  25. Name() string
  26. // Allow checks if the request is allowed.
  27. // If allowed, a promise will be returned, the caller needs to call promise.Accept()
  28. // on success, or call promise.Reject() on failure.
  29. // If not allow, ErrServiceUnavailable will be returned.
  30. Allow() (Promise, error)
  31. // Do runs the given request if the Breaker accepts it.
  32. // Do returns an error instantly if the Breaker rejects the request.
  33. // If a panic occurs in the request, the Breaker handles it as an error
  34. // and causes the same panic again.
  35. Do(req func() error) error
  36. // DoWithAcceptable runs the given request if the Breaker accepts it.
  37. // DoWithAcceptable returns an error instantly if the Breaker rejects the request.
  38. // If a panic occurs in the request, the Breaker handles it as an error
  39. // and causes the same panic again.
  40. // acceptable checks if it's a successful call, even if the err is not nil.
  41. DoWithAcceptable(req func() error, acceptable Acceptable) error
  42. // DoWithFallback runs the given request if the Breaker accepts it.
  43. // DoWithFallback runs the fallback if the Breaker rejects the request.
  44. // If a panic occurs in the request, the Breaker handles it as an error
  45. // and causes the same panic again.
  46. DoWithFallback(req func() error, fallback func(err error) error) error
  47. // DoWithFallbackAcceptable runs the given request if the Breaker accepts it.
  48. // DoWithFallbackAcceptable runs the fallback if the Breaker rejects the request.
  49. // If a panic occurs in the request, the Breaker handles it as an error
  50. // and causes the same panic again.
  51. // acceptable checks if it's a successful call, even if the err is not nil.
  52. DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
  53. }
  54. // Option defines the method to customize a Breaker.
  55. Option func(breaker *circuitBreaker)
  56. // Promise interface defines the callbacks that returned by Breaker.Allow.
  57. Promise interface {
  58. // Accept tells the Breaker that the call is successful.
  59. Accept()
  60. // Reject tells the Breaker that the call is failed.
  61. Reject(reason string)
  62. }
  63. internalPromise interface {
  64. Accept()
  65. Reject()
  66. }
  67. circuitBreaker struct {
  68. name string
  69. throttle
  70. }
  71. internalThrottle interface {
  72. allow() (internalPromise, error)
  73. doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
  74. }
  75. throttle interface {
  76. allow() (Promise, error)
  77. doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
  78. }
  79. )
  80. // NewBreaker returns a Breaker object.
  81. // opts can be used to customize the Breaker.
  82. func NewBreaker(opts ...Option) Breaker {
  83. var b circuitBreaker
  84. for _, opt := range opts {
  85. opt(&b)
  86. }
  87. if len(b.name) == 0 {
  88. b.name = stringx.Rand()
  89. }
  90. b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())
  91. return &b
  92. }
  93. func (cb *circuitBreaker) Allow() (Promise, error) {
  94. return cb.throttle.allow()
  95. }
  96. func (cb *circuitBreaker) Do(req func() error) error {
  97. return cb.throttle.doReq(req, nil, defaultAcceptable)
  98. }
  99. func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {
  100. return cb.throttle.doReq(req, nil, acceptable)
  101. }
  102. func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
  103. return cb.throttle.doReq(req, fallback, defaultAcceptable)
  104. }
  105. func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error,
  106. acceptable Acceptable) error {
  107. return cb.throttle.doReq(req, fallback, acceptable)
  108. }
  109. func (cb *circuitBreaker) Name() string {
  110. return cb.name
  111. }
  112. // WithName returns a function to set the name of a Breaker.
  113. func WithName(name string) Option {
  114. return func(b *circuitBreaker) {
  115. b.name = name
  116. }
  117. }
  118. func defaultAcceptable(err error) bool {
  119. return err == nil
  120. }
  121. type loggedThrottle struct {
  122. name string
  123. internalThrottle
  124. errWin *errorWindow
  125. }
  126. func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
  127. return loggedThrottle{
  128. name: name,
  129. internalThrottle: t,
  130. errWin: new(errorWindow),
  131. }
  132. }
  133. func (lt loggedThrottle) allow() (Promise, error) {
  134. promise, err := lt.internalThrottle.allow()
  135. return promiseWithReason{
  136. promise: promise,
  137. errWin: lt.errWin,
  138. }, lt.logError(err)
  139. }
  140. func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
  141. return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
  142. accept := acceptable(err)
  143. if !accept {
  144. lt.errWin.add(err.Error())
  145. }
  146. return accept
  147. }))
  148. }
  149. func (lt loggedThrottle) logError(err error) error {
  150. if err == ErrServiceUnavailable {
  151. // if circuit open, not possible to have empty error window
  152. stat.Report(fmt.Sprintf(
  153. "proc(%s/%d), callee: %s, breaker is open and requests dropped\nlast errors:\n%s",
  154. proc.ProcessName(), proc.Pid(), lt.name, lt.errWin))
  155. }
  156. return err
  157. }
  158. type errorWindow struct {
  159. reasons [numHistoryReasons]string
  160. index int
  161. count int
  162. lock sync.Mutex
  163. }
  164. func (ew *errorWindow) add(reason string) {
  165. ew.lock.Lock()
  166. ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason)
  167. ew.index = (ew.index + 1) % numHistoryReasons
  168. ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
  169. ew.lock.Unlock()
  170. }
  171. func (ew *errorWindow) String() string {
  172. var reasons []string
  173. ew.lock.Lock()
  174. // reverse order
  175. for i := ew.index - 1; i >= ew.index-ew.count; i-- {
  176. reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
  177. }
  178. ew.lock.Unlock()
  179. return strings.Join(reasons, "\n")
  180. }
  181. type promiseWithReason struct {
  182. promise internalPromise
  183. errWin *errorWindow
  184. }
  185. func (p promiseWithReason) Accept() {
  186. p.promise.Accept()
  187. }
  188. func (p promiseWithReason) Reject(reason string) {
  189. p.errWin.add(reason)
  190. p.promise.Reject()
  191. }