hostpool.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package hostpool
  2. import (
  3. "log"
  4. "math"
  5. "sort"
  6. "sync"
  7. "time"
  8. )
  9. // --- timer: this just exists for testing
  type (
  	// timer measures the span between two instants. Per the section note
  	// above, it exists only so that tests can supply their own implementation.
  	timer interface {
  		between(time.Time, time.Time) time.Duration
  	}

  	// realTimer is the stateless timer whose between method reports
  	// end.Sub(start).
  	realTimer struct{}
  )
  14. // --- Response interfaces and structs ----
  15. type HostPoolResponse interface {
  16. Host() string
  17. Mark(error)
  18. hostPool() HostPool
  19. }
  20. type standardHostPoolResponse struct {
  21. host string
  22. sync.Once
  23. pool HostPool
  24. }
  25. // --- HostPool structs and interfaces ----
  26. type HostPool interface {
  27. Get() HostPoolResponse
  28. // keep the marks separate so we can override independently
  29. markSuccess(HostPoolResponse)
  30. markFailed(HostPoolResponse)
  31. ResetAll()
  32. Hosts() []string
  33. lookupHost(string) HostEntry
  34. // setHostMap(map[string]HostEntry)
  35. sync.Locker
  36. }
  37. type standardHostPool struct {
  38. sync.RWMutex
  39. hosts map[string]HostEntry
  40. initialRetryDelay time.Duration
  41. maxRetryInterval time.Duration
  42. nextHostIndex int
  43. }
  44. // --- Value Calculators -----------------
  // EpsilonValueCalculator converts an average response time into the value
  // used to weigh a host.
  type EpsilonValueCalculator interface {
  	// CalcValueFromAvgResponseTime maps the average response time to a value.
  	CalcValueFromAvgResponseTime(avgResponseTime float64) float64
  }
  type (
  	// LinearEpsilonValueCalculator values a host at the reciprocal of its
  	// average response time.
  	LinearEpsilonValueCalculator struct{}

  	// LogEpsilonValueCalculator takes the natural log of the linear value.
  	LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }

  	// PolynomialEpsilonValueCalculator raises the linear value to a power.
  	PolynomialEpsilonValueCalculator struct {
  		LinearEpsilonValueCalculator
  		// exp is the exponent applied to the linear value to reweight it.
  		exp float64
  	}
  )
  54. func New(hosts []string) HostPool {
  55. p := &standardHostPool{
  56. hosts: make(map[string]HostEntry, len(hosts)),
  57. initialRetryDelay: time.Duration(30) * time.Second,
  58. maxRetryInterval: time.Duration(900) * time.Second,
  59. }
  60. for _, h := range hosts {
  61. e := newHostEntry(h, p.initialRetryDelay, p.maxRetryInterval)
  62. p.hosts[h] = e
  63. }
  64. return p
  65. }
  66. func (r *standardHostPoolResponse) Host() string {
  67. return r.host
  68. }
  69. func (r *standardHostPoolResponse) hostPool() HostPool {
  70. return r.pool
  71. }
  72. func (r *standardHostPoolResponse) Mark(err error) {
  73. r.Do(func() {
  74. doMark(err, r)
  75. })
  76. }
  77. func doMark(err error, r HostPoolResponse) {
  78. if err == nil {
  79. r.hostPool().markSuccess(r)
  80. } else {
  81. r.hostPool().markFailed(r)
  82. }
  83. }
  84. func (r *epsilonHostPoolResponse) Mark(err error) {
  85. r.Do(func() {
  86. r.ended = time.Now()
  87. doMark(err, r)
  88. })
  89. }
  90. func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
  91. return end.Sub(start)
  92. }
  93. func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
  94. p.Lock()
  95. defer p.Unlock()
  96. p.epsilon = newEpsilon
  97. }
  98. // return an upstream entry from the HostPool
  99. func (p *standardHostPool) Get() HostPoolResponse {
  100. p.Lock()
  101. defer p.Unlock()
  102. host := p.getRoundRobin()
  103. return &standardHostPoolResponse{host: host, pool: p}
  104. }
  105. func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
  106. host := p.getEpsilonGreedy()
  107. started := time.Now()
  108. return &epsilonHostPoolResponse{
  109. standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
  110. started: started,
  111. }
  112. }
  113. func (p *standardHostPool) getRoundRobin() string {
  114. // TODO - will want to replace this with something that runs in a
  115. // goroutine and receives requests on a channel.
  116. // The state being protected in that case is really just the currentIdx
  117. // Question - should I just skip the goroutine shit and select randomly?
  118. // Maybe
  119. now := time.Now()
  120. hostCount := len(p.hosts)
  121. for i := range p.hostList() {
  122. // iterate via sequenece from where we last iterated
  123. currentIndex := (i + p.nextHostIndex) % hostCount
  124. h := p.hostList()[currentIndex]
  125. if h.canTryHost(now) {
  126. if h.IsDead() {
  127. h.willRetryHost()
  128. }
  129. p.nextHostIndex = currentIndex + 1
  130. return h.Host()
  131. }
  132. }
  133. // all hosts are down. re-add them
  134. p.doResetAll()
  135. p.nextHostIndex = 0
  136. return p.hostList()[0].Host()
  137. }
  138. func (p *standardHostPool) ResetAll() {
  139. p.Lock()
  140. defer p.Unlock()
  141. p.doResetAll()
  142. }
  143. // this actually performs the logic to reset,
  144. // and should only be called when the lock has
  145. // already been acquired
  146. func (p *standardHostPool) doResetAll() {
  147. for _, h := range p.hosts {
  148. h.SetDead(false)
  149. }
  150. }
  151. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  152. host := hostR.Host()
  153. p.Lock()
  154. defer p.Unlock()
  155. h, ok := p.hosts[host]
  156. if !ok {
  157. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  158. }
  159. h.SetDead(false)
  160. }
  161. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  162. host := hostR.Host()
  163. p.Lock()
  164. defer p.Unlock()
  165. h, ok := p.hosts[host]
  166. if !ok {
  167. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  168. }
  169. h.SetDead(true)
  170. }
  171. func (p *standardHostPool) Hosts() []string {
  172. hosts := make([]string, 0, len(p.hosts))
  173. for host, _ := range p.hosts {
  174. hosts = append(hosts, host)
  175. }
  176. return hosts
  177. }
  178. func (p *standardHostPool) lookupHost(hostname string) HostEntry {
  179. // We can do a "simple" lookup here because this map doesn't change once init'd
  180. h, ok := p.hosts[hostname]
  181. if !ok {
  182. log.Fatalf("host %s not in HostPool %v", hostname, p.Hosts())
  183. }
  184. return h
  185. }
  186. func (p *standardHostPool) hostList() []HostEntry {
  187. // This returns a sorted list of HostEntry's. We ought
  188. // to do some optimization so that this isn't computed every time
  189. // TODO can totally use Hosts above to accomplish this
  190. keys := make([]string, 0, len(p.hosts))
  191. vals := make([]HostEntry, 0, len(p.hosts))
  192. for hostName := range p.hosts {
  193. keys = append(keys, hostName)
  194. }
  195. sort.Strings(keys)
  196. for _, k := range keys {
  197. vals = append(vals, p.hosts[k])
  198. }
  199. return vals
  200. }
  201. // -------- Epsilon Value Calculators ----------
  202. func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  203. return 1.0 / v
  204. }
  205. func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  206. return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
  207. }
  208. func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  209. return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.exp)
  210. }