hostpool.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  // Package hostpool intelligently and flexibly pools among multiple hosts from your Go application.
  // Host selection can operate in round robin or epsilon greedy mode, and unresponsive hosts are
  // avoided. A good overview of Epsilon Greedy is available at http://stevehanov.ca/blog/index.php?id=132
  4. package hostpool
  5. import (
  6. "log"
  7. "sync"
  8. "time"
  9. )
  // Version reports the current version of the hostpool package.
  func Version() string {
  	const release = "0.1"
  	return release
  }
  14. // --- Response interfaces and structs ----
  15. // This interface represents the response from HostPool. You can retrieve the
  16. // hostname by calling Host(), and after making a request to the host you should
  17. // call Mark with any error encountered, which will inform the HostPool issuing
  18. // the HostPoolResponse of what happened to the request and allow it to update.
  19. type HostPoolResponse interface {
  20. Host() string
  21. Mark(error)
  22. hostPool() HostPool
  23. }
  24. type standardHostPoolResponse struct {
  25. host string
  26. sync.Once
  27. pool HostPool
  28. }
  29. // --- HostPool structs and interfaces ----
  30. // This is the main HostPool interface. Structs implementing this interface
  31. // allow you to Get a HostPoolResponse (which includes a hostname to use),
  32. // get the list of all Hosts, and use ResetAll to reset state.
  33. type HostPool interface {
  34. Get() HostPoolResponse
  35. // keep the marks separate so we can override independently
  36. markSuccess(HostPoolResponse)
  37. markFailed(HostPoolResponse)
  38. ResetAll()
  39. // ReturnUnhealthy when called with true will prevent an unhealthy node from
  40. // being returned and will instead return a nil HostPoolResponse. If using
  41. // this feature then you should check the result of Get for nil
  42. ReturnUnhealthy(v bool)
  43. Hosts() []string
  44. SetHosts([]string)
  45. // Close the hostpool and release all resources.
  46. Close()
  47. }
  48. type standardHostPool struct {
  49. sync.RWMutex
  50. hosts map[string]*hostEntry
  51. hostList []*hostEntry
  52. returnUnhealthy bool
  53. initialRetryDelay time.Duration
  54. maxRetryInterval time.Duration
  55. nextHostIndex int
  56. }
  // ------ constants -------------------

  // Parameters named for the epsilon greedy selection mode. Their users are
  // not visible in this part of the file.
  const (
  	epsilonBuckets = 120
  	epsilonDecay   = 0.90 // decay the exploration rate
  	minEpsilon     = 0.01 // explore one percent of the time
  	initialEpsilon = 0.3

  	defaultDecayDuration = time.Duration(5) * time.Minute
  )
  63. // Construct a basic HostPool using the hostnames provided
  64. func New(hosts []string) HostPool {
  65. p := &standardHostPool{
  66. returnUnhealthy: true,
  67. hosts: make(map[string]*hostEntry, len(hosts)),
  68. hostList: make([]*hostEntry, len(hosts)),
  69. initialRetryDelay: time.Duration(30) * time.Second,
  70. maxRetryInterval: time.Duration(900) * time.Second,
  71. }
  72. for i, h := range hosts {
  73. e := &hostEntry{
  74. host: h,
  75. retryDelay: p.initialRetryDelay,
  76. }
  77. p.hosts[h] = e
  78. p.hostList[i] = e
  79. }
  80. return p
  81. }
  82. func (r *standardHostPoolResponse) Host() string {
  83. return r.host
  84. }
  85. func (r *standardHostPoolResponse) hostPool() HostPool {
  86. return r.pool
  87. }
  88. func (r *standardHostPoolResponse) Mark(err error) {
  89. r.Do(func() {
  90. doMark(err, r)
  91. })
  92. }
  93. func doMark(err error, r HostPoolResponse) {
  94. if err == nil {
  95. r.hostPool().markSuccess(r)
  96. } else {
  97. r.hostPool().markFailed(r)
  98. }
  99. }
  100. // return an entry from the HostPool
  101. func (p *standardHostPool) Get() HostPoolResponse {
  102. p.Lock()
  103. defer p.Unlock()
  104. host := p.getRoundRobin()
  105. if host == "" {
  106. return nil
  107. }
  108. return &standardHostPoolResponse{host: host, pool: p}
  109. }
  110. func (p *standardHostPool) getRoundRobin() string {
  111. now := time.Now()
  112. hostCount := len(p.hostList)
  113. for i := range p.hostList {
  114. // iterate via sequenece from where we last iterated
  115. currentIndex := (i + p.nextHostIndex) % hostCount
  116. h := p.hostList[currentIndex]
  117. if !h.dead {
  118. p.nextHostIndex = currentIndex + 1
  119. return h.host
  120. }
  121. if h.nextRetry.Before(now) {
  122. h.willRetryHost(p.maxRetryInterval)
  123. p.nextHostIndex = currentIndex + 1
  124. return h.host
  125. }
  126. }
  127. // all hosts are down and returnUnhealhy is false then return no host
  128. if !p.returnUnhealthy {
  129. return ""
  130. }
  131. // all hosts are down. re-add them
  132. p.doResetAll()
  133. p.nextHostIndex = 0
  134. return p.hostList[0].host
  135. }
  136. func (p *standardHostPool) ResetAll() {
  137. p.Lock()
  138. defer p.Unlock()
  139. p.doResetAll()
  140. }
  141. func (p *standardHostPool) SetHosts(hosts []string) {
  142. p.Lock()
  143. defer p.Unlock()
  144. p.setHosts(hosts)
  145. }
  146. func (p *standardHostPool) ReturnUnhealthy(v bool) {
  147. p.Lock()
  148. defer p.Unlock()
  149. p.returnUnhealthy = v
  150. }
  151. func (p *standardHostPool) setHosts(hosts []string) {
  152. p.hosts = make(map[string]*hostEntry, len(hosts))
  153. p.hostList = make([]*hostEntry, len(hosts))
  154. for i, h := range hosts {
  155. e := &hostEntry{
  156. host: h,
  157. retryDelay: p.initialRetryDelay,
  158. }
  159. p.hosts[h] = e
  160. p.hostList[i] = e
  161. }
  162. }
  163. // this actually performs the logic to reset,
  164. // and should only be called when the lock has
  165. // already been acquired
  166. func (p *standardHostPool) doResetAll() {
  167. for _, h := range p.hosts {
  168. h.dead = false
  169. }
  170. }
  171. func (p *standardHostPool) Close() {
  172. for _, h := range p.hosts {
  173. h.dead = true
  174. }
  175. }
  176. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  177. host := hostR.Host()
  178. p.Lock()
  179. defer p.Unlock()
  180. h, ok := p.hosts[host]
  181. if !ok {
  182. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  183. }
  184. h.dead = false
  185. }
  186. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  187. host := hostR.Host()
  188. p.Lock()
  189. defer p.Unlock()
  190. h, ok := p.hosts[host]
  191. if !ok {
  192. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  193. }
  194. if !h.dead {
  195. h.dead = true
  196. h.retryCount = 0
  197. h.retryDelay = p.initialRetryDelay
  198. h.nextRetry = time.Now().Add(h.retryDelay)
  199. }
  200. }
  201. func (p *standardHostPool) Hosts() []string {
  202. hosts := make([]string, 0, len(p.hosts))
  203. for host := range p.hosts {
  204. hosts = append(hosts, host)
  205. }
  206. return hosts
  207. }