hostpool.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. // A Go package to intelligently and flexibly pool among multiple hosts from your Go application.
  2. // Host selection can operate in round robin or epsilon greedy mode, and unresponsive hosts are
  3. // avoided. A good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
  4. package hostpool
  5. import (
  6. "log"
  7. "sync"
  8. "time"
  9. )
  10. // Returns current version
  11. func Version() string {
  12. return "0.1"
  13. }
  14. // --- Response interfaces and structs ----
  15. // This interface represents the response from HostPool. You can retrieve the
  16. // hostname by calling Host(), and after making a request to the host you should
  17. // call Mark with any error encountered, which will inform the HostPool issuing
  18. // the HostPoolResponse of what happened to the request and allow it to update.
  19. type HostPoolResponse interface {
  20. Host() string
  21. Mark(error)
  22. hostPool() HostPool
  23. }
  24. type standardHostPoolResponse struct {
  25. host string
  26. sync.Once
  27. pool HostPool
  28. }
  29. // --- HostPool structs and interfaces ----
  30. // This is the main HostPool interface. Structs implementing this interface
  31. // allow you to Get a HostPoolResponse (which includes a hostname to use),
  32. // get the list of all Hosts, and use ResetAll to reset state.
  33. type HostPool interface {
  34. Get() HostPoolResponse
  35. // keep the marks separate so we can override independently
  36. markSuccess(HostPoolResponse)
  37. markFailed(HostPoolResponse)
  38. ResetAll()
  39. // ReturnUnhealthy when called with true will prevent an unhealthy node from
  40. // being returned and will instead return a nil HostPoolResponse. If using
  41. // this feature then you should check the result of Get for nil
  42. ReturnUnhealthy(v bool)
  43. Hosts() []string
  44. SetHosts([]string)
  45. }
  46. type standardHostPool struct {
  47. sync.RWMutex
  48. hosts map[string]*hostEntry
  49. hostList []*hostEntry
  50. returnUnhealthy bool
  51. initialRetryDelay time.Duration
  52. maxRetryInterval time.Duration
  53. nextHostIndex int
  54. }
  55. // ------ constants -------------------
  56. const epsilonBuckets = 120
  57. const epsilonDecay = 0.90 // decay the exploration rate
  58. const minEpsilon = 0.01 // explore one percent of the time
  59. const initialEpsilon = 0.3
  60. const defaultDecayDuration = time.Duration(5) * time.Minute
  61. // Construct a basic HostPool using the hostnames provided
  62. func New(hosts []string) HostPool {
  63. p := &standardHostPool{
  64. returnUnhealthy: true,
  65. hosts: make(map[string]*hostEntry, len(hosts)),
  66. hostList: make([]*hostEntry, len(hosts)),
  67. initialRetryDelay: time.Duration(30) * time.Second,
  68. maxRetryInterval: time.Duration(900) * time.Second,
  69. }
  70. for i, h := range hosts {
  71. e := &hostEntry{
  72. host: h,
  73. retryDelay: p.initialRetryDelay,
  74. }
  75. p.hosts[h] = e
  76. p.hostList[i] = e
  77. }
  78. return p
  79. }
  80. func (r *standardHostPoolResponse) Host() string {
  81. return r.host
  82. }
  83. func (r *standardHostPoolResponse) hostPool() HostPool {
  84. return r.pool
  85. }
  86. func (r *standardHostPoolResponse) Mark(err error) {
  87. r.Do(func() {
  88. doMark(err, r)
  89. })
  90. }
  91. func doMark(err error, r HostPoolResponse) {
  92. if err == nil {
  93. r.hostPool().markSuccess(r)
  94. } else {
  95. r.hostPool().markFailed(r)
  96. }
  97. }
  98. // return an entry from the HostPool
  99. func (p *standardHostPool) Get() HostPoolResponse {
  100. p.Lock()
  101. defer p.Unlock()
  102. host := p.getRoundRobin()
  103. if host == "" {
  104. return nil
  105. }
  106. return &standardHostPoolResponse{host: host, pool: p}
  107. }
  108. func (p *standardHostPool) getRoundRobin() string {
  109. now := time.Now()
  110. hostCount := len(p.hostList)
  111. for i := range p.hostList {
  112. // iterate via sequenece from where we last iterated
  113. currentIndex := (i + p.nextHostIndex) % hostCount
  114. h := p.hostList[currentIndex]
  115. if !h.dead {
  116. p.nextHostIndex = currentIndex + 1
  117. return h.host
  118. }
  119. if h.nextRetry.Before(now) {
  120. h.willRetryHost(p.maxRetryInterval)
  121. p.nextHostIndex = currentIndex + 1
  122. return h.host
  123. }
  124. }
  125. // all hosts are down and returnUnhealhy is false then return no host
  126. if p.returnUnhealthy {
  127. return ""
  128. }
  129. // all hosts are down. re-add them
  130. p.doResetAll()
  131. p.nextHostIndex = 0
  132. return p.hostList[0].host
  133. }
  134. func (p *standardHostPool) ResetAll() {
  135. p.Lock()
  136. defer p.Unlock()
  137. p.doResetAll()
  138. }
  139. func (p *standardHostPool) SetHosts(hosts []string) {
  140. p.Lock()
  141. defer p.Unlock()
  142. p.setHosts(hosts)
  143. }
  144. func (p *standardHostPool) ReturnUnhealthy(v bool) {
  145. p.Lock()
  146. defer p.Unlock()
  147. p.returnUnhealthy = v
  148. }
  149. func (p *standardHostPool) setHosts(hosts []string) {
  150. p.hosts = make(map[string]*hostEntry, len(hosts))
  151. p.hostList = make([]*hostEntry, len(hosts))
  152. for i, h := range hosts {
  153. e := &hostEntry{
  154. host: h,
  155. retryDelay: p.initialRetryDelay,
  156. }
  157. p.hosts[h] = e
  158. p.hostList[i] = e
  159. }
  160. }
  161. // this actually performs the logic to reset,
  162. // and should only be called when the lock has
  163. // already been acquired
  164. func (p *standardHostPool) doResetAll() {
  165. for _, h := range p.hosts {
  166. h.dead = false
  167. }
  168. }
  169. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  170. host := hostR.Host()
  171. p.Lock()
  172. defer p.Unlock()
  173. h, ok := p.hosts[host]
  174. if !ok {
  175. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  176. }
  177. h.dead = false
  178. }
  179. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  180. host := hostR.Host()
  181. p.Lock()
  182. defer p.Unlock()
  183. h, ok := p.hosts[host]
  184. if !ok {
  185. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  186. }
  187. if !h.dead {
  188. h.dead = true
  189. h.retryCount = 0
  190. h.retryDelay = p.initialRetryDelay
  191. h.nextRetry = time.Now().Add(h.retryDelay)
  192. }
  193. }
  194. func (p *standardHostPool) Hosts() []string {
  195. hosts := make([]string, len(p.hosts))
  196. for host := range p.hosts {
  197. hosts = append(hosts, host)
  198. }
  199. return hosts
  200. }