瀏覽代碼

finally got it building with no errors, no clue if it works

Dan Frank 13 年之前
父節點
當前提交
3c3082c66c
共有 4 個文件被更改,包括 190 次插入196 次删除
  1. 1 1
      epsilon_decay.go
  2. 163 0
      epsilon_greedy.go
  3. 1 5
      host_entry.go
  4. 25 190
      hostpool.go

+ 1 - 1
epsilon_decay.go

@@ -42,7 +42,7 @@ type getWAScoreRequest struct {
 
 // -- "Constructor" --
 
-func New() EpsilonDecayStore {
+func NewDecayStore() EpsilonDecayStore {
 	store := &defEpsDecayStore{
 		epsilonCounts: make([]int64, epsilonBuckets),
 		epsilonValues: make([]float64, epsilonBuckets),

+ 163 - 0
epsilon_greedy.go

@@ -0,0 +1,163 @@
+package hostpool
+
+import (
+	"log"
+	"math/rand"
+	"time"
+)
+
+type epsilonGreedyHostEntry interface {
+	HostEntry
+	EpsilonDecayStore
+}
+
+type epsilonGreedyHostPool struct {
+	HostPool
+	hosts                  map[string]epsilonGreedyHostEntry // this basically just mirrors the underlying host map
+	epsilon                float32                           // this is our exploration factor
+	EpsilonValueCalculator                                   // embed the epsilonValueCalculator
+	timer
+}
+
+type epsilonHostPoolResponse struct {
+	standardHostPoolResponse
+	started time.Time
+	ended   time.Time
+}
+
+// ------ constants -------------------
+
+const epsilonDecay = 0.90 // decay the exploration rate
+const minEpsilon = 0.01   // explore one percent of the time
+const initialEpsilon = 0.3
+
+func toEGHostEntry(fromHE HostEntry) epsilonGreedyHostEntry {
+	return &struct {
+		HostEntry
+		EpsilonDecayStore
+	}{
+		fromHE,
+		NewDecayStore(),
+	}
+}
+
+// Epsilon Greedy is an algorithim that allows HostPool not only to track failure state, 
+// but also to learn about "better" options in terms of speed, and to pick from available hosts
+// based on a percentage of how well they perform. This gives a weighted request rate to better
+// performing hosts, while still distributing requests to all hosts (proportionate to their performance)
+// 
+// After enabling Epsilon Greedy, hosts must be marked for sucess along with a time value representing 
+// how fast (or slow) that host was. 
+// 
+// host := pool.Get()
+// start := time.Now()
+// ..... do work with host
+// duration = time.Now().Sub(start)
+// pool.MarkSuccessWithTime(host, duration)
+// 
+// a good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
+//
+// decayDuration may be set to 0 to use the default value of 5 minutes
+func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
+	return ToEpsilonGreedy(New(hosts), decayDuration, calc)
+}
+
+func ToEpsilonGreedy(pool HostPool, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
+	if decayDuration <= 0 {
+		decayDuration = defaultDecayDuration
+	}
+	p := &epsilonGreedyHostPool{
+		HostPool:               pool,
+		epsilon:                float32(initialEpsilon),
+		EpsilonValueCalculator: calc,
+		timer:                  &realTimer{},
+	}
+	for _, hostName := range p.Hosts() {
+		p.hosts[hostName] = toEGHostEntry(p.lookupHost(hostName))
+	}
+	return p
+}
+
+// ---------
+
+func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
+
+	// this is our exploration phase
+	if rand.Float32() < p.epsilon {
+		p.epsilon = p.epsilon * epsilonDecay
+		if p.epsilon < minEpsilon {
+			p.epsilon = minEpsilon
+		}
+		return p.HostPool.Get().Host()
+	}
+
+	// calculate values for each host in the 0..1 range (but not ormalized)
+	type epsilonChoice struct {
+		he                HostEntry
+		epsilonValue      float64
+		epsilonPercentage float64
+	}
+	var hostToUse epsilonChoice
+	var possibleHosts []epsilonChoice
+	now := time.Now()
+	var sumValues float64
+	for _, h := range p.hosts {
+		ec := epsilonChoice{he: h}
+		if h.canTryHost(now) {
+			v := h.GetWeightedAvgScore() // score is the response time
+			if v > 0 {
+				ev := p.CalcValueFromAvgResponseTime(v)
+				ec.epsilonValue = ev
+				sumValues += ev
+				possibleHosts = append(possibleHosts, ec)
+			}
+		}
+	}
+
+	if len(possibleHosts) != 0 {
+		// now normalize to the 0..1 range to get a percentage
+		for _, ec := range possibleHosts {
+			ec.epsilonPercentage = ec.epsilonValue / sumValues
+		}
+
+		// do a weighted random choice among hosts
+		ceiling := 0.0
+		pickPercentage := rand.Float64()
+		for _, ec := range possibleHosts {
+			ceiling += ec.epsilonPercentage
+			if pickPercentage <= ceiling {
+				hostToUse = ec
+				break
+			}
+		}
+	}
+
+	if hostToUse.he == nil {
+		if len(possibleHosts) != 0 {
+			log.Println("Failed to randomly choose a host, Dan loses")
+		}
+		return p.HostPool.Get().Host()
+	}
+
+	if hostToUse.he.IsDead() {
+		hostToUse.he.willRetryHost()
+	}
+	return hostToUse.he.Host()
+}
+
+func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
+	// first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
+	p.HostPool.markSuccess(hostR)
+	eHostR, ok := hostR.(*epsilonHostPoolResponse)
+	if !ok {
+		log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
+		return
+	}
+	host := eHostR.host
+	duration := p.between(eHostR.started, eHostR.ended)
+
+	p.Lock()
+	defer p.Unlock()
+	h := p.hosts[host]
+	h.Record(duration.Seconds())
+}

+ 1 - 5
host_entry.go

@@ -48,11 +48,7 @@ type hostEntry struct {
 	initialRetryDelay time.Duration
 	maxRetryInterval  time.Duration
 	dead              bool
-	// epsilonCounts     []int64
-	// epsilonValues     []int64
-	// epsilonIndex      int
-	// epsilonValue      float64
-	// epsilonPercentage float64
+
 	incomingRequests chan hostEntryRequest
 }
 

+ 25 - 190
hostpool.go

@@ -3,7 +3,7 @@ package hostpool
 import (
 	"log"
 	"math"
-	"math/rand"
+	"sort"
 	"sync"
 	"time"
 )
@@ -30,12 +30,6 @@ type standardHostPoolResponse struct {
 	pool HostPool
 }
 
-type epsilonHostPoolResponse struct {
-	standardHostPoolResponse
-	started time.Time
-	ended   time.Time
-}
-
 // --- HostPool structs and interfaces ----
 
 type HostPool interface {
@@ -47,26 +41,18 @@ type HostPool interface {
 	ResetAll()
 	Hosts() []string
 	lookupHost(string) HostEntry
+	// setHostMap(map[string]HostEntry)
 	sync.Locker
 }
 
 type standardHostPool struct {
 	sync.RWMutex
 	hosts             map[string]HostEntry
-	hostList          []HostEntry
 	initialRetryDelay time.Duration
 	maxRetryInterval  time.Duration
 	nextHostIndex     int
 }
 
-type epsilonGreedyHostPool struct {
-	HostPool
-	epsilon                float32 // this is our exploration factor
-	decayDuration          time.Duration
-	EpsilonValueCalculator // embed the epsilonValueCalculator
-	timer
-}
-
 // --- Value Calculators -----------------
 
 type EpsilonValueCalculator interface {
@@ -80,26 +66,16 @@ type PolynomialEpsilonValueCalculator struct {
 	exp float64 // the exponent to which we will raise the value to reweight
 }
 
-// ------ constants -------------------
-
-const epsilonBuckets = 120
-const epsilonDecay = 0.90 // decay the exploration rate
-const minEpsilon = 0.01   // explore one percent of the time
-const initialEpsilon = 0.3
-const defaultDecayDuration = time.Duration(5) * time.Minute
-
 func New(hosts []string) HostPool {
 	p := &standardHostPool{
 		hosts:             make(map[string]HostEntry, len(hosts)),
-		hostList:          make([]HostEntry, len(hosts)),
 		initialRetryDelay: time.Duration(30) * time.Second,
 		maxRetryInterval:  time.Duration(900) * time.Second,
 	}
 
-	for i, h := range hosts {
+	for _, h := range hosts {
 		e := newHostEntry(h, p.initialRetryDelay, p.maxRetryInterval)
 		p.hosts[h] = e
-		p.hostList[i] = e
 	}
 
 	return p
@@ -135,46 +111,6 @@ func (r *epsilonHostPoolResponse) Mark(err error) {
 
 }
 
-// Epsilon Greedy is an algorithim that allows HostPool not only to track failure state, 
-// but also to learn about "better" options in terms of speed, and to pick from available hosts
-// based on a percentage of how well they perform. This gives a weighted request rate to better
-// performing hosts, while still distributing requests to all hosts (proportionate to their performance)
-// 
-// After enabling Epsilon Greedy, hosts must be marked for sucess along with a time value representing 
-// how fast (or slow) that host was. 
-// 
-// host := pool.Get()
-// start := time.Now()
-// ..... do work with host
-// duration = time.Now().Sub(start)
-// pool.MarkSuccessWithTime(host, duration)
-// 
-// a good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
-//
-// decayDuration may be set to 0 to use the default value of 5 minutes
-func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
-
-	if decayDuration <= 0 {
-		decayDuration = defaultDecayDuration
-	}
-	// stdHP := New(hosts).(*standardHostPool)
-	p := &epsilonGreedyHostPool{
-		HostPool:               New(hosts),
-		epsilon:                float32(initialEpsilon),
-		decayDuration:          decayDuration,
-		EpsilonValueCalculator: calc,
-		timer:                  &realTimer{},
-	}
-
-	// allocate structures
-	for _, h := range p.hostList {
-		h.epsilonCounts = make([]int64, epsilonBuckets)
-		h.epsilonValues = make([]int64, epsilonBuckets)
-	}
-	go p.epsilonGreedyDecay()
-	return p
-}
-
 func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
 	return end.Sub(start)
 }
@@ -185,25 +121,6 @@ func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
 	p.epsilon = newEpsilon
 }
 
-func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
-	durationPerBucket := p.decayDuration / epsilonBuckets
-	ticker := time.Tick(durationPerBucket)
-	for {
-		<-ticker
-		p.performEpsilonGreedyDecay()
-	}
-}
-func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
-	p.Lock()
-	for _, h := range p.hostList {
-		h.epsilonIndex += 1
-		h.epsilonIndex = h.epsilonIndex % epsilonBuckets
-		h.epsilonCounts[h.epsilonIndex] = 0
-		h.epsilonValues[h.epsilonIndex] = 0
-	}
-	p.Unlock()
-}
-
 // return an upstream entry from the HostPool
 func (p *standardHostPool) Get() HostPoolResponse {
 	p.Lock()
@@ -231,12 +148,12 @@ func (p *standardHostPool) getRoundRobin() string {
 	// Question - should I just skip the goroutine shit and select randomly?
 	// Maybe
 	now := time.Now()
-	hostCount := len(p.hostList)
-	for i := range p.hostList {
+	hostCount := len(p.hosts)
+	for i := range p.hostList() {
 		// iterate via sequenece from where we last iterated
 		currentIndex := (i + p.nextHostIndex) % hostCount
 
-		h := p.hostList[currentIndex]
+		h := p.hostList()[currentIndex]
 		if h.canTryHost(now) {
 			if h.IsDead() {
 				h.willRetryHost()
@@ -249,87 +166,7 @@ func (p *standardHostPool) getRoundRobin() string {
 	// all hosts are down. re-add them
 	p.doResetAll()
 	p.nextHostIndex = 0
-	return p.hostList[0].Host()
-}
-
-func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
-	var hostToUse *hostEntry
-
-	// this is our exploration phase
-	if rand.Float32() < p.epsilon {
-		p.epsilon = p.epsilon * epsilonDecay
-		if p.epsilon < minEpsilon {
-			p.epsilon = minEpsilon
-		}
-		return p.HostPool.Get().Host()
-	}
-
-	// calculate values for each host in the 0..1 range (but not ormalized)
-	var possibleHosts []*hostEntry
-	now := time.Now()
-	var sumValues float64
-	for _, h := range p.hostList {
-		if h.canTryHost(now) {
-			v := h.getWeightedAverageResponseTime()
-			if v > 0 {
-				ev := p.CalcValueFromAvgResponseTime(v)
-				h.epsilonValue = ev
-				sumValues += ev
-				possibleHosts = append(possibleHosts, h)
-			}
-		}
-	}
-
-	if len(possibleHosts) != 0 {
-		// now normalize to the 0..1 range to get a percentage
-		for _, h := range possibleHosts {
-			h.epsilonPercentage = h.epsilonValue / sumValues
-		}
-
-		// do a weighted random choice among hosts
-		ceiling := 0.0
-		pickPercentage := rand.Float64()
-		for _, h := range possibleHosts {
-			ceiling += h.epsilonPercentage
-			if pickPercentage <= ceiling {
-				hostToUse = h
-				break
-			}
-		}
-	}
-
-	if hostToUse == nil {
-		if len(possibleHosts) != 0 {
-			log.Println("Failed to randomly choose a host, Dan loses")
-		}
-		return p.HostPool.Get().Host()
-	}
-
-	if hostToUse.dead {
-		hostToUse.willRetryHost()
-	}
-	return hostToUse.host
-}
-
-func (h *hostEntry) getWeightedAverageResponseTime() float64 {
-	var value float64
-	var lastValue float64
-
-	// start at 1 so we start with the oldest entry
-	for i := 1; i <= epsilonBuckets; i += 1 {
-		pos := (h.epsilonIndex + i) % epsilonBuckets
-		bucketCount := h.epsilonCounts[pos]
-		// Changing the line below to what I think it should be to get the weights right
-		weight := float64(i) / float64(epsilonBuckets)
-		if bucketCount > 0 {
-			currentValue := float64(h.epsilonValues[pos]) / float64(bucketCount)
-			value += currentValue * weight
-			lastValue = currentValue
-		} else {
-			value += lastValue * weight
-		}
-	}
-	return value
+	return p.hostList()[0].Host()
 }
 
 func (p *standardHostPool) ResetAll() {
@@ -359,24 +196,6 @@ func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
 	h.SetDead(false)
 }
 
-func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
-	// first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
-	p.HostPool.markSuccess(hostR)
-	eHostR, ok := hostR.(*epsilonHostPoolResponse)
-	if !ok {
-		log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
-		return
-	}
-	host := eHostR.host
-	duration := p.between(eHostR.started, eHostR.ended)
-
-	p.Lock()
-	defer p.Unlock()
-	h := p.lookupHost(host)
-	h.epsilonCounts[h.epsilonIndex]++
-	h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
-}
-
 func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
 	host := hostR.Host()
 	p.Lock()
@@ -398,13 +217,29 @@ func (p *standardHostPool) Hosts() []string {
 
 func (p *standardHostPool) lookupHost(hostname string) HostEntry {
 	// We can do a "simple" lookup here because this map doesn't change once init'd
-	h, ok := p.hosts[host]
+	h, ok := p.hosts[hostname]
 	if !ok {
-		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
+		log.Fatalf("host %s not in HostPool %v", hostname, p.Hosts())
 	}
 	return h
 }
 
+func (p *standardHostPool) hostList() []HostEntry {
+	// This returns a sorted list of HostEntry's. We ought
+	// to do some optimization so that this isn't computed every time
+	// TODO can totally use Hosts above to accomplish this
+	keys := make([]string, 0, len(p.hosts))
+	vals := make([]HostEntry, 0, len(p.hosts))
+	for hostName := range p.hosts {
+		keys = append(keys, hostName)
+	}
+	sort.Strings(keys)
+	for _, k := range keys {
+		vals = append(vals, p.hosts[k])
+	}
+	return vals
+}
+
 // -------- Epsilon Value Calculators ----------
 
 func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {