breaker.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // Package breaker implements the circuit-breaker resiliency pattern for Go.
  2. package breaker
  3. import (
  4. "errors"
  5. "sync"
  6. "time"
  7. )
  8. // ErrBreakerOpen is the error returned from Run() when the function is not executed
  9. // because the breaker is currently open.
  10. var ErrBreakerOpen = errors.New("circuit breaker is open")
  11. type state int
  12. const (
  13. closed state = iota
  14. open
  15. halfOpen
  16. )
  17. // Breaker implements the circuit-breaker resiliency pattern
  18. type Breaker struct {
  19. errorThreshold, successThreshold int
  20. timeout time.Duration
  21. lock sync.RWMutex
  22. state state
  23. errors, successes int
  24. lastError time.Time
  25. }
  26. // New constructs a new circuit-breaker that starts closed.
  27. // From closed, the breaker opens if "errorThreshold" errors are seen
  28. // without an error-free period of at least "timeout". From open, the
  29. // breaker half-closes after "timeout". From half-open, the breaker closes
  30. // after "successThreshold" consecutive successes, or opens on a single error.
  31. func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker {
  32. return &Breaker{
  33. errorThreshold: errorThreshold,
  34. successThreshold: successThreshold,
  35. timeout: timeout,
  36. }
  37. }
  38. // Run will either return ErrBreakerOpen immediately if the circuit-breaker is
  39. // already open, or it will run the given function and pass along its return
  40. // value. It is safe to call Run concurrently on the same Breaker.
  41. func (b *Breaker) Run(x func() error) error {
  42. b.lock.RLock()
  43. state := b.state
  44. b.lock.RUnlock()
  45. if state == open {
  46. return ErrBreakerOpen
  47. }
  48. var panicValue interface{}
  49. result := func() error {
  50. defer func() {
  51. panicValue = recover()
  52. }()
  53. return x()
  54. }()
  55. if result == nil && panicValue == nil && state == closed {
  56. // short-circuit the normal, success path without contending
  57. // on the lock
  58. return nil
  59. }
  60. b.processResult(result, panicValue)
  61. if panicValue != nil {
  62. // as close as Go lets us come to a "rethrow" although unfortunately
  63. // we lose the original panicing location
  64. panic(panicValue)
  65. }
  66. return result
  67. }
  68. // Go will either return ErrBreakerOpen immediately if the circuit-breaker is
  69. // already open, or it will run the given function in a separate goroutine.
  70. // If the function is run, Go will return nil immediately, and will *not* return
  71. // the return value of the function. It is safe to call Go concurrently on the
  72. // same Breaker.
  73. func (b *Breaker) Go(x func() error) error {
  74. b.lock.RLock()
  75. state := b.state
  76. b.lock.RUnlock()
  77. if state == open {
  78. return ErrBreakerOpen
  79. }
  80. go func() {
  81. var panicValue interface{}
  82. result := func() error {
  83. defer func() {
  84. panicValue = recover()
  85. }()
  86. return x()
  87. }()
  88. if result == nil && panicValue == nil && state == closed {
  89. // short-circuit the normal, success path without
  90. // contending on the lock
  91. return
  92. }
  93. b.processResult(result, panicValue)
  94. if panicValue != nil {
  95. // as close as Go lets us come to a "rethrow" although
  96. // unfortunately we lose the original panicing location
  97. panic(panicValue)
  98. }
  99. }()
  100. return nil
  101. }
  102. func (b *Breaker) processResult(result error, panicValue interface{}) {
  103. b.lock.Lock()
  104. defer b.lock.Unlock()
  105. if result == nil && panicValue == nil {
  106. if b.state == halfOpen {
  107. b.successes++
  108. if b.successes == b.successThreshold {
  109. b.closeBreaker()
  110. }
  111. }
  112. } else {
  113. if b.errors > 0 {
  114. expiry := b.lastError.Add(b.timeout)
  115. if time.Now().After(expiry) {
  116. b.errors = 0
  117. }
  118. }
  119. switch b.state {
  120. case closed:
  121. b.errors++
  122. if b.errors == b.errorThreshold {
  123. b.openBreaker()
  124. } else {
  125. b.lastError = time.Now()
  126. }
  127. case halfOpen:
  128. b.openBreaker()
  129. }
  130. }
  131. }
  132. func (b *Breaker) openBreaker() {
  133. b.changeState(open)
  134. go b.timer()
  135. }
  136. func (b *Breaker) closeBreaker() {
  137. b.changeState(closed)
  138. }
  139. func (b *Breaker) timer() {
  140. time.Sleep(b.timeout)
  141. b.lock.Lock()
  142. defer b.lock.Unlock()
  143. b.changeState(halfOpen)
  144. }
  145. func (b *Breaker) changeState(newState state) {
  146. b.errors = 0
  147. b.successes = 0
  148. b.state = newState
  149. }