hostpool.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. package hostpool
  2. import (
  3. "log"
  4. "math"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. // --- timer: this just exists for testing
// timer measures the time elapsed between two instants. It is an interface
// only so that tests can substitute their own implementation.
type timer interface {
	// between returns the duration from the first time to the second.
	between(time.Time, time.Time) time.Duration
}
// realTimer is the timer implementation that NewEpsilonGreedy installs.
type realTimer struct{}
  14. // --- Response interfaces and structs ----
// HostPoolResponse is the handle returned by HostPool.Get. It names the
// selected host and lets the caller report how using that host went.
type HostPoolResponse interface {
	// Host returns the name of the selected host.
	Host() string
	// Mark reports the outcome of using the host: a nil error marks the
	// host as successful, a non-nil error marks it as failed.
	Mark(error)
	// hostPool returns the pool the outcome is reported to.
	hostPool() HostPool
}
// standardHostPoolResponse is the HostPoolResponse handed out by
// standardHostPool.Get.
type standardHostPoolResponse struct {
	// host is the name of the selected host.
	host string
	// Once guards Mark so that only the first call reports an outcome.
	sync.Once
	// pool is the pool the outcome is reported back to.
	pool HostPool
}
// epsilonHostPoolResponse extends standardHostPoolResponse with the two
// timestamps from which the epsilon-greedy pool derives a response time.
type epsilonHostPoolResponse struct {
	standardHostPoolResponse
	// started is set when epsilonGreedyHostPool.Get creates the response.
	started time.Time
	// ended is set by the first call to Mark.
	ended time.Time
}
  30. // --- HostPool structs and interfaces ----
// HostPool hands out hosts from a fixed set and records the outcome that
// callers report for each of them.
type HostPool interface {
	// Get selects a host and returns a response handle for it.
	Get() HostPoolResponse
	// keep the marks separate so we can override independently
	markSuccess(HostPoolResponse)
	markFailed(HostPoolResponse)

	// ResetAll marks every host in the pool as not dead.
	ResetAll()
	// Hosts returns the names of the hosts in the pool.
	Hosts() []string

	// lookupHost returns the entry stored for the given host name.
	lookupHost(string) HostEntry

	// Locker exposes the pool's lock so that a type embedding a HostPool
	// can take it.
	sync.Locker
}
// standardHostPool is the HostPool returned by New. Its Get hands out hosts
// in round-robin order.
type standardHostPool struct {
	// The lock is taken by Get, ResetAll, markSuccess and markFailed.
	sync.RWMutex
	// hosts holds the entries keyed by host name.
	hosts map[string]HostEntry
	// hostList holds the entries in the order the names were given to New.
	hostList []HostEntry
	// initialRetryDelay and maxRetryInterval are passed to newHostEntry
	// for every host.
	initialRetryDelay time.Duration
	maxRetryInterval  time.Duration
	// nextHostIndex is where the next round-robin scan starts.
	nextHostIndex int
}
// epsilonGreedyHostPool wraps a HostPool and chooses hosts with an
// epsilon-greedy strategy weighted by recorded response times.
type epsilonGreedyHostPool struct {
	// HostPool is the wrapped pool; NewEpsilonGreedy sets it to the result
	// of New. Lock and Unlock on this type are promoted from it.
	HostPool
	epsilon float32 // this is our exploration factor
	// decayDuration is split into epsilonBuckets equal intervals by
	// epsilonGreedyDecay.
	decayDuration          time.Duration
	EpsilonValueCalculator // embed the epsilonValueCalculator
	// timer measures the elapsed time of a response in markSuccess.
	timer
}
  56. // --- Value Calculators -----------------
// EpsilonValueCalculator converts a host's weighted average response time
// into the value used to weight that host during epsilon-greedy selection.
type EpsilonValueCalculator interface {
	// CalcValueFromAvgResponseTime maps an average response time to a
	// selection value. The pool only calls it with values greater than zero.
	CalcValueFromAvgResponseTime(float64) float64
}
// LinearEpsilonValueCalculator values a host at the reciprocal of its
// average response time.
type LinearEpsilonValueCalculator struct{}
// LogEpsilonValueCalculator is a logarithmic variant of
// LinearEpsilonValueCalculator, which it embeds.
type LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }
// PolynomialEpsilonValueCalculator raises the value computed by the embedded
// LinearEpsilonValueCalculator to the power exp.
type PolynomialEpsilonValueCalculator struct {
	LinearEpsilonValueCalculator
	exp float64 // the exponent to which we will raise the value to reweight
}
  66. // ------ constants -------------------
  67. const epsilonBuckets = 120
  68. const epsilonDecay = 0.90 // decay the exploration rate
  69. const minEpsilon = 0.01 // explore one percent of the time
  70. const initialEpsilon = 0.3
  71. const defaultDecayDuration = time.Duration(5) * time.Minute
  72. func New(hosts []string) HostPool {
  73. p := &standardHostPool{
  74. hosts: make(map[string]HostEntry, len(hosts)),
  75. hostList: make([]HostEntry, len(hosts)),
  76. initialRetryDelay: time.Duration(30) * time.Second,
  77. maxRetryInterval: time.Duration(900) * time.Second,
  78. }
  79. for i, h := range hosts {
  80. e := newHostEntry(h, p.initialRetryDelay, p.maxRetryInterval)
  81. p.hosts[h] = e
  82. p.hostList[i] = e
  83. }
  84. return p
  85. }
// Host returns the name of the host selected for this response.
func (r *standardHostPoolResponse) Host() string {
	return r.host
}
// hostPool returns the pool this response reports its outcome to.
func (r *standardHostPoolResponse) hostPool() HostPool {
	return r.pool
}
  92. func (r *standardHostPoolResponse) Mark(err error) {
  93. r.Do(func() {
  94. doMark(err, r)
  95. })
  96. }
  97. func doMark(err error, r HostPoolResponse) {
  98. if err == nil {
  99. r.hostPool().markSuccess(r)
  100. } else {
  101. r.hostPool().markFailed(r)
  102. }
  103. }
  104. func (r *epsilonHostPoolResponse) Mark(err error) {
  105. r.Do(func() {
  106. r.ended = time.Now()
  107. doMark(err, r)
  108. })
  109. }
// NewEpsilonGreedy returns a HostPool that selects hosts with an epsilon-greedy strategy.
//
// Epsilon Greedy is an algorithm that allows HostPool not only to track failure state,
// but also to learn about "better" options in terms of speed, and to pick from available hosts
// based on a percentage of how well they perform. This gives a weighted request rate to better
// performing hosts, while still distributing requests to all hosts (proportionate to their performance).
//
// The response time of a host is measured from the Get call that returned it until Mark is
// called on the returned response, and is recorded only when the response is marked with a
// nil error. Every response should therefore be marked once the work is done:
//
//	hostResponse := pool.Get()
//	hostname := hostResponse.Host()
//	err := doWorkWith(hostname)
//	hostResponse.Mark(err)
//
// A good overview of Epsilon Greedy is here: http://stevehanov.ca/blog/index.php?id=132
//
// decayDuration may be set to 0 (or a negative value) to use the default value of 5 minutes.
// calc converts a host's weighted average response time into its selection value.
//
// The returned pool starts a goroutine running epsilonGreedyDecay, which loops forever.
func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
	if decayDuration <= 0 {
		decayDuration = defaultDecayDuration
	}
	p := &epsilonGreedyHostPool{
		HostPool:               New(hosts),
		epsilon:                float32(initialEpsilon),
		decayDuration:          decayDuration,
		EpsilonValueCalculator: calc,
		timer:                  &realTimer{},
	}

	// allocate structures
	// NOTE(review): epsilonGreedyHostPool embeds the HostPool interface, which
	// declares no hostList field, so p.hostList cannot resolve as written. The
	// list lives on the *standardHostPool returned by New; confirm how the
	// entries are meant to be reached before relying on this loop.
	for _, h := range p.hostList {
		h.epsilonCounts = make([]int64, epsilonBuckets)
		h.epsilonValues = make([]int64, epsilonBuckets)
	}
	go p.epsilonGreedyDecay()
	return p
}
// between returns the time elapsed from start to end, i.e. end.Sub(start).
func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
	return end.Sub(start)
}
  150. func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
  151. p.Lock()
  152. defer p.Unlock()
  153. p.epsilon = newEpsilon
  154. }
  155. func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
  156. durationPerBucket := p.decayDuration / epsilonBuckets
  157. ticker := time.Tick(durationPerBucket)
  158. for {
  159. <-ticker
  160. p.performEpsilonGreedyDecay()
  161. }
  162. }
  163. func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
  164. p.Lock()
  165. for _, h := range p.hostList {
  166. h.epsilonIndex += 1
  167. h.epsilonIndex = h.epsilonIndex % epsilonBuckets
  168. h.epsilonCounts[h.epsilonIndex] = 0
  169. h.epsilonValues[h.epsilonIndex] = 0
  170. }
  171. p.Unlock()
  172. }
  173. // return an upstream entry from the HostPool
  174. func (p *standardHostPool) Get() HostPoolResponse {
  175. p.Lock()
  176. defer p.Unlock()
  177. host := p.getRoundRobin()
  178. return &standardHostPoolResponse{host: host, pool: p}
  179. }
  180. func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
  181. p.Lock()
  182. defer p.Unlock()
  183. host := p.getEpsilonGreedy()
  184. started := time.Now()
  185. return &epsilonHostPoolResponse{
  186. standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
  187. started: started,
  188. }
  189. }
  190. func (p *standardHostPool) getRoundRobin() string {
  191. // TODO - will want to replace this with something that runs in a
  192. // goroutine and receives requests on a channel.
  193. // The state being protected in that case is really just the currentIdx
  194. // Question - should I just skip the goroutine shit and select randomly?
  195. // Maybe
  196. now := time.Now()
  197. hostCount := len(p.hostList)
  198. for i := range p.hostList {
  199. // iterate via sequenece from where we last iterated
  200. currentIndex := (i + p.nextHostIndex) % hostCount
  201. h := p.hostList[currentIndex]
  202. if h.canTryHost(now) {
  203. if h.IsDead() {
  204. h.willRetryHost()
  205. }
  206. p.nextHostIndex = currentIndex + 1
  207. return h.Host()
  208. }
  209. }
  210. // all hosts are down. re-add them
  211. p.doResetAll()
  212. p.nextHostIndex = 0
  213. return p.hostList[0].Host()
  214. }
  215. func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
  216. var hostToUse *hostEntry
  217. // this is our exploration phase
  218. if rand.Float32() < p.epsilon {
  219. p.epsilon = p.epsilon * epsilonDecay
  220. if p.epsilon < minEpsilon {
  221. p.epsilon = minEpsilon
  222. }
  223. return p.HostPool.Get().Host()
  224. }
  225. // calculate values for each host in the 0..1 range (but not ormalized)
  226. var possibleHosts []*hostEntry
  227. now := time.Now()
  228. var sumValues float64
  229. for _, h := range p.hostList {
  230. if h.canTryHost(now) {
  231. v := h.getWeightedAverageResponseTime()
  232. if v > 0 {
  233. ev := p.CalcValueFromAvgResponseTime(v)
  234. h.epsilonValue = ev
  235. sumValues += ev
  236. possibleHosts = append(possibleHosts, h)
  237. }
  238. }
  239. }
  240. if len(possibleHosts) != 0 {
  241. // now normalize to the 0..1 range to get a percentage
  242. for _, h := range possibleHosts {
  243. h.epsilonPercentage = h.epsilonValue / sumValues
  244. }
  245. // do a weighted random choice among hosts
  246. ceiling := 0.0
  247. pickPercentage := rand.Float64()
  248. for _, h := range possibleHosts {
  249. ceiling += h.epsilonPercentage
  250. if pickPercentage <= ceiling {
  251. hostToUse = h
  252. break
  253. }
  254. }
  255. }
  256. if hostToUse == nil {
  257. if len(possibleHosts) != 0 {
  258. log.Println("Failed to randomly choose a host, Dan loses")
  259. }
  260. return p.HostPool.Get().Host()
  261. }
  262. if hostToUse.dead {
  263. hostToUse.willRetryHost()
  264. }
  265. return hostToUse.host
  266. }
  267. func (h *hostEntry) getWeightedAverageResponseTime() float64 {
  268. var value float64
  269. var lastValue float64
  270. // start at 1 so we start with the oldest entry
  271. for i := 1; i <= epsilonBuckets; i += 1 {
  272. pos := (h.epsilonIndex + i) % epsilonBuckets
  273. bucketCount := h.epsilonCounts[pos]
  274. // Changing the line below to what I think it should be to get the weights right
  275. weight := float64(i) / float64(epsilonBuckets)
  276. if bucketCount > 0 {
  277. currentValue := float64(h.epsilonValues[pos]) / float64(bucketCount)
  278. value += currentValue * weight
  279. lastValue = currentValue
  280. } else {
  281. value += lastValue * weight
  282. }
  283. }
  284. return value
  285. }
// ResetAll marks every host in the pool as not dead. It takes the pool lock
// and delegates the work to doResetAll.
func (p *standardHostPool) ResetAll() {
	p.Lock()
	defer p.Unlock()
	p.doResetAll()
}
  291. // this actually performs the logic to reset,
  292. // and should only be called when the lock has
  293. // already been acquired
  294. func (p *standardHostPool) doResetAll() {
  295. for _, h := range p.hosts {
  296. h.SetDead(false)
  297. }
  298. }
  299. func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
  300. host := hostR.Host()
  301. p.Lock()
  302. defer p.Unlock()
  303. h, ok := p.hosts[host]
  304. if !ok {
  305. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  306. }
  307. h.SetDead(false)
  308. }
  309. func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
  310. // first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
  311. p.HostPool.markSuccess(hostR)
  312. eHostR, ok := hostR.(*epsilonHostPoolResponse)
  313. if !ok {
  314. log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
  315. return
  316. }
  317. host := eHostR.host
  318. duration := p.between(eHostR.started, eHostR.ended)
  319. p.Lock()
  320. defer p.Unlock()
  321. h := p.lookupHost(host)
  322. h.epsilonCounts[h.epsilonIndex]++
  323. h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
  324. }
  325. func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
  326. host := hostR.Host()
  327. p.Lock()
  328. defer p.Unlock()
  329. h, ok := p.hosts[host]
  330. if !ok {
  331. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  332. }
  333. h.SetDead(true)
  334. }
  335. func (p *standardHostPool) Hosts() []string {
  336. hosts := make([]string, len(p.hosts))
  337. for host, _ := range p.hosts {
  338. hosts = append(hosts, host)
  339. }
  340. return hosts
  341. }
  342. func (p *standardHostPool) lookupHost(hostname string) HostEntry {
  343. // We can do a "simple" lookup here because this map doesn't change once init'd
  344. h, ok := p.hosts[host]
  345. if !ok {
  346. log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
  347. }
  348. return h
  349. }
  350. // -------- Epsilon Value Calculators ----------
// CalcValueFromAvgResponseTime returns the reciprocal of v, so a host with a
// lower average response time gets a higher value.
func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
	return 1.0 / v
}
  354. func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  355. return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
  356. }
  357. func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
  358. return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.exp)
  359. }