hostpool.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. package hostpool
  2. import (
  3. "log"
  4. "math"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
// --- timer: this just exists for testing

// timer abstracts measuring the span between two instants so that tests can
// substitute their own implementation.
type timer interface {
	// between returns a duration derived from the two given times.
	between(time.Time, time.Time) time.Duration
}
// realTimer is the timer implementation installed by NewEpsilonGreedy.
type realTimer struct{}
// --- Response interfaces and structs ----

// HostPoolResponse is the handle returned by HostPool.Get. It names the
// selected host and lets the caller report how the request went.
type HostPoolResponse interface {
	// Host returns the host chosen for this response.
	Host() string
	// Mark reports the outcome of using the host: a nil error marks the host
	// as successful, a non-nil error marks it as failed.
	Mark(error)
	// hostPool returns the pool that issued this response.
	hostPool() HostPool
}
// standardHostPoolResponse is the response type returned by standardHostPool.Get.
type standardHostPoolResponse struct {
	// host is the name of the host handed out to the caller.
	host string
	// Once guards Mark so that the outcome is only reported a single time.
	sync.Once
	// pool is the HostPool the outcome is reported back to.
	pool HostPool
}
// epsilonHostPoolResponse extends standardHostPoolResponse with the timestamps
// needed to measure how long the caller spent using the host.
type epsilonHostPoolResponse struct {
	standardHostPoolResponse
	// started is recorded when the response is created in Get.
	started time.Time
	// ended is recorded when Mark is called.
	ended time.Time
}
// --- HostPool structs and interfaces ----

// HostPool hands out hosts and tracks whether they succeeded or failed.
type HostPool interface {
	// Get selects a host and returns a response handle for it.
	Get() HostPoolResponse
	// keep the marks separate so we can override independently
	// markSuccess and markFailed are invoked by a response's Mark method.
	markSuccess(HostPoolResponse)
	markFailed(HostPoolResponse)

	// ResetAll resets the state of every host in the pool.
	ResetAll()
	// Hosts returns the names of the hosts in the pool.
	Hosts() []string
	// Locker exposes Lock/Unlock so wrapping pools can share the same lock.
	sync.Locker
}
// standardHostPool is the round-robin HostPool implementation built by New.
type standardHostPool struct {
	// RWMutex guards the fields below.
	sync.RWMutex
	// hosts indexes the host entries by host name.
	hosts map[string]*hostEntry
	// hostList holds the entries in the order they were passed to New; it is
	// the sequence walked by the round-robin selection.
	hostList []*hostEntry
	// initialRetryDelay and maxRetryInterval are passed to newHostEntry for
	// every host created by New.
	initialRetryDelay time.Duration
	maxRetryInterval  time.Duration
	// nextHostIndex is the offset from which the next round-robin scan starts.
	nextHostIndex int
}
// epsilonGreedyHostPool layers epsilon-greedy host selection on top of an
// embedded HostPool.
//
// NOTE(review): only the HostPool interface is promoted by this embedding;
// standardHostPool fields such as hostList or hosts are not reachable through
// it without a type assertion.
type epsilonGreedyHostPool struct {
	HostPool
	epsilon                float32 // this is our exploration factor
	decayDuration          time.Duration
	EpsilonValueCalculator // embed the epsilonValueCalculator
	// timer measures the span between a response's start and end times.
	timer
}
// --- Value Calculators -----------------

// EpsilonValueCalculator converts a host's weighted average response time
// into the value used to weight that host during epsilon-greedy selection.
type EpsilonValueCalculator interface {
	CalcValueFromAvgResponseTime(float64) float64
}
// LinearEpsilonValueCalculator values a host as the reciprocal of its average
// response time.
type LinearEpsilonValueCalculator struct{}
// LogEpsilonValueCalculator builds on LinearEpsilonValueCalculator and applies
// a logarithm when computing a host's value.
type LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }
// PolynomialEpsilonValueCalculator builds on LinearEpsilonValueCalculator and
// raises the linear value to a configurable power.
type PolynomialEpsilonValueCalculator struct {
	LinearEpsilonValueCalculator
	exp float64 // the exponent to which we will raise the value to reweight
}
  65. // ------ constants -------------------
  66. const epsilonBuckets = 120
  67. const epsilonDecay = 0.90 // decay the exploration rate
  68. const minEpsilon = 0.01 // explore one percent of the time
  69. const initialEpsilon = 0.3
  70. const defaultDecayDuration = time.Duration(5) * time.Minute
  71. func New(hosts []string) HostPool {
  72. p := &standardHostPool{
  73. hosts: make(map[string]*hostEntry, len(hosts)),
  74. hostList: make([]*hostEntry, len(hosts)),
  75. initialRetryDelay: time.Duration(30) * time.Second,
  76. maxRetryInterval: time.Duration(900) * time.Second,
  77. }
  78. for i, h := range hosts {
  79. e := newHostEntry(h, p.initialRetryDelay, p.maxRetryInterval)
  80. p.hosts[h] = e
  81. p.hostList[i] = e
  82. }
  83. return p
  84. }
// Host returns the name of the host this response was issued for.
func (r *standardHostPoolResponse) Host() string {
	return r.host
}
// hostPool returns the pool stored in this response.
func (r *standardHostPoolResponse) hostPool() HostPool {
	return r.pool
}
  91. func (r *standardHostPoolResponse) Mark(err error) {
  92. r.Do(func() {
  93. doMark(err, r)
  94. })
  95. }
  96. func doMark(err error, r HostPoolResponse) {
  97. if err == nil {
  98. r.hostPool().markSuccess(r)
  99. } else {
  100. r.hostPool().markFailed(r)
  101. }
  102. }
  103. func (r *epsilonHostPoolResponse) Mark(err error) {
  104. r.Do(func() {
  105. r.ended = time.Now()
  106. doMark(err, r)
  107. })
  108. }
  109. // Epsilon Greedy is an algorithim that allows HostPool not only to track failure state,
  110. // but also to learn about "better" options in terms of speed, and to pick from available hosts
  111. // based on a percentage of how well they perform. This gives a weighted request rate to better
  112. // performing hosts, while still distributing requests to all hosts (proportionate to their performance)
  113. //
  114. // After enabling Epsilon Greedy, hosts must be marked for sucess along with a time value representing
  115. // how fast (or slow) that host was.
  116. //
  117. // host := pool.Get()
  118. // start := time.Now()
  119. // ..... do work with host
  120. // duration = time.Now().Sub(start)
  121. // pool.MarkSuccessWithTime(host, duration)
  122. //
  123. // a good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
  124. //
  125. // decayDuration may be set to 0 to use the default value of 5 minutes
  126. func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
  127. if decayDuration <= 0 {
  128. decayDuration = defaultDecayDuration
  129. }
  130. // stdHP := New(hosts).(*standardHostPool)
  131. p := &epsilonGreedyHostPool{
  132. HostPool: New(hosts),
  133. epsilon: float32(initialEpsilon),
  134. decayDuration: decayDuration,
  135. EpsilonValueCalculator: calc,
  136. timer: &realTimer{},
  137. }
  138. // allocate structures
  139. for _, h := range p.hostList {
  140. h.epsilonCounts = make([]int64, epsilonBuckets)
  141. h.epsilonValues = make([]int64, epsilonBuckets)
  142. }
  143. go p.epsilonGreedyDecay()
  144. return p
  145. }
// between returns the elapsed time from start to end, i.e. end minus start.
func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
	return end.Sub(start)
}
  149. func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
  150. p.Lock()
  151. defer p.Unlock()
  152. p.epsilon = newEpsilon
  153. }
  154. func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
  155. durationPerBucket := p.decayDuration / epsilonBuckets
  156. ticker := time.Tick(durationPerBucket)
  157. for {
  158. <-ticker
  159. p.performEpsilonGreedyDecay()
  160. }
  161. }
  162. func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
  163. p.Lock()
  164. for _, h := range p.hostList {
  165. h.epsilonIndex += 1
  166. h.epsilonIndex = h.epsilonIndex % epsilonBuckets
  167. h.epsilonCounts[h.epsilonIndex] = 0
  168. h.epsilonValues[h.epsilonIndex] = 0
  169. }
  170. p.Unlock()
  171. }
  172. // return an upstream entry from the HostPool
  173. func (p *standardHostPool) Get() HostPoolResponse {
  174. p.Lock()
  175. defer p.Unlock()
  176. host := p.getRoundRobin()
  177. return &standardHostPoolResponse{host: host, pool: p}
  178. }
  179. func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
  180. p.Lock()
  181. defer p.Unlock()
  182. host := p.getEpsilonGreedy()
  183. started := time.Now()
  184. return &epsilonHostPoolResponse{
  185. standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
  186. started: started,
  187. }
  188. }
  189. func (p *standardHostPool) getRoundRobin() string {
  190. now := time.Now()
  191. hostCount := len(p.hostList)
  192. for i := range p.hostList {
  193. // iterate via sequenece from where we last iterated
  194. currentIndex := (i + p.nextHostIndex) % hostCount
  195. h := p.hostList[currentIndex]
  196. if h.canTryHost(now) {
  197. if h.IsDead() {
  198. h.willRetryHost()
  199. }
  200. p.nextHostIndex = currentIndex + 1
  201. return h.Host()
  202. }
  203. }
  204. // all hosts are down. re-add them
  205. p.doResetAll()
  206. p.nextHostIndex = 0
  207. return p.hostList[0].host
  208. }
  209. func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
  210. var hostToUse *hostEntry
  211. // this is our exploration phase
  212. if rand.Float32() < p.epsilon {
  213. p.epsilon = p.epsilon * epsilonDecay
  214. if p.epsilon < minEpsilon {
  215. p.epsilon = minEpsilon
  216. }
  217. return p.getRoundRobin()
  218. }
  219. // calculate values for each host in the 0..1 range (but not ormalized)
  220. var possibleHosts []*hostEntry
  221. now := time.Now()
  222. var sumValues float64
  223. for _, h := range p.hostList {
  224. if h.canTryHost(now) {
  225. v := h.getWeightedAverageResponseTime()
  226. if v > 0 {
  227. ev := p.CalcValueFromAvgResponseTime(v)
  228. h.epsilonValue = ev
  229. sumValues += ev
  230. possibleHosts = append(possibleHosts, h)
  231. }
  232. }
  233. }
  234. if len(possibleHosts) != 0 {
  235. // now normalize to the 0..1 range to get a percentage
  236. for _, h := range possibleHosts {
  237. h.epsilonPercentage = h.epsilonValue / sumValues
  238. }
  239. // do a weighted random choice among hosts
  240. ceiling := 0.0
  241. pickPercentage := rand.Float64()
  242. for _, h := range possibleHosts {
  243. ceiling += h.epsilonPercentage
  244. if pickPercentage <= ceiling {
  245. hostToUse = h
  246. break
  247. }
  248. }
  249. }
  250. if hostToUse == nil {
  251. if len(possibleHosts) != 0 {
  252. log.Println("Failed to randomly choose a host, Dan loses")
  253. }
  254. return p.getRoundRobin()
  255. }
  256. if hostToUse.dead {
  257. hostToUse.willRetryHost(p.maxRetryInterval)
  258. }
  259. return hostToUse.host
  260. }
  261. func (h *hostEntry) canTryHost(now time.Time) bool {
  262. if !h.dead {
  263. return true
  264. }
  265. if h.nextRetry.Before(now) {
  266. return true
  267. }
  268. return false
  269. }
  270. func (h *hostEntry) willRetryHost(maxRetryInterval time.Duration) {
  271. h.retryCount += 1
  272. newDelay := h.retryDelay * 2
  273. if newDelay < maxRetryInterval {
  274. h.retryDelay = newDelay
  275. } else {
  276. h.retryDelay = maxRetryInterval
  277. }
  278. h.nextRetry = time.Now().Add(h.retryDelay)
  279. }
  280. func (h *hostEntry) getWeightedAverageResponseTime() float64 {
  281. var value float64
  282. var lastValue float64
  283. // start at 1 so we start with the oldest entry
  284. for i := 1; i <= epsilonBuckets; i += 1 {
  285. pos := (h.epsilonIndex + i) % epsilonBuckets
  286. bucketCount := h.epsilonCounts[pos]
  287. // Changing the line below to what I think it should be to get the weights right
  288. weight := float64(i) / float64(epsilonBuckets)
  289. if bucketCount > 0 {
  290. currentValue := float64(h.epsilonValues[pos]) / float64(bucketCount)
  291. value += currentValue * weight
  292. lastValue = currentValue
  293. } else {
  294. value += lastValue * weight
  295. }
  296. }
  297. return value
  298. }
// ResetAll acquires the pool lock and resets every host via doResetAll.
func (p *standardHostPool) ResetAll() {
	p.Lock()
	defer p.Unlock()
	p.doResetAll()
}
  304. // this actually performs the logic to reset,
  305. // and should only be called when the lock has
  306. // already been acquired
  307. func (p *standardHostPool) doResetAll() {
  308. for _, h := range p.hosts {
  309. h.SetDead(false)
  310. }
  311. }
  312. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  313. host := hostR.Host()
  314. p.Lock()
  315. defer p.Unlock()
  316. h, ok := p.hosts[host]
  317. if !ok {
  318. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  319. }
  320. h.SetDead(false)
  321. }
  322. func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
  323. // first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
  324. p.HostPool.markSuccess(hostR)
  325. eHostR, ok := hostR.(*epsilonHostPoolResponse)
  326. if !ok {
  327. log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
  328. return
  329. }
  330. host := eHostR.host
  331. duration := p.between(eHostR.started, eHostR.ended)
  332. p.Lock()
  333. defer p.Unlock()
  334. h, ok := p.hosts[host]
  335. if !ok {
  336. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  337. }
  338. h.epsilonCounts[h.epsilonIndex]++
  339. h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
  340. }
  341. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  342. host := hostR.Host()
  343. p.Lock()
  344. defer p.Unlock()
  345. h, ok := p.hosts[host]
  346. if !ok {
  347. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  348. }
  349. h.SetDead(true)
  350. }
  351. func (p *standardHostPool) Hosts() []string {
  352. hosts := make([]string, len(p.hosts))
  353. for host, _ := range p.hosts {
  354. hosts = append(hosts, host)
  355. }
  356. return hosts
  357. }
// -------- Epsilon Value Calculators ----------

// CalcValueFromAvgResponseTime returns the reciprocal of v, so a smaller
// average response time yields a larger value.
func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
	return 1.0 / v
}
  362. func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  363. return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
  364. }
  365. func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  366. return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.exp)
  367. }