hostpool.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // A Go package to intelligently and flexibly pool among multiple hosts from your Go application.
  2. // Host selection can operate in round robin or epsilon greedy mode, and unresponsive hosts are
  3. // avoided. A good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
  4. package hostpool
  5. import (
  6. "log"
  7. "sync"
  8. "time"
  9. )
  10. // Returns current version
  11. func Version() string {
  12. return "0.1"
  13. }
  14. // --- Response interfaces and structs ----
  15. // This interface represents the response from HostPool. You can retrieve the
  16. // hostname by calling Host(), and after making a request to the host you should
  17. // call Mark with any error encountered, which will inform the HostPool issuing
  18. // the HostPoolResponse of what happened to the request and allow it to update.
  19. type HostPoolResponse interface {
  20. Host() string
  21. Mark(error)
  22. }
  23. type standardHostPoolResponse struct {
  24. host string
  25. pool *standardHostPool
  26. }
  27. // --- HostPool structs and interfaces ----
  28. // This is the main HostPool interface. Structs implementing this interface
  29. // allow you to Get a HostPoolResponse (which includes a hostname to use),
  30. // get the list of all Hosts, and use ResetAll to reset state.
  31. type HostPool interface {
  32. // Get() HostPoolResponse
  33. ResetAll()
  34. Hosts() []string
  35. ChooseNextHost() string
  36. DeliverHostResponse(string) HostPoolResponse
  37. }
  38. type standardHostPool struct {
  39. sync.RWMutex
  40. hosts map[string]*hostEntry
  41. hostList []*hostEntry
  42. initialRetryDelay time.Duration
  43. maxRetryInterval time.Duration
  44. nextHostIndex int
  45. }
  46. // ------ constants -------------------
  47. const epsilonBuckets = 120
  48. const epsilonDecay = 0.90 // decay the exploration rate
  49. const minEpsilon = 0.01 // explore one percent of the time
  50. const initialEpsilon = 0.3
  51. const defaultDecayDuration = time.Duration(5) * time.Minute
  52. // Construct a basic HostPool using the hostnames provided
  53. func New(hosts []string) HostPool {
  54. p := &standardHostPool{
  55. hosts: make(map[string]*hostEntry, len(hosts)),
  56. hostList: make([]*hostEntry, len(hosts)),
  57. initialRetryDelay: time.Duration(30) * time.Second,
  58. maxRetryInterval: time.Duration(900) * time.Second,
  59. }
  60. for i, h := range hosts {
  61. e := &hostEntry{
  62. host: h,
  63. retryDelay: p.initialRetryDelay,
  64. }
  65. p.hosts[h] = e
  66. p.hostList[i] = e
  67. }
  68. return p
  69. }
  70. func (r *standardHostPoolResponse) Host() string {
  71. return r.host
  72. }
  73. func (r *standardHostPoolResponse) Mark(err error) {
  74. if err == nil {
  75. r.pool.markSuccess(r)
  76. } else {
  77. r.pool.markFailed(r)
  78. }
  79. }
  80. // return an entry from the HostPool
  81. func Get(p HostPool) HostPoolResponse {
  82. host := p.ChooseNextHost()
  83. return p.DeliverHostResponse(host)
  84. }
  85. func (p *standardHostPool) ChooseNextHost() string {
  86. p.Lock()
  87. host := p.getRoundRobin()
  88. p.Unlock()
  89. return host
  90. }
  91. func (p *standardHostPool) getRoundRobin() string {
  92. now := time.Now()
  93. hostCount := len(p.hostList)
  94. for i := range p.hostList {
  95. // iterate via sequenece from where we last iterated
  96. currentIndex := (i + p.nextHostIndex) % hostCount
  97. h := p.hostList[currentIndex]
  98. if h.canTryHost(now) {
  99. p.nextHostIndex = currentIndex + 1
  100. return h.host
  101. }
  102. }
  103. // all hosts are down. re-add them
  104. p.doResetAll()
  105. p.nextHostIndex = 0
  106. return p.hostList[0].host
  107. }
  108. func (p *standardHostPool) ResetAll() {
  109. p.Lock()
  110. defer p.Unlock()
  111. p.doResetAll()
  112. }
  113. // this actually performs the logic to reset,
  114. // and should only be called when the lock has
  115. // already been acquired
  116. func (p *standardHostPool) doResetAll() {
  117. for _, h := range p.hosts {
  118. h.dead = false
  119. }
  120. }
  121. func (p *standardHostPool) markSuccess(hostR *standardHostPoolResponse) {
  122. host := hostR.Host()
  123. p.Lock()
  124. defer p.Unlock()
  125. h, ok := p.hosts[host]
  126. if !ok {
  127. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  128. }
  129. h.dead = false
  130. }
  131. func (p *standardHostPool) markFailed(hostR *standardHostPoolResponse) {
  132. host := hostR.Host()
  133. p.Lock()
  134. defer p.Unlock()
  135. h, ok := p.hosts[host]
  136. if !ok {
  137. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  138. }
  139. if !h.dead {
  140. h.dead = true
  141. h.retryCount = 0
  142. h.retryDelay = p.initialRetryDelay
  143. h.nextRetry = time.Now().Add(h.retryDelay)
  144. }
  145. }
  146. func (p *standardHostPool) Hosts() []string {
  147. hosts := make([]string, len(p.hosts))
  148. for host, _ := range p.hosts {
  149. hosts = append(hosts, host)
  150. }
  151. return hosts
  152. }
  153. func (p *standardHostPool) DeliverHostResponse(host string) HostPoolResponse {
  154. p.Lock()
  155. defer p.Unlock()
  156. h, ok := p.hosts[host]
  157. if !ok {
  158. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  159. }
  160. now := time.Now()
  161. if h.dead && h.nextRetry.Before(now) {
  162. h.willRetryHost(p.maxRetryInterval)
  163. }
  164. return &standardHostPoolResponse{host: host, pool: p}
  165. }