hostpool.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // A Go package to intelligently and flexibly pool among multiple hosts from your Go application.
  2. // Host selection can operate in round robin or epsilon greedy mode, and unresponsive hosts are
  3. // avoided. A good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
  4. package hostpool
  5. import (
  6. "log"
  7. "sync"
  8. "time"
  9. )
  10. // --- Response interfaces and structs ----
  11. // This interface represents the response from HostPool. You can retrieve the
  12. // hostname by calling Host(), and after making a request to the host you should
  13. // call Mark with any error encountered, which will inform the HostPool issuing
  14. // the HostPoolResponse of what happened to the request and allow it to update.
  15. type HostPoolResponse interface {
  16. Host() string
  17. Mark(error)
  18. hostPool() HostPool
  19. }
  20. type standardHostPoolResponse struct {
  21. host string
  22. sync.Once
  23. pool HostPool
  24. }
  25. // --- HostPool structs and interfaces ----
  26. // This is the main HostPool interface. Structs implementing this interface
  27. // allow you to Get a HostPoolResponse (which includes a hostname to use),
  28. // get the list of all Hosts, and use ResetAll to reset state.
  29. type HostPool interface {
  30. Get() HostPoolResponse
  31. // keep the marks separate so we can override independently
  32. markSuccess(HostPoolResponse)
  33. markFailed(HostPoolResponse)
  34. ResetAll()
  35. Hosts() []string
  36. }
  37. type standardHostPool struct {
  38. sync.RWMutex
  39. hosts map[string]*hostEntry
  40. hostList []*hostEntry
  41. initialRetryDelay time.Duration
  42. maxRetryInterval time.Duration
  43. nextHostIndex int
  44. }
  45. // ------ constants -------------------
  46. const epsilonBuckets = 120
  47. const epsilonDecay = 0.90 // decay the exploration rate
  48. const minEpsilon = 0.01 // explore one percent of the time
  49. const initialEpsilon = 0.3
  50. const defaultDecayDuration = time.Duration(5) * time.Minute
  51. // Construct a basic HostPool using the hostnames provided
  52. func New(hosts []string) HostPool {
  53. p := &standardHostPool{
  54. hosts: make(map[string]*hostEntry, len(hosts)),
  55. hostList: make([]*hostEntry, len(hosts)),
  56. initialRetryDelay: time.Duration(30) * time.Second,
  57. maxRetryInterval: time.Duration(900) * time.Second,
  58. }
  59. for i, h := range hosts {
  60. e := &hostEntry{
  61. host: h,
  62. retryDelay: p.initialRetryDelay,
  63. }
  64. p.hosts[h] = e
  65. p.hostList[i] = e
  66. }
  67. return p
  68. }
  69. func (r *standardHostPoolResponse) Host() string {
  70. return r.host
  71. }
  72. func (r *standardHostPoolResponse) hostPool() HostPool {
  73. return r.pool
  74. }
  75. func (r *standardHostPoolResponse) Mark(err error) {
  76. r.Do(func() {
  77. doMark(err, r)
  78. })
  79. }
  80. func doMark(err error, r HostPoolResponse) {
  81. if err == nil {
  82. r.hostPool().markSuccess(r)
  83. } else {
  84. r.hostPool().markFailed(r)
  85. }
  86. }
  87. // return an entry from the HostPool
  88. func (p *standardHostPool) Get() HostPoolResponse {
  89. p.Lock()
  90. defer p.Unlock()
  91. host := p.getRoundRobin()
  92. return &standardHostPoolResponse{host: host, pool: p}
  93. }
  94. func (p *standardHostPool) getRoundRobin() string {
  95. now := time.Now()
  96. hostCount := len(p.hostList)
  97. for i := range p.hostList {
  98. // iterate via sequenece from where we last iterated
  99. currentIndex := (i + p.nextHostIndex) % hostCount
  100. h := p.hostList[currentIndex]
  101. if !h.dead {
  102. p.nextHostIndex = currentIndex + 1
  103. return h.host
  104. }
  105. if h.nextRetry.Before(now) {
  106. h.willRetryHost(p.maxRetryInterval)
  107. p.nextHostIndex = currentIndex + 1
  108. return h.host
  109. }
  110. }
  111. // all hosts are down. re-add them
  112. p.doResetAll()
  113. p.nextHostIndex = 0
  114. return p.hostList[0].host
  115. }
  116. func (p *standardHostPool) ResetAll() {
  117. p.Lock()
  118. defer p.Unlock()
  119. p.doResetAll()
  120. }
  121. // this actually performs the logic to reset,
  122. // and should only be called when the lock has
  123. // already been acquired
  124. func (p *standardHostPool) doResetAll() {
  125. for _, h := range p.hosts {
  126. h.dead = false
  127. }
  128. }
  129. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  130. host := hostR.Host()
  131. p.Lock()
  132. defer p.Unlock()
  133. h, ok := p.hosts[host]
  134. if !ok {
  135. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  136. }
  137. h.dead = false
  138. }
  139. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  140. host := hostR.Host()
  141. p.Lock()
  142. defer p.Unlock()
  143. h, ok := p.hosts[host]
  144. if !ok {
  145. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  146. }
  147. if !h.dead {
  148. h.dead = true
  149. h.retryCount = 0
  150. h.retryDelay = p.initialRetryDelay
  151. h.nextRetry = time.Now().Add(h.retryDelay)
  152. }
  153. }
  154. func (p *standardHostPool) Hosts() []string {
  155. hosts := make([]string, len(p.hosts))
  156. for host, _ := range p.hosts {
  157. hosts = append(hosts, host)
  158. }
  159. return hosts
  160. }