hostpool.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. package hostpool
  2. import (
  3. "log"
  4. "math"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. // --- timer: this just exists for testing
  10. type timer interface {
  11. between(time.Time, time.Time) time.Duration
  12. }
  13. type realTimer struct{}
  14. // --- Response interfaces and structs ----
  15. type HostPoolResponse interface {
  16. Host() string
  17. Mark(error)
  18. hostPool() HostPool
  19. }
  20. type standardHostPoolResponse struct {
  21. host string
  22. sync.Once
  23. pool HostPool
  24. }
  25. type epsilonHostPoolResponse struct {
  26. standardHostPoolResponse
  27. started time.Time
  28. ended time.Time
  29. }
  30. // --- HostPool structs and interfaces ----
  31. type HostPool interface {
  32. Get() HostPoolResponse
  33. // keep the marks separate so we can override independently
  34. markSuccess(HostPoolResponse)
  35. markFailed(HostPoolResponse)
  36. ResetAll()
  37. Hosts() []string
  38. }
  39. type standardHostPool struct {
  40. sync.RWMutex
  41. hosts map[string]*hostEntry
  42. hostList []*hostEntry
  43. initialRetryDelay time.Duration
  44. maxRetryInterval time.Duration
  45. nextHostIndex int
  46. }
  47. type epsilonGreedyHostPool struct {
  48. standardHostPool // TODO - would be nifty if we could embed HostPool and Locker interfaces
  49. epsilon float32 // this is our exploration factor
  50. decayDuration time.Duration
  51. EpsilonValueCalculator // embed the epsilonValueCalculator
  52. timer
  53. }
  54. // --- hostEntry - this is due to get upgraded
  55. type hostEntry struct {
  56. host string
  57. nextRetry time.Time
  58. retryCount int16
  59. retryDelay time.Duration
  60. dead bool
  61. epsilonCounts []int64
  62. epsilonValues []int64
  63. epsilonIndex int
  64. epsilonValue float64
  65. epsilonPercentage float64
  66. }
  67. // --- Value Calculators -----------------
  68. type EpsilonValueCalculator interface {
  69. CalcValueFromAvgResponseTime(float64) float64
  70. }
  71. type LinearEpsilonValueCalculator struct{}
  72. type LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }
  73. type PolynomialEpsilonValueCalculator struct {
  74. LinearEpsilonValueCalculator
  75. exp float64 // the exponent to which we will raise the value to reweight
  76. }
  77. // ------ constants -------------------
  78. const epsilonBuckets = 120
  79. const epsilonDecay = 0.90 // decay the exploration rate
  80. const minEpsilon = 0.01 // explore one percent of the time
  81. const initialEpsilon = 0.3
  82. const defaultDecayDuration = time.Duration(5) * time.Minute
  83. func New(hosts []string) HostPool {
  84. p := &standardHostPool{
  85. hosts: make(map[string]*hostEntry, len(hosts)),
  86. hostList: make([]*hostEntry, len(hosts)),
  87. initialRetryDelay: time.Duration(30) * time.Second,
  88. maxRetryInterval: time.Duration(900) * time.Second,
  89. }
  90. for i, h := range hosts {
  91. e := &hostEntry{
  92. host: h,
  93. retryDelay: p.initialRetryDelay,
  94. }
  95. p.hosts[h] = e
  96. p.hostList[i] = e
  97. }
  98. return p
  99. }
  100. func (r *standardHostPoolResponse) Host() string {
  101. return r.host
  102. }
  103. func (r *standardHostPoolResponse) hostPool() HostPool {
  104. return r.pool
  105. }
  106. func (r *standardHostPoolResponse) Mark(err error) {
  107. r.Do(func() {
  108. doMark(err, r)
  109. })
  110. }
  111. func doMark(err error, r HostPoolResponse) {
  112. if err == nil {
  113. r.hostPool().markSuccess(r)
  114. } else {
  115. r.hostPool().markFailed(r)
  116. }
  117. }
  118. func (r *epsilonHostPoolResponse) Mark(err error) {
  119. r.Do(func() {
  120. r.ended = time.Now()
  121. doMark(err, r)
  122. })
  123. }
  124. // Epsilon Greedy is an algorithim that allows HostPool not only to track failure state,
  125. // but also to learn about "better" options in terms of speed, and to pick from available hosts
  126. // based on a percentage of how well they perform. This gives a weighted request rate to better
  127. // performing hosts, while still distributing requests to all hosts (proportionate to their performance)
  128. //
  129. // After enabling Epsilon Greedy, hosts must be marked for sucess along with a time value representing
  130. // how fast (or slow) that host was.
  131. //
  132. // host := pool.Get()
  133. // start := time.Now()
  134. // ..... do work with host
  135. // duration = time.Now().Sub(start)
  136. // pool.MarkSuccessWithTime(host, duration)
  137. //
  138. // a good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
  139. //
  140. // decayDuration may be set to 0 to use the default value of 5 minutes
  141. func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
  142. if decayDuration <= 0 {
  143. decayDuration = defaultDecayDuration
  144. }
  145. stdHP := New(hosts).(*standardHostPool)
  146. p := &epsilonGreedyHostPool{
  147. standardHostPool: *stdHP,
  148. epsilon: float32(initialEpsilon),
  149. decayDuration: decayDuration,
  150. EpsilonValueCalculator: calc,
  151. timer: &realTimer{},
  152. }
  153. // allocate structures
  154. for _, h := range p.hostList {
  155. h.epsilonCounts = make([]int64, epsilonBuckets)
  156. h.epsilonValues = make([]int64, epsilonBuckets)
  157. }
  158. go p.epsilonGreedyDecay()
  159. return p
  160. }
  161. func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
  162. return end.Sub(start)
  163. }
  164. func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
  165. p.Lock()
  166. defer p.Unlock()
  167. p.epsilon = newEpsilon
  168. }
  169. func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
  170. durationPerBucket := p.decayDuration / epsilonBuckets
  171. ticker := time.Tick(durationPerBucket)
  172. for {
  173. <-ticker
  174. p.performEpsilonGreedyDecay()
  175. }
  176. }
  177. func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
  178. p.Lock()
  179. for _, h := range p.hostList {
  180. h.epsilonIndex += 1
  181. h.epsilonIndex = h.epsilonIndex % epsilonBuckets
  182. h.epsilonCounts[h.epsilonIndex] = 0
  183. h.epsilonValues[h.epsilonIndex] = 0
  184. }
  185. p.Unlock()
  186. }
  187. // return an upstream entry from the HostPool
  188. func (p *standardHostPool) Get() HostPoolResponse {
  189. p.Lock()
  190. defer p.Unlock()
  191. host := p.getRoundRobin()
  192. return &standardHostPoolResponse{host: host, pool: p}
  193. }
  194. func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
  195. p.Lock()
  196. defer p.Unlock()
  197. host := p.getEpsilonGreedy()
  198. started := time.Now()
  199. return &epsilonHostPoolResponse{
  200. standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
  201. started: started,
  202. }
  203. }
  204. func (p *standardHostPool) getRoundRobin() string {
  205. now := time.Now()
  206. hostCount := len(p.hostList)
  207. for i := range p.hostList {
  208. // iterate via sequenece from where we last iterated
  209. currentIndex := (i + p.nextHostIndex) % hostCount
  210. h := p.hostList[currentIndex]
  211. if !h.dead {
  212. p.nextHostIndex = currentIndex + 1
  213. return h.host
  214. }
  215. if h.nextRetry.Before(now) {
  216. h.willRetryHost(p.maxRetryInterval)
  217. p.nextHostIndex = currentIndex + 1
  218. return h.host
  219. }
  220. }
  221. // all hosts are down. re-add them
  222. p.doResetAll()
  223. p.nextHostIndex = 0
  224. return p.hostList[0].host
  225. }
  226. func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
  227. var hostToUse *hostEntry
  228. // this is our exploration phase
  229. if rand.Float32() < p.epsilon {
  230. p.epsilon = p.epsilon * epsilonDecay
  231. if p.epsilon < minEpsilon {
  232. p.epsilon = minEpsilon
  233. }
  234. return p.getRoundRobin()
  235. }
  236. // calculate values for each host in the 0..1 range (but not ormalized)
  237. var possibleHosts []*hostEntry
  238. now := time.Now()
  239. var sumValues float64
  240. for _, h := range p.hostList {
  241. if h.canTryHost(now) {
  242. v := h.getWeightedAverageResponseTime()
  243. if v > 0 {
  244. ev := p.CalcValueFromAvgResponseTime(v)
  245. h.epsilonValue = ev
  246. sumValues += ev
  247. possibleHosts = append(possibleHosts, h)
  248. }
  249. }
  250. }
  251. if len(possibleHosts) != 0 {
  252. // now normalize to the 0..1 range to get a percentage
  253. for _, h := range possibleHosts {
  254. h.epsilonPercentage = h.epsilonValue / sumValues
  255. }
  256. // do a weighted random choice among hosts
  257. ceiling := 0.0
  258. pickPercentage := rand.Float64()
  259. for _, h := range possibleHosts {
  260. ceiling += h.epsilonPercentage
  261. if pickPercentage <= ceiling {
  262. hostToUse = h
  263. break
  264. }
  265. }
  266. }
  267. if hostToUse == nil {
  268. if len(possibleHosts) != 0 {
  269. log.Println("Failed to randomly choose a host, Dan loses")
  270. }
  271. return p.getRoundRobin()
  272. }
  273. if hostToUse.dead {
  274. hostToUse.willRetryHost(p.maxRetryInterval)
  275. }
  276. return hostToUse.host
  277. }
  278. func (h *hostEntry) canTryHost(now time.Time) bool {
  279. if !h.dead {
  280. return true
  281. }
  282. if h.nextRetry.Before(now) {
  283. return true
  284. }
  285. return false
  286. }
  287. func (h *hostEntry) willRetryHost(maxRetryInterval time.Duration) {
  288. h.retryCount += 1
  289. newDelay := h.retryDelay * 2
  290. if newDelay < maxRetryInterval {
  291. h.retryDelay = newDelay
  292. } else {
  293. h.retryDelay = maxRetryInterval
  294. }
  295. h.nextRetry = time.Now().Add(h.retryDelay)
  296. }
  297. func (h *hostEntry) getWeightedAverageResponseTime() float64 {
  298. var value float64
  299. var lastValue float64
  300. // start at 1 so we start with the oldest entry
  301. for i := 1; i <= epsilonBuckets; i += 1 {
  302. pos := (h.epsilonIndex + i) % epsilonBuckets
  303. bucketCount := h.epsilonCounts[pos]
  304. // Changing the line below to what I think it should be to get the weights right
  305. weight := float64(i) / float64(epsilonBuckets)
  306. if bucketCount > 0 {
  307. currentValue := float64(h.epsilonValues[pos]) / float64(bucketCount)
  308. value += currentValue * weight
  309. lastValue = currentValue
  310. } else {
  311. value += lastValue * weight
  312. }
  313. }
  314. return value
  315. }
  316. func (p *standardHostPool) ResetAll() {
  317. p.Lock()
  318. defer p.Unlock()
  319. p.doResetAll()
  320. }
  321. // this actually performs the logic to reset,
  322. // and should only be called when the lock has
  323. // already been acquired
  324. func (p *standardHostPool) doResetAll() {
  325. for _, h := range p.hosts {
  326. h.dead = false
  327. }
  328. }
  329. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  330. host := hostR.Host()
  331. p.Lock()
  332. defer p.Unlock()
  333. h, ok := p.hosts[host]
  334. if !ok {
  335. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  336. }
  337. h.dead = false
  338. }
  339. func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
  340. // first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
  341. p.standardHostPool.markSuccess(hostR)
  342. eHostR, ok := hostR.(*epsilonHostPoolResponse)
  343. if !ok {
  344. log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
  345. return
  346. }
  347. host := eHostR.host
  348. duration := p.between(eHostR.started, eHostR.ended)
  349. p.Lock()
  350. defer p.Unlock()
  351. h, ok := p.hosts[host]
  352. if !ok {
  353. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  354. }
  355. h.epsilonCounts[h.epsilonIndex]++
  356. h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
  357. }
  358. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  359. host := hostR.Host()
  360. p.Lock()
  361. defer p.Unlock()
  362. h, ok := p.hosts[host]
  363. if !ok {
  364. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  365. }
  366. if !h.dead {
  367. h.dead = true
  368. h.retryCount = 0
  369. h.retryDelay = p.initialRetryDelay
  370. h.nextRetry = time.Now().Add(h.retryDelay)
  371. }
  372. }
  373. func (p *standardHostPool) Hosts() []string {
  374. hosts := make([]string, len(p.hosts))
  375. for host, _ := range p.hosts {
  376. hosts = append(hosts, host)
  377. }
  378. return hosts
  379. }
  380. // -------- Epsilon Value Calculators ----------
  381. func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  382. return 1.0 / v
  383. }
  384. func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  385. return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
  386. }
  387. func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  388. return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.exp)
  389. }