hostpool.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package hostpool
  2. import (
  3. "log"
  4. "math"
  5. "sort"
  6. "sync"
  7. "time"
  8. )
  // --- timer: exists only so tests can substitute their own implementation

  // timer measures the time separating two instants.
  type timer interface {
  	// between reports the duration from start to end.
  	between(start, end time.Time) time.Duration
  }

  // realTimer is the stateless timer whose between method subtracts the two
  // instants it is given.
  type realTimer struct{}
  14. // --- Response interfaces and structs ----
  15. type HostPoolResponse interface {
  16. Host() string
  17. Mark(error)
  18. hostPool() HostPool
  19. }
  20. type standardHostPoolResponse struct {
  21. host string
  22. sync.Once
  23. pool HostPool
  24. }
  25. // --- HostPool structs and interfaces ----
  26. type HostPool interface {
  27. Get() HostPoolResponse
  28. // keep the marks separate so we can override independently
  29. markSuccess(HostPoolResponse)
  30. markFailed(HostPoolResponse)
  31. ResetAll()
  32. Hosts() []string
  33. lookupHost(string) HostEntry
  34. }
  35. type standardHostPool struct {
  36. hosts map[string]HostEntry
  37. initialRetryDelay time.Duration
  38. maxRetryInterval time.Duration
  39. rrResults chan string
  40. }
  41. // --- Value Calculators -----------------
  // EpsilonValueCalculator turns an average response time into a value.
  // NOTE(review): presumably the value weights hosts during epsilon-greedy
  // selection; the consumer is outside this view, so confirm there.
  type EpsilonValueCalculator interface {
  	CalcValueFromAvgResponseTime(float64) float64
  }

  type (
  	// LinearEpsilonValueCalculator values a host at the reciprocal of its
  	// average response time.
  	LinearEpsilonValueCalculator struct{}

  	// LogEpsilonValueCalculator takes the natural logarithm of the linear
  	// value.
  	LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }

  	// PolynomialEpsilonValueCalculator raises the linear value to the power
  	// exp.
  	PolynomialEpsilonValueCalculator struct {
  		LinearEpsilonValueCalculator
  		exp float64 // exponent applied to the linear value to reweight it
  	}
  )
  51. func New(hosts []string) HostPool {
  52. p := &standardHostPool{
  53. hosts: make(map[string]HostEntry, len(hosts)),
  54. initialRetryDelay: time.Duration(30) * time.Second,
  55. maxRetryInterval: time.Duration(900) * time.Second,
  56. }
  57. for _, h := range hosts {
  58. e := newHostEntry(h, p.initialRetryDelay, p.maxRetryInterval)
  59. p.hosts[h] = e
  60. }
  61. p.rrResults = make(chan string)
  62. go p.serveRoundRobin()
  63. return p
  64. }
  65. func (r *standardHostPoolResponse) Host() string {
  66. return r.host
  67. }
  68. func (r *standardHostPoolResponse) hostPool() HostPool {
  69. return r.pool
  70. }
  71. func (r *standardHostPoolResponse) Mark(err error) {
  72. r.Do(func() {
  73. doMark(err, r)
  74. })
  75. }
  76. func doMark(err error, r HostPoolResponse) {
  77. if err == nil {
  78. r.hostPool().markSuccess(r)
  79. } else {
  80. r.hostPool().markFailed(r)
  81. }
  82. }
  83. func (r *epsilonHostPoolResponse) Mark(err error) {
  84. r.Do(func() {
  85. r.ended = time.Now()
  86. doMark(err, r)
  87. })
  88. }
  89. func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
  90. return end.Sub(start)
  91. }
  92. // return an upstream entry from the HostPool
  93. func (p *standardHostPool) Get() HostPoolResponse {
  94. host := p.getRoundRobin()
  95. return &standardHostPoolResponse{host: host, pool: p}
  96. }
  97. func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
  98. host := p.getEpsilonGreedy()
  99. started := time.Now()
  100. return &epsilonHostPoolResponse{
  101. standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
  102. started: started,
  103. }
  104. }
  105. func (p *standardHostPool) getRoundRobin() string {
  106. return <-p.rrResults
  107. }
  108. func (p *standardHostPool) serveRoundRobin() {
  109. nextHostIndex := 0
  110. getHostToServe := func() string {
  111. hostCount := len(p.hosts)
  112. for i := range p.hostList() {
  113. // iterate via sequenece from where we last iterated
  114. currentIndex := (i + nextHostIndex) % hostCount
  115. h := p.hostList()[currentIndex]
  116. if h.canTryHost(time.Now()) {
  117. if h.IsDead() {
  118. h.willRetryHost()
  119. }
  120. nextHostIndex = currentIndex + 1
  121. return h.Host()
  122. }
  123. }
  124. // all hosts are down. re-add them
  125. p.ResetAll()
  126. nextHostIndex = 0
  127. return p.hostList()[0].Host()
  128. }
  129. for {
  130. p.rrResults <- getHostToServe()
  131. }
  132. }
  133. func (p *standardHostPool) ResetAll() {
  134. // SetDead is threadsafe
  135. for _, h := range p.hosts {
  136. h.SetDead(false)
  137. }
  138. }
  139. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  140. host := hostR.Host()
  141. h, ok := p.hosts[host]
  142. if !ok {
  143. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  144. }
  145. h.SetDead(false)
  146. }
  147. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  148. host := hostR.Host()
  149. h, ok := p.hosts[host]
  150. if !ok {
  151. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  152. }
  153. h.SetDead(true)
  154. }
  155. func (p *standardHostPool) Hosts() []string {
  156. hosts := make([]string, 0, len(p.hosts))
  157. for host, _ := range p.hosts {
  158. hosts = append(hosts, host)
  159. }
  160. return hosts
  161. }
  162. func (p *standardHostPool) lookupHost(hostname string) HostEntry {
  163. // We can do a "simple" lookup here because this map doesn't change once init'd
  164. h, ok := p.hosts[hostname]
  165. if !ok {
  166. log.Fatalf("host %s not in HostPool %v", hostname, p.Hosts())
  167. }
  168. return h
  169. }
  170. func (p *standardHostPool) hostList() []HostEntry {
  171. // This returns a sorted list of HostEntry's. We ought
  172. // to do some optimization so that this isn't computed every time
  173. keys := make([]string, 0, len(p.hosts))
  174. vals := make([]HostEntry, 0, len(p.hosts))
  175. for hostName := range p.hosts {
  176. keys = append(keys, hostName)
  177. }
  178. sort.Strings(keys)
  179. for _, k := range keys {
  180. vals = append(vals, p.hosts[k])
  181. }
  182. return vals
  183. }
  184. // -------- Epsilon Value Calculators ----------
  185. func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  186. return 1.0 / v
  187. }
  188. func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  189. return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
  190. }
  191. func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  192. return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.exp)
  193. }