123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- package breaker
- import (
- "math"
- "sync/atomic"
- "time"
- "github.com/tal-tech/go-zero/core/collection"
- "github.com/tal-tech/go-zero/core/mathx"
- )
// Tuning parameters for the rolling statistics and the drop-ratio formula.
const (
	// window is the total time span covered by the rolling statistics.
	window = 10 * time.Second
	// buckets is the number of slots the window is divided into; with a 10s
	// window each bucket covers 250ms.
	buckets = 40
	// k is the multiplier applied to the accepted-request count when the
	// drop ratio is computed.
	k = 1.5
	// protection is subtracted from the total request count before the drop
	// ratio is computed (see accept).
	protection = 5
)
- // googleBreaker is a netflixBreaker pattern from google.
- // see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
- type googleBreaker struct {
- k float64
- state int32
- stat *collection.RollingWindow
- proba *mathx.Proba
- }
- func newGoogleBreaker() *googleBreaker {
- bucketDuration := time.Duration(int64(window) / int64(buckets))
- st := collection.NewRollingWindow(buckets, bucketDuration)
- return &googleBreaker{
- stat: st,
- k: k,
- state: StateClosed,
- proba: mathx.NewProba(),
- }
- }
- func (b *googleBreaker) accept() error {
- accepts, total := b.history()
- weightedAccepts := b.k * float64(accepts)
- // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
- dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
- if dropRatio <= 0 {
- if atomic.LoadInt32(&b.state) == StateOpen {
- atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed)
- }
- return nil
- }
- if atomic.LoadInt32(&b.state) == StateClosed {
- atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
- }
- if b.proba.TrueOnProba(dropRatio) {
- return ErrServiceUnavailable
- }
- return nil
- }
- func (b *googleBreaker) allow() (internalPromise, error) {
- if err := b.accept(); err != nil {
- return nil, err
- }
- return googlePromise{
- b: b,
- }, nil
- }
- func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
- if err := b.accept(); err != nil {
- if fallback != nil {
- return fallback(err)
- } else {
- return err
- }
- }
- defer func() {
- if e := recover(); e != nil {
- b.markFailure()
- panic(e)
- }
- }()
- err := req()
- if acceptable(err) {
- b.markSuccess()
- } else {
- b.markFailure()
- }
- return err
- }
- func (b *googleBreaker) markSuccess() {
- b.stat.Add(1)
- }
- func (b *googleBreaker) markFailure() {
- b.stat.Add(0)
- }
- func (b *googleBreaker) history() (accepts int64, total int64) {
- b.stat.Reduce(func(b *collection.Bucket) {
- accepts += int64(b.Sum)
- total += b.Count
- })
- return
- }
- type googlePromise struct {
- b *googleBreaker
- }
- func (p googlePromise) Accept() {
- p.b.markSuccess()
- }
- func (p googlePromise) Reject() {
- p.b.markFailure()
- }
|