| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- package hostpool
- import (
- "log"
- "math"
- "sort"
- "sync"
- "time"
- )
- // --- timer: this just exists for testing
- type timer interface {
- between(time.Time, time.Time) time.Duration
- }
- type realTimer struct{}
- // --- Response interfaces and structs ----
- type HostPoolResponse interface {
- Host() string
- Mark(error)
- hostPool() HostPool
- }
- type standardHostPoolResponse struct {
- host string
- sync.Once
- pool HostPool
- }
- // --- HostPool structs and interfaces ----
- type HostPool interface {
- Get() HostPoolResponse
- // keep the marks separate so we can override independently
- markSuccess(HostPoolResponse)
- markFailed(HostPoolResponse)
- ResetAll()
- Hosts() []string
- lookupHost(string) HostEntry
- Close()
- }
- type standardHostPool struct {
- hosts map[string]HostEntry
- initialRetryDelay time.Duration
- maxRetryInterval time.Duration
- rrResults chan string
- closeChan chan struct{}
- wg sync.WaitGroup
- }
- // --- Value Calculators -----------------
- type EpsilonValueCalculator interface {
- CalcValueFromAvgResponseTime(float64) float64
- }
- type LinearEpsilonValueCalculator struct{}
- type LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }
- type PolynomialEpsilonValueCalculator struct {
- LinearEpsilonValueCalculator
- exp float64 // the exponent to which we will raise the value to reweight
- }
- func New(hosts []string) HostPool {
- p := &standardHostPool{
- hosts: make(map[string]HostEntry, len(hosts)),
- initialRetryDelay: time.Duration(30) * time.Second,
- maxRetryInterval: time.Duration(900) * time.Second,
- rrResults: make(chan string),
- closeChan: make(chan struct{}),
- }
- for _, h := range hosts {
- e := newHostEntry(h, p.initialRetryDelay, p.maxRetryInterval)
- p.hosts[h] = e
- }
- p.wg.Add(1)
- go p.serveRoundRobin()
- return p
- }
- func (r *standardHostPoolResponse) Host() string {
- return r.host
- }
- func (r *standardHostPoolResponse) hostPool() HostPool {
- return r.pool
- }
- func (r *standardHostPoolResponse) Mark(err error) {
- r.Do(func() {
- doMark(err, r)
- })
- }
- func doMark(err error, r HostPoolResponse) {
- if err == nil {
- r.hostPool().markSuccess(r)
- } else {
- r.hostPool().markFailed(r)
- }
- }
- func (r *epsilonHostPoolResponse) Mark(err error) {
- r.Do(func() {
- r.ended = time.Now()
- doMark(err, r)
- })
- }
- func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
- return end.Sub(start)
- }
- // return an upstream entry from the HostPool
- func (p *standardHostPool) Get() HostPoolResponse {
- host := p.getRoundRobin()
- return &standardHostPoolResponse{host: host, pool: p}
- }
- func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
- host := p.getEpsilonGreedy()
- started := time.Now()
- return &epsilonHostPoolResponse{
- standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
- started: started,
- }
- }
- func (p *standardHostPool) getRoundRobin() string {
- return <-p.rrResults
- }
- func (p *standardHostPool) serveRoundRobin() {
- nextHostIndex := 0
- getHostToServe := func() string {
- hostCount := len(p.hosts)
- for i := range p.hostList() {
- // iterate via sequenece from where we last iterated
- currentIndex := (i + nextHostIndex) % hostCount
- h := p.hostList()[currentIndex]
- if h.canTryHost(time.Now()) {
- if h.IsDead() {
- h.willRetryHost()
- }
- nextHostIndex = currentIndex + 1
- return h.Host()
- }
- }
- // all hosts are down. re-add them
- p.ResetAll()
- nextHostIndex = 0
- return p.hostList()[0].Host()
- }
- for {
- select {
- case p.rrResults <- getHostToServe():
- case <-p.closeChan:
- p.wg.Done()
- return
- }
- }
- }
- func (p *standardHostPool) ResetAll() {
- // SetDead is threadsafe
- for _, h := range p.hosts {
- h.SetDead(false)
- }
- }
- func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
- host := hostR.Host()
- h, ok := p.hosts[host]
- if !ok {
- log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
- }
- h.SetDead(false)
- }
- func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
- host := hostR.Host()
- h, ok := p.hosts[host]
- if !ok {
- log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
- }
- h.SetDead(true)
- }
- func (p *standardHostPool) Hosts() []string {
- hosts := make([]string, 0, len(p.hosts))
- for host, _ := range p.hosts {
- hosts = append(hosts, host)
- }
- return hosts
- }
- func (p *standardHostPool) lookupHost(hostname string) HostEntry {
- // We can do a "simple" lookup here because this map doesn't change once init'd
- h, ok := p.hosts[hostname]
- if !ok {
- log.Fatalf("host %s not in HostPool %v", hostname, p.Hosts())
- }
- return h
- }
- func (p *standardHostPool) hostList() []HostEntry {
- // This returns a sorted list of HostEntry's. We ought
- // to do some optimization so that this isn't computed every time
- keys := make([]string, 0, len(p.hosts))
- vals := make([]HostEntry, 0, len(p.hosts))
- for hostName := range p.hosts {
- keys = append(keys, hostName)
- }
- sort.Strings(keys)
- for _, k := range keys {
- vals = append(vals, p.hosts[k])
- }
- return vals
- }
- func (p *standardHostPool) Close() {
- p.closeChan <- struct{}{}
- for _, he := range p.hosts {
- he.Close()
- }
- p.wg.Wait()
- }
- // -------- Epsilon Value Calculators ----------
- func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
- return 1.0 / v
- }
- func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
- return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
- }
- func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
- return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.exp)
- }
|