123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- package breaker
- import (
- "errors"
- "fmt"
- "strings"
- "sync"
- "github.com/tal-tech/go-zero/core/mathx"
- "github.com/tal-tech/go-zero/core/proc"
- "github.com/tal-tech/go-zero/core/stat"
- "github.com/tal-tech/go-zero/core/stringx"
- "github.com/tal-tech/go-zero/core/timex"
- )
- const (
- numHistoryReasons = 5
- timeFormat = "15:04:05"
- )
- // ErrServiceUnavailable is returned when the Breaker state is open.
- var ErrServiceUnavailable = errors.New("circuit breaker is open")
- type (
- // Acceptable is the func to check if the error can be accepted.
- Acceptable func(err error) bool
- // A Breaker represents a circuit breaker.
- Breaker interface {
- // Name returns the name of the Breaker.
- Name() string
- // Allow checks if the request is allowed.
- // If allowed, a promise will be returned, the caller needs to call promise.Accept()
- // on success, or call promise.Reject() on failure.
- // If not allow, ErrServiceUnavailable will be returned.
- Allow() (Promise, error)
- // Do runs the given request if the Breaker accepts it.
- // Do returns an error instantly if the Breaker rejects the request.
- // If a panic occurs in the request, the Breaker handles it as an error
- // and causes the same panic again.
- Do(req func() error) error
- // DoWithAcceptable runs the given request if the Breaker accepts it.
- // DoWithAcceptable returns an error instantly if the Breaker rejects the request.
- // If a panic occurs in the request, the Breaker handles it as an error
- // and causes the same panic again.
- // acceptable checks if it's a successful call, even if the err is not nil.
- DoWithAcceptable(req func() error, acceptable Acceptable) error
- // DoWithFallback runs the given request if the Breaker accepts it.
- // DoWithFallback runs the fallback if the Breaker rejects the request.
- // If a panic occurs in the request, the Breaker handles it as an error
- // and causes the same panic again.
- DoWithFallback(req func() error, fallback func(err error) error) error
- // DoWithFallbackAcceptable runs the given request if the Breaker accepts it.
- // DoWithFallbackAcceptable runs the fallback if the Breaker rejects the request.
- // If a panic occurs in the request, the Breaker handles it as an error
- // and causes the same panic again.
- // acceptable checks if it's a successful call, even if the err is not nil.
- DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
- }
- // Option defines the method to customize a Breaker.
- Option func(breaker *circuitBreaker)
- // Promise interface defines the callbacks that returned by Breaker.Allow.
- Promise interface {
- // Accept tells the Breaker that the call is successful.
- Accept()
- // Reject tells the Breaker that the call is failed.
- Reject(reason string)
- }
- internalPromise interface {
- Accept()
- Reject()
- }
- circuitBreaker struct {
- name string
- throttle
- }
- internalThrottle interface {
- allow() (internalPromise, error)
- doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
- }
- throttle interface {
- allow() (Promise, error)
- doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
- }
- )
- // NewBreaker returns a Breaker object.
- // opts can be used to customize the Breaker.
- func NewBreaker(opts ...Option) Breaker {
- var b circuitBreaker
- for _, opt := range opts {
- opt(&b)
- }
- if len(b.name) == 0 {
- b.name = stringx.Rand()
- }
- b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())
- return &b
- }
- func (cb *circuitBreaker) Allow() (Promise, error) {
- return cb.throttle.allow()
- }
- func (cb *circuitBreaker) Do(req func() error) error {
- return cb.throttle.doReq(req, nil, defaultAcceptable)
- }
- func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {
- return cb.throttle.doReq(req, nil, acceptable)
- }
- func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
- return cb.throttle.doReq(req, fallback, defaultAcceptable)
- }
- func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error,
- acceptable Acceptable) error {
- return cb.throttle.doReq(req, fallback, acceptable)
- }
- func (cb *circuitBreaker) Name() string {
- return cb.name
- }
- // WithName returns a function to set the name of a Breaker.
- func WithName(name string) Option {
- return func(b *circuitBreaker) {
- b.name = name
- }
- }
- func defaultAcceptable(err error) bool {
- return err == nil
- }
- type loggedThrottle struct {
- name string
- internalThrottle
- errWin *errorWindow
- }
- func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
- return loggedThrottle{
- name: name,
- internalThrottle: t,
- errWin: new(errorWindow),
- }
- }
- func (lt loggedThrottle) allow() (Promise, error) {
- promise, err := lt.internalThrottle.allow()
- return promiseWithReason{
- promise: promise,
- errWin: lt.errWin,
- }, lt.logError(err)
- }
- func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
- return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
- accept := acceptable(err)
- if !accept {
- lt.errWin.add(err.Error())
- }
- return accept
- }))
- }
- func (lt loggedThrottle) logError(err error) error {
- if err == ErrServiceUnavailable {
- // if circuit open, not possible to have empty error window
- stat.Report(fmt.Sprintf(
- "proc(%s/%d), callee: %s, breaker is open and requests dropped\nlast errors:\n%s",
- proc.ProcessName(), proc.Pid(), lt.name, lt.errWin))
- }
- return err
- }
- type errorWindow struct {
- reasons [numHistoryReasons]string
- index int
- count int
- lock sync.Mutex
- }
- func (ew *errorWindow) add(reason string) {
- ew.lock.Lock()
- ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason)
- ew.index = (ew.index + 1) % numHistoryReasons
- ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
- ew.lock.Unlock()
- }
- func (ew *errorWindow) String() string {
- var reasons []string
- ew.lock.Lock()
- // reverse order
- for i := ew.index - 1; i >= ew.index-ew.count; i-- {
- reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
- }
- ew.lock.Unlock()
- return strings.Join(reasons, "\n")
- }
- type promiseWithReason struct {
- promise internalPromise
- errWin *errorWindow
- }
- func (p promiseWithReason) Accept() {
- p.promise.Accept()
- }
- func (p promiseWithReason) Reject(reason string) {
- p.errWin.add(reason)
- p.promise.Reject()
- }
|