hostpool.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package hostpool
  2. import (
  3. "log"
  4. "sync"
  5. "time"
  6. )
  7. // --- Response interfaces and structs ----
  8. // This interface represents the response from HostPool. You can retrieve the
  9. // hostname by calling Host(), and after making a request to the host you should
  10. // call Mark with any error encountered, which will inform the HostPool issuing
  11. // the HostPoolResponse of what happened to the request and allow it to update.
  12. type HostPoolResponse interface {
  13. Host() string
  14. Mark(error)
  15. hostPool() HostPool
  16. }
  17. type standardHostPoolResponse struct {
  18. host string
  19. sync.Once
  20. pool HostPool
  21. }
  22. // --- HostPool structs and interfaces ----
  23. // This is the main HostPool interface. Structs implementing this interface
  24. // allow you to Get a HostPoolResponse (which includes a hostname to use),
  25. // get the list of all Hosts, and use ResetAll to reset state.
  26. type HostPool interface {
  27. Get() HostPoolResponse
  28. // keep the marks separate so we can override independently
  29. markSuccess(HostPoolResponse)
  30. markFailed(HostPoolResponse)
  31. ResetAll()
  32. Hosts() []string
  33. }
  34. type standardHostPool struct {
  35. sync.RWMutex
  36. hosts map[string]*hostEntry
  37. hostList []*hostEntry
  38. initialRetryDelay time.Duration
  39. maxRetryInterval time.Duration
  40. nextHostIndex int
  41. }
  42. // ------ constants -------------------
  43. const epsilonBuckets = 120
  44. const epsilonDecay = 0.90 // decay the exploration rate
  45. const minEpsilon = 0.01 // explore one percent of the time
  46. const initialEpsilon = 0.3
  47. const defaultDecayDuration = time.Duration(5) * time.Minute
  48. // Construct a basic HostPool using the hostnames provided
  49. func New(hosts []string) HostPool {
  50. p := &standardHostPool{
  51. hosts: make(map[string]*hostEntry, len(hosts)),
  52. hostList: make([]*hostEntry, len(hosts)),
  53. initialRetryDelay: time.Duration(30) * time.Second,
  54. maxRetryInterval: time.Duration(900) * time.Second,
  55. }
  56. for i, h := range hosts {
  57. e := &hostEntry{
  58. host: h,
  59. retryDelay: p.initialRetryDelay,
  60. }
  61. p.hosts[h] = e
  62. p.hostList[i] = e
  63. }
  64. return p
  65. }
  66. func (r *standardHostPoolResponse) Host() string {
  67. return r.host
  68. }
  69. func (r *standardHostPoolResponse) hostPool() HostPool {
  70. return r.pool
  71. }
  72. func (r *standardHostPoolResponse) Mark(err error) {
  73. r.Do(func() {
  74. doMark(err, r)
  75. })
  76. }
  77. func doMark(err error, r HostPoolResponse) {
  78. if err == nil {
  79. r.hostPool().markSuccess(r)
  80. } else {
  81. r.hostPool().markFailed(r)
  82. }
  83. }
  84. // return an entry from the HostPool
  85. func (p *standardHostPool) Get() HostPoolResponse {
  86. p.Lock()
  87. defer p.Unlock()
  88. host := p.getRoundRobin()
  89. return &standardHostPoolResponse{host: host, pool: p}
  90. }
  91. func (p *standardHostPool) getRoundRobin() string {
  92. now := time.Now()
  93. hostCount := len(p.hostList)
  94. for i := range p.hostList {
  95. // iterate via sequenece from where we last iterated
  96. currentIndex := (i + p.nextHostIndex) % hostCount
  97. h := p.hostList[currentIndex]
  98. if !h.dead {
  99. p.nextHostIndex = currentIndex + 1
  100. return h.host
  101. }
  102. if h.nextRetry.Before(now) {
  103. h.willRetryHost(p.maxRetryInterval)
  104. p.nextHostIndex = currentIndex + 1
  105. return h.host
  106. }
  107. }
  108. // all hosts are down. re-add them
  109. p.doResetAll()
  110. p.nextHostIndex = 0
  111. return p.hostList[0].host
  112. }
  113. func (p *standardHostPool) ResetAll() {
  114. p.Lock()
  115. defer p.Unlock()
  116. p.doResetAll()
  117. }
  118. // this actually performs the logic to reset,
  119. // and should only be called when the lock has
  120. // already been acquired
  121. func (p *standardHostPool) doResetAll() {
  122. for _, h := range p.hosts {
  123. h.dead = false
  124. }
  125. }
  126. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  127. host := hostR.Host()
  128. p.Lock()
  129. defer p.Unlock()
  130. h, ok := p.hosts[host]
  131. if !ok {
  132. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  133. }
  134. h.dead = false
  135. }
  136. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  137. host := hostR.Host()
  138. p.Lock()
  139. defer p.Unlock()
  140. h, ok := p.hosts[host]
  141. if !ok {
  142. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  143. }
  144. if !h.dead {
  145. h.dead = true
  146. h.retryCount = 0
  147. h.retryDelay = p.initialRetryDelay
  148. h.nextRetry = time.Now().Add(h.retryDelay)
  149. }
  150. }
  151. func (p *standardHostPool) Hosts() []string {
  152. hosts := make([]string, len(p.hosts))
  153. for host, _ := range p.hosts {
  154. hosts = append(hosts, host)
  155. }
  156. return hosts
  157. }