hostpool.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package hostpool
  2. import (
  3. "log"
  4. "sync"
  5. "time"
  6. )
  7. // --- Response interfaces and structs ----
  8. type HostPoolResponse interface {
  9. Host() string
  10. Mark(error)
  11. hostPool() HostPool
  12. }
  13. type standardHostPoolResponse struct {
  14. host string
  15. sync.Once
  16. pool HostPool
  17. }
  18. // --- HostPool structs and interfaces ----
  19. type HostPool interface {
  20. Get() HostPoolResponse
  21. // keep the marks separate so we can override independently
  22. markSuccess(HostPoolResponse)
  23. markFailed(HostPoolResponse)
  24. ResetAll()
  25. Hosts() []string
  26. }
  27. type standardHostPool struct {
  28. sync.RWMutex
  29. hosts map[string]*hostEntry
  30. hostList []*hostEntry
  31. initialRetryDelay time.Duration
  32. maxRetryInterval time.Duration
  33. nextHostIndex int
  34. }
  35. // ------ constants -------------------
  // Tuning values for epsilon-based exploration (per the original comments).
  // Nothing in the visible part of this file reads them; presumably they are
  // used elsewhere in the package (TODO confirm).
  const (
  	epsilonBuckets = 120

  	// epsilonDecay decays the exploration rate.
  	epsilonDecay = 0.90

  	// minEpsilon: explore one percent of the time.
  	minEpsilon = 0.01

  	initialEpsilon = 0.3

  	defaultDecayDuration = 5 * time.Minute
  )
  41. func New(hosts []string) HostPool {
  42. p := &standardHostPool{
  43. hosts: make(map[string]*hostEntry, len(hosts)),
  44. hostList: make([]*hostEntry, len(hosts)),
  45. initialRetryDelay: time.Duration(30) * time.Second,
  46. maxRetryInterval: time.Duration(900) * time.Second,
  47. }
  48. for i, h := range hosts {
  49. e := &hostEntry{
  50. host: h,
  51. retryDelay: p.initialRetryDelay,
  52. }
  53. p.hosts[h] = e
  54. p.hostList[i] = e
  55. }
  56. return p
  57. }
  58. func (r *standardHostPoolResponse) Host() string {
  59. return r.host
  60. }
  61. func (r *standardHostPoolResponse) hostPool() HostPool {
  62. return r.pool
  63. }
  64. func (r *standardHostPoolResponse) Mark(err error) {
  65. r.Do(func() {
  66. doMark(err, r)
  67. })
  68. }
  69. func doMark(err error, r HostPoolResponse) {
  70. if err == nil {
  71. r.hostPool().markSuccess(r)
  72. } else {
  73. r.hostPool().markFailed(r)
  74. }
  75. }
  76. // return an upstream entry from the HostPool
  77. func (p *standardHostPool) Get() HostPoolResponse {
  78. p.Lock()
  79. defer p.Unlock()
  80. host := p.getRoundRobin()
  81. return &standardHostPoolResponse{host: host, pool: p}
  82. }
  83. func (p *standardHostPool) getRoundRobin() string {
  84. now := time.Now()
  85. hostCount := len(p.hostList)
  86. for i := range p.hostList {
  87. // iterate via sequenece from where we last iterated
  88. currentIndex := (i + p.nextHostIndex) % hostCount
  89. h := p.hostList[currentIndex]
  90. if !h.dead {
  91. p.nextHostIndex = currentIndex + 1
  92. return h.host
  93. }
  94. if h.nextRetry.Before(now) {
  95. h.willRetryHost(p.maxRetryInterval)
  96. p.nextHostIndex = currentIndex + 1
  97. return h.host
  98. }
  99. }
  100. // all hosts are down. re-add them
  101. p.doResetAll()
  102. p.nextHostIndex = 0
  103. return p.hostList[0].host
  104. }
  105. func (p *standardHostPool) ResetAll() {
  106. p.Lock()
  107. defer p.Unlock()
  108. p.doResetAll()
  109. }
  110. // this actually performs the logic to reset,
  111. // and should only be called when the lock has
  112. // already been acquired
  113. func (p *standardHostPool) doResetAll() {
  114. for _, h := range p.hosts {
  115. h.dead = false
  116. }
  117. }
  118. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  119. host := hostR.Host()
  120. p.Lock()
  121. defer p.Unlock()
  122. h, ok := p.hosts[host]
  123. if !ok {
  124. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  125. }
  126. h.dead = false
  127. }
  128. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  129. host := hostR.Host()
  130. p.Lock()
  131. defer p.Unlock()
  132. h, ok := p.hosts[host]
  133. if !ok {
  134. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  135. }
  136. if !h.dead {
  137. h.dead = true
  138. h.retryCount = 0
  139. h.retryDelay = p.initialRetryDelay
  140. h.nextRetry = time.Now().Add(h.retryDelay)
  141. }
  142. }
  143. func (p *standardHostPool) Hosts() []string {
  144. hosts := make([]string, len(p.hosts))
  145. for host, _ := range p.hosts {
  146. hosts = append(hosts, host)
  147. }
  148. return hosts
  149. }