Prechádzať zdrojové kódy

split into multiple files

Dan Frank 12 rokov pred
rodič
commit
6d43f34e8e
4 zmenil súbory, kde vykonal 293 pridanie a 276 odobranie
  1. 197 0
      epsilon_greedy.go
  2. 34 0
      epsilon_value_calculators.go
  3. 62 0
      host_entry.go
  4. 0 276
      hostpool.go

+ 197 - 0
epsilon_greedy.go

@@ -0,0 +1,197 @@
+package hostpool
+
+import (
+	"log"
+	"math/rand"
+	"time"
+)
+
+type epsilonHostPoolResponse struct {
+	standardHostPoolResponse
+	started time.Time
+	ended   time.Time
+}
+
+func (r *epsilonHostPoolResponse) Mark(err error) {
+	r.Do(func() {
+		r.ended = time.Now()
+		doMark(err, r)
+	})
+
+}
+
+type epsilonGreedyHostPool struct {
+	standardHostPool               // TODO - would be nifty if we could embed HostPool and Locker interfaces
+	epsilon                float32 // this is our exploration factor
+	decayDuration          time.Duration
+	EpsilonValueCalculator // embed the epsilonValueCalculator
+	timer
+}
+
+// Epsilon Greedy is an algorithim that allows HostPool not only to track failure state, 
+// but also to learn about "better" options in terms of speed, and to pick from available hosts
+// based on a percentage of how well they perform. This gives a weighted request rate to better
+// performing hosts, while still distributing requests to all hosts (proportionate to their performance)
+// 
+// After enabling Epsilon Greedy, hosts must be marked for sucess along with a time value representing 
+// how fast (or slow) that host was. 
+// 
+// host := pool.Get()
+// start := time.Now()
+// ..... do work with host
+// duration = time.Now().Sub(start)
+// pool.MarkSuccessWithTime(host, duration)
+// 
+// a good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
+//
+// decayDuration may be set to 0 to use the default value of 5 minutes
+func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
+
+	if decayDuration <= 0 {
+		decayDuration = defaultDecayDuration
+	}
+	stdHP := New(hosts).(*standardHostPool)
+	p := &epsilonGreedyHostPool{
+		standardHostPool:       *stdHP,
+		epsilon:                float32(initialEpsilon),
+		decayDuration:          decayDuration,
+		EpsilonValueCalculator: calc,
+		timer:                  &realTimer{},
+	}
+
+	// allocate structures
+	for _, h := range p.hostList {
+		h.epsilonCounts = make([]int64, epsilonBuckets)
+		h.epsilonValues = make([]int64, epsilonBuckets)
+	}
+	go p.epsilonGreedyDecay()
+	return p
+}
+
+func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
+	p.Lock()
+	defer p.Unlock()
+	p.epsilon = newEpsilon
+}
+
+func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
+	durationPerBucket := p.decayDuration / epsilonBuckets
+	ticker := time.Tick(durationPerBucket)
+	for {
+		<-ticker
+		p.performEpsilonGreedyDecay()
+	}
+}
+func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
+	p.Lock()
+	for _, h := range p.hostList {
+		h.epsilonIndex += 1
+		h.epsilonIndex = h.epsilonIndex % epsilonBuckets
+		h.epsilonCounts[h.epsilonIndex] = 0
+		h.epsilonValues[h.epsilonIndex] = 0
+	}
+	p.Unlock()
+}
+
+func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
+	p.Lock()
+	defer p.Unlock()
+	host := p.getEpsilonGreedy()
+	started := time.Now()
+	return &epsilonHostPoolResponse{
+		standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
+		started:                  started,
+	}
+}
+
+func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
+	var hostToUse *hostEntry
+
+	// this is our exploration phase
+	if rand.Float32() < p.epsilon {
+		p.epsilon = p.epsilon * epsilonDecay
+		if p.epsilon < minEpsilon {
+			p.epsilon = minEpsilon
+		}
+		return p.getRoundRobin()
+	}
+
+	// calculate values for each host in the 0..1 range (but not ormalized)
+	var possibleHosts []*hostEntry
+	now := time.Now()
+	var sumValues float64
+	for _, h := range p.hostList {
+		if h.canTryHost(now) {
+			v := h.getWeightedAverageResponseTime()
+			if v > 0 {
+				ev := p.CalcValueFromAvgResponseTime(v)
+				h.epsilonValue = ev
+				sumValues += ev
+				possibleHosts = append(possibleHosts, h)
+			}
+		}
+	}
+
+	if len(possibleHosts) != 0 {
+		// now normalize to the 0..1 range to get a percentage
+		for _, h := range possibleHosts {
+			h.epsilonPercentage = h.epsilonValue / sumValues
+		}
+
+		// do a weighted random choice among hosts
+		ceiling := 0.0
+		pickPercentage := rand.Float64()
+		for _, h := range possibleHosts {
+			ceiling += h.epsilonPercentage
+			if pickPercentage <= ceiling {
+				hostToUse = h
+				break
+			}
+		}
+	}
+
+	if hostToUse == nil {
+		if len(possibleHosts) != 0 {
+			log.Println("Failed to randomly choose a host, Dan loses")
+		}
+		return p.getRoundRobin()
+	}
+
+	if hostToUse.dead {
+		hostToUse.willRetryHost(p.maxRetryInterval)
+	}
+	return hostToUse.host
+}
+
+func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
+	// first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
+	p.standardHostPool.markSuccess(hostR)
+	eHostR, ok := hostR.(*epsilonHostPoolResponse)
+	if !ok {
+		log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
+		return
+	}
+	host := eHostR.host
+	duration := p.between(eHostR.started, eHostR.ended)
+
+	p.Lock()
+	defer p.Unlock()
+	h, ok := p.hosts[host]
+	if !ok {
+		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
+	}
+	h.epsilonCounts[h.epsilonIndex]++
+	h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
+}
+
+// --- timer: this just exists for testing
+
+type timer interface {
+	between(time.Time, time.Time) time.Duration
+}
+
+type realTimer struct{}
+
+func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
+	return end.Sub(start)
+}

+ 34 - 0
epsilon_value_calculators.go

@@ -0,0 +1,34 @@
+package hostpool
+
+// --- Value Calculators -----------------
+
+import (
+	"math"
+)
+
+// --- Definitions -----------------------
+
+type EpsilonValueCalculator interface {
+	CalcValueFromAvgResponseTime(float64) float64
+}
+
+type LinearEpsilonValueCalculator struct{}
+type LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }
+type PolynomialEpsilonValueCalculator struct {
+	LinearEpsilonValueCalculator
+	Exp float64 // the exponent to which we will raise the value to reweight
+}
+
+// -------- Methods -----------------------
+
+func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
+	return 1.0 / v
+}
+
+func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
+	return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
+}
+
+func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
+	return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.Exp)
+}

+ 62 - 0
host_entry.go

@@ -0,0 +1,62 @@
+package hostpool
+
+import (
+	"time"
+)
+
+// --- hostEntry - this is due to get upgraded
+
+type hostEntry struct {
+	host              string
+	nextRetry         time.Time
+	retryCount        int16
+	retryDelay        time.Duration
+	dead              bool
+	epsilonCounts     []int64
+	epsilonValues     []int64
+	epsilonIndex      int
+	epsilonValue      float64
+	epsilonPercentage float64
+}
+
+func (h *hostEntry) canTryHost(now time.Time) bool {
+	if !h.dead {
+		return true
+	}
+	if h.nextRetry.Before(now) {
+		return true
+	}
+	return false
+}
+
+func (h *hostEntry) willRetryHost(maxRetryInterval time.Duration) {
+	h.retryCount += 1
+	newDelay := h.retryDelay * 2
+	if newDelay < maxRetryInterval {
+		h.retryDelay = newDelay
+	} else {
+		h.retryDelay = maxRetryInterval
+	}
+	h.nextRetry = time.Now().Add(h.retryDelay)
+}
+
+func (h *hostEntry) getWeightedAverageResponseTime() float64 {
+	var value float64
+	var lastValue float64
+
+	// start at 1 so we start with the oldest entry
+	for i := 1; i <= epsilonBuckets; i += 1 {
+		pos := (h.epsilonIndex + i) % epsilonBuckets
+		bucketCount := h.epsilonCounts[pos]
+		// Changing the line below to what I think it should be to get the weights right
+		weight := float64(i) / float64(epsilonBuckets)
+		if bucketCount > 0 {
+			currentValue := float64(h.epsilonValues[pos]) / float64(bucketCount)
+			value += currentValue * weight
+			lastValue = currentValue
+		} else {
+			value += lastValue * weight
+		}
+	}
+	return value
+}

+ 0 - 276
hostpool.go

@@ -2,20 +2,10 @@ package hostpool
 
 import (
 	"log"
-	"math"
-	"math/rand"
 	"sync"
 	"time"
 )
 
-// --- timer: this just exists for testing
-
-type timer interface {
-	between(time.Time, time.Time) time.Duration
-}
-
-type realTimer struct{}
-
 // --- Response interfaces and structs ----
 
 type HostPoolResponse interface {
@@ -30,12 +20,6 @@ type standardHostPoolResponse struct {
 	pool HostPool
 }
 
-type epsilonHostPoolResponse struct {
-	standardHostPoolResponse
-	started time.Time
-	ended   time.Time
-}
-
 // --- HostPool structs and interfaces ----
 
 type HostPool interface {
@@ -57,42 +41,6 @@ type standardHostPool struct {
 	nextHostIndex     int
 }
 
-type epsilonGreedyHostPool struct {
-	standardHostPool               // TODO - would be nifty if we could embed HostPool and Locker interfaces
-	epsilon                float32 // this is our exploration factor
-	decayDuration          time.Duration
-	EpsilonValueCalculator // embed the epsilonValueCalculator
-	timer
-}
-
-// --- hostEntry - this is due to get upgraded
-
-type hostEntry struct {
-	host              string
-	nextRetry         time.Time
-	retryCount        int16
-	retryDelay        time.Duration
-	dead              bool
-	epsilonCounts     []int64
-	epsilonValues     []int64
-	epsilonIndex      int
-	epsilonValue      float64
-	epsilonPercentage float64
-}
-
-// --- Value Calculators -----------------
-
-type EpsilonValueCalculator interface {
-	CalcValueFromAvgResponseTime(float64) float64
-}
-
-type LinearEpsilonValueCalculator struct{}
-type LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }
-type PolynomialEpsilonValueCalculator struct {
-	LinearEpsilonValueCalculator
-	exp float64 // the exponent to which we will raise the value to reweight
-}
-
 // ------ constants -------------------
 
 const epsilonBuckets = 120
@@ -143,83 +91,6 @@ func doMark(err error, r HostPoolResponse) {
 	}
 }
 
-func (r *epsilonHostPoolResponse) Mark(err error) {
-	r.Do(func() {
-		r.ended = time.Now()
-		doMark(err, r)
-	})
-
-}
-
-// Epsilon Greedy is an algorithim that allows HostPool not only to track failure state, 
-// but also to learn about "better" options in terms of speed, and to pick from available hosts
-// based on a percentage of how well they perform. This gives a weighted request rate to better
-// performing hosts, while still distributing requests to all hosts (proportionate to their performance)
-// 
-// After enabling Epsilon Greedy, hosts must be marked for sucess along with a time value representing 
-// how fast (or slow) that host was. 
-// 
-// host := pool.Get()
-// start := time.Now()
-// ..... do work with host
-// duration = time.Now().Sub(start)
-// pool.MarkSuccessWithTime(host, duration)
-// 
-// a good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
-//
-// decayDuration may be set to 0 to use the default value of 5 minutes
-func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
-
-	if decayDuration <= 0 {
-		decayDuration = defaultDecayDuration
-	}
-	stdHP := New(hosts).(*standardHostPool)
-	p := &epsilonGreedyHostPool{
-		standardHostPool:       *stdHP,
-		epsilon:                float32(initialEpsilon),
-		decayDuration:          decayDuration,
-		EpsilonValueCalculator: calc,
-		timer:                  &realTimer{},
-	}
-
-	// allocate structures
-	for _, h := range p.hostList {
-		h.epsilonCounts = make([]int64, epsilonBuckets)
-		h.epsilonValues = make([]int64, epsilonBuckets)
-	}
-	go p.epsilonGreedyDecay()
-	return p
-}
-
-func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
-	return end.Sub(start)
-}
-
-func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
-	p.Lock()
-	defer p.Unlock()
-	p.epsilon = newEpsilon
-}
-
-func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
-	durationPerBucket := p.decayDuration / epsilonBuckets
-	ticker := time.Tick(durationPerBucket)
-	for {
-		<-ticker
-		p.performEpsilonGreedyDecay()
-	}
-}
-func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
-	p.Lock()
-	for _, h := range p.hostList {
-		h.epsilonIndex += 1
-		h.epsilonIndex = h.epsilonIndex % epsilonBuckets
-		h.epsilonCounts[h.epsilonIndex] = 0
-		h.epsilonValues[h.epsilonIndex] = 0
-	}
-	p.Unlock()
-}
-
 // return an upstream entry from the HostPool
 func (p *standardHostPool) Get() HostPoolResponse {
 	p.Lock()
@@ -228,17 +99,6 @@ func (p *standardHostPool) Get() HostPoolResponse {
 	return &standardHostPoolResponse{host: host, pool: p}
 }
 
-func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
-	p.Lock()
-	defer p.Unlock()
-	host := p.getEpsilonGreedy()
-	started := time.Now()
-	return &epsilonHostPoolResponse{
-		standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
-		started:                  started,
-	}
-}
-
 func (p *standardHostPool) getRoundRobin() string {
 	now := time.Now()
 	hostCount := len(p.hostList)
@@ -264,107 +124,6 @@ func (p *standardHostPool) getRoundRobin() string {
 	return p.hostList[0].host
 }
 
-func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
-	var hostToUse *hostEntry
-
-	// this is our exploration phase
-	if rand.Float32() < p.epsilon {
-		p.epsilon = p.epsilon * epsilonDecay
-		if p.epsilon < minEpsilon {
-			p.epsilon = minEpsilon
-		}
-		return p.getRoundRobin()
-	}
-
-	// calculate values for each host in the 0..1 range (but not ormalized)
-	var possibleHosts []*hostEntry
-	now := time.Now()
-	var sumValues float64
-	for _, h := range p.hostList {
-		if h.canTryHost(now) {
-			v := h.getWeightedAverageResponseTime()
-			if v > 0 {
-				ev := p.CalcValueFromAvgResponseTime(v)
-				h.epsilonValue = ev
-				sumValues += ev
-				possibleHosts = append(possibleHosts, h)
-			}
-		}
-	}
-
-	if len(possibleHosts) != 0 {
-		// now normalize to the 0..1 range to get a percentage
-		for _, h := range possibleHosts {
-			h.epsilonPercentage = h.epsilonValue / sumValues
-		}
-
-		// do a weighted random choice among hosts
-		ceiling := 0.0
-		pickPercentage := rand.Float64()
-		for _, h := range possibleHosts {
-			ceiling += h.epsilonPercentage
-			if pickPercentage <= ceiling {
-				hostToUse = h
-				break
-			}
-		}
-	}
-
-	if hostToUse == nil {
-		if len(possibleHosts) != 0 {
-			log.Println("Failed to randomly choose a host, Dan loses")
-		}
-		return p.getRoundRobin()
-	}
-
-	if hostToUse.dead {
-		hostToUse.willRetryHost(p.maxRetryInterval)
-	}
-	return hostToUse.host
-}
-
-func (h *hostEntry) canTryHost(now time.Time) bool {
-	if !h.dead {
-		return true
-	}
-	if h.nextRetry.Before(now) {
-		return true
-	}
-	return false
-}
-
-func (h *hostEntry) willRetryHost(maxRetryInterval time.Duration) {
-	h.retryCount += 1
-	newDelay := h.retryDelay * 2
-	if newDelay < maxRetryInterval {
-		h.retryDelay = newDelay
-	} else {
-		h.retryDelay = maxRetryInterval
-	}
-	h.nextRetry = time.Now().Add(h.retryDelay)
-}
-
-func (h *hostEntry) getWeightedAverageResponseTime() float64 {
-	var value float64
-	var lastValue float64
-
-	// start at 1 so we start with the oldest entry
-	for i := 1; i <= epsilonBuckets; i += 1 {
-		pos := (h.epsilonIndex + i) % epsilonBuckets
-		bucketCount := h.epsilonCounts[pos]
-		// Changing the line below to what I think it should be to get the weights right
-		weight := float64(i) / float64(epsilonBuckets)
-		if bucketCount > 0 {
-			currentValue := float64(h.epsilonValues[pos]) / float64(bucketCount)
-			value += currentValue * weight
-			lastValue = currentValue
-		} else {
-			value += lastValue * weight
-		}
-	}
-	return value
-}
-
 func (p *standardHostPool) ResetAll() {
 	p.Lock()
 	defer p.Unlock()
@@ -392,27 +151,6 @@ func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
 	h.dead = false
 }
 
-func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
-	// first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
-	p.standardHostPool.markSuccess(hostR)
-	eHostR, ok := hostR.(*epsilonHostPoolResponse)
-	if !ok {
-		log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
-		return
-	}
-	host := eHostR.host
-	duration := p.between(eHostR.started, eHostR.ended)
-
-	p.Lock()
-	defer p.Unlock()
-	h, ok := p.hosts[host]
-	if !ok {
-		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
-	}
-	h.epsilonCounts[h.epsilonIndex]++
-	h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
-}
-
 func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
 	host := hostR.Host()
 	p.Lock()
@@ -436,17 +174,3 @@ func (p *standardHostPool) Hosts() []string {
 	}
 	return hosts
 }
-
-// -------- Epsilon Value Calculators ----------
-
-func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
-	return 1.0 / v
-}
-
-func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
-	return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
-}
-
-func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
-	return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.exp)
-}