Просмотр исходного кода

Initial migration from Bitly codebase, plus travis yml

Dan Frank 13 лет назад
Родитель
Сommit
04638c1bf2
3 измененных файлов с 578 добавлено и 0 удалено
  1. 0 0
      .travis.yml
  2. 456 0
      hostpool.go
  3. 122 0
      hostpool_test.go

+ 0 - 0
.travis.yml


+ 456 - 0
hostpool.go

@@ -0,0 +1,456 @@
+package hostpool
+
+import (
+	"log"
+	"math"
+	"math/rand"
+	"sync"
+	"time"
+)
+
+// --- timer: this just exists for testing
+
+type timer interface {
+	between(time.Time, time.Time) time.Duration
+}
+
+type realTimer struct{}
+
+
+// --- Response interfaces and structs ----
+
+type HostPoolResponse interface {
+	Host() string
+	Mark(error)
+	hostPool() HostPool
+}
+
+type standardHostPoolResponse struct {
+	host string
+	sync.Once
+	pool HostPool
+}
+
+type epsilonHostPoolResponse struct {
+	standardHostPoolResponse
+	started time.Time
+	ended   time.Time
+}
+
+// --- HostPool structs and interfaces ----
+
+type HostPool interface {
+	Get() HostPoolResponse
+	// keep the marks separate so we can override independently
+	markSuccess(HostPoolResponse)
+	markFailed(HostPoolResponse)
+
+	ResetAll()
+	Hosts() []string
+}
+
+type standardHostPool struct {
+	sync.RWMutex
+	hosts             map[string]*hostEntry
+	hostList          []*hostEntry
+	initialRetryDelay time.Duration
+	maxRetryInterval  time.Duration
+	nextHostIndex     int
+}
+
+type epsilonGreedyHostPool struct {
+	standardHostPool               // TODO - would be nifty if we could embed HostPool and Locker interfaces
+	epsilon                float32 // this is our exploration factor
+	decayDuration          time.Duration
+	EpsilonValueCalculator // embed the epsilonValueCalculator
+	timer
+}
+
+// --- hostEntry - this is due to get upgraded
+
+type hostEntry struct {
+	host              string
+	nextRetry         time.Time
+	retryCount        int16
+	retryDelay        time.Duration
+	dead              bool
+	epsilonCounts     []int64
+	epsilonValues     []int64
+	epsilonIndex      int
+	epsilonValue      float64
+	epsilonPercentage float64
+}
+
+
+// --- Value Calculators -----------------
+
+type EpsilonValueCalculator interface {
+	CalcValueFromAvgResponseTime(float64) float64
+}
+
+type LinearEpsilonValueCalculator struct{}
+type LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }
+type PolynomialEpsilonValueCalculator struct {
+	LinearEpsilonValueCalculator
+	exp float64 // the exponent to which we will raise the value to reweight
+}
+
+// ------ constants -------------------
+
+const epsilonBuckets = 120
+const epsilonDecay = 0.90 // decay the exploration rate
+const minEpsilon = 0.01   // explore one percent of the time
+const initialEpsilon = 0.3
+const defaultDecayDuration = time.Duration(5) * time.Minute
+
+func New(hosts []string) *standardHostPool {
+	p := &standardHostPool{
+		hosts:             make(map[string]*hostEntry, len(hosts)),
+		hostList:          make([]*hostEntry, len(hosts)),
+		initialRetryDelay: time.Duration(30) * time.Second,
+		maxRetryInterval:  time.Duration(900) * time.Second,
+	}
+
+	for i, h := range hosts {
+		e := &hostEntry{
+			host:       h,
+			retryDelay: p.initialRetryDelay,
+		}
+		p.hosts[h] = e
+		p.hostList[i] = e
+	}
+
+	return p
+}
+
+func (r *standardHostPoolResponse) Host() string {
+	return r.host
+}
+
+func (r *standardHostPoolResponse) hostPool() HostPool {
+	return r.pool
+}
+
+func (r *standardHostPoolResponse) Mark(err error) {
+	r.Do(func() {
+		doMark(err, r)
+	})
+}
+
+func doMark(err error, r HostPoolResponse) {
+	if err == nil {
+		r.hostPool().markSuccess(r)
+	} else {
+		r.hostPool().markFailed(r)
+	}
+}
+
+func (r *epsilonHostPoolResponse) Mark(err error) {
+	r.Do(func() {
+		r.ended = time.Now()
+		doMark(err, r)
+	})
+
+}
+
+// Epsilon Greedy is an algorithim that allows HostPool not only to track failure state, 
+// but also to learn about "better" options in terms of speed, and to pick from available hosts
+// based on a percentage of how well they perform. This gives a weighted request rate to better
+// performing hosts, while still distributing requests to all hosts (proportionate to their performance)
+// 
+// After enabling Epsilon Greedy, hosts must be marked for sucess along with a time value representing 
+// how fast (or slow) that host was. 
+// 
+// host := pool.Get()
+// start := time.Now()
+// ..... do work with host
+// duration = time.Now().Sub(start)
+// pool.MarkSuccessWithTime(host, duration)
+// 
+// a good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
+//
+// decayDuration may be set to 0 to use the default value of 5 minutes
+func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) *epsilonGreedyHostPool {
+
+	if decayDuration <= 0 {
+		decayDuration = defaultDecayDuration
+	}
+	p := &epsilonGreedyHostPool{
+		standardHostPool:       *New(hosts),
+		epsilon:                float32(initialEpsilon),
+		decayDuration:          decayDuration,
+		EpsilonValueCalculator: calc,
+		timer:                  &realTimer{},
+	}
+
+	// allocate structures
+	for _, h := range p.hostList {
+		h.epsilonCounts = make([]int64, epsilonBuckets)
+		h.epsilonValues = make([]int64, epsilonBuckets)
+	}
+	go p.epsilonGreedyDecay()
+	return p
+}
+
+func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
+	return end.Sub(start)
+}
+
+
+func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
+	p.Lock()
+	defer p.Unlock()
+	p.epsilon = newEpsilon
+}
+
+
+func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
+	durationPerBucket := p.decayDuration / epsilonBuckets
+	ticker := time.Tick(durationPerBucket)
+	for {
+		<-ticker
+		p.performEpsilonGreedyDecay()
+	}
+}
+func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
+	p.Lock()
+	for _, h := range p.hostList {
+		h.epsilonIndex += 1
+		h.epsilonIndex = h.epsilonIndex % epsilonBuckets
+		h.epsilonCounts[h.epsilonIndex] = 0
+		h.epsilonValues[h.epsilonIndex] = 0
+	}
+	p.Unlock()
+}
+
+
+// return an upstream entry from the HostPool
+func (p *standardHostPool) Get() HostPoolResponse {
+	p.Lock()
+	defer p.Unlock()
+	host := p.getRoundRobin()
+	return &standardHostPoolResponse{host: host, pool: p}
+}
+
+func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
+	p.Lock()
+	defer p.Unlock()
+	host := p.getEpsilonGreedy()
+	started := time.Now()
+	return &epsilonHostPoolResponse{
+		standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
+		started:                  started,
+	}
+}
+
+func (p *standardHostPool) getRoundRobin() string {
+	now := time.Now()
+	hostCount := len(p.hostList)
+	for i := range p.hostList {
+		// iterate via sequenece from where we last iterated
+		currentIndex := (i + p.nextHostIndex) % hostCount
+
+		h := p.hostList[currentIndex]
+		if !h.dead {
+			p.nextHostIndex = currentIndex + 1
+			return h.host
+		}
+		if h.nextRetry.Before(now) {
+			h.willRetryHost(p.maxRetryInterval)
+			p.nextHostIndex = currentIndex + 1
+			return h.host
+		}
+	}
+
+	// all hosts are down. re-add them
+	p.doResetAll()
+	p.nextHostIndex = 0
+	return p.hostList[0].host
+}
+
+func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
+	var hostToUse *hostEntry
+
+	// this is our exploration phase
+	if rand.Float32() < p.epsilon {
+		p.epsilon = p.epsilon * epsilonDecay
+		if p.epsilon < minEpsilon {
+			p.epsilon = minEpsilon
+		}
+		return p.getRoundRobin()
+	}
+
+	// calculate values for each host in the 0..1 range (but not ormalized)
+	var possibleHosts []*hostEntry
+	now := time.Now()
+	var sumValues float64
+	for _, h := range p.hostList {
+		if h.canTryHost(now) {
+			v := h.getWeightedAverageResponseTime()
+			if v > 0 {
+				ev := p.CalcValueFromAvgResponseTime(v)
+				h.epsilonValue = ev
+				sumValues += ev
+				possibleHosts = append(possibleHosts, h)
+			}
+		}
+	}
+
+	if len(possibleHosts) != 0 {
+		// now normalize to the 0..1 range to get a percentage
+		for _, h := range possibleHosts {
+			h.epsilonPercentage = h.epsilonValue / sumValues
+		}
+
+		// do a weighted random choice among hosts
+		ceiling := 0.0
+		pickPercentage := rand.Float64()
+		for _, h := range possibleHosts {
+			ceiling += h.epsilonPercentage
+			if pickPercentage <= ceiling {
+				hostToUse = h
+				break
+			}
+		}
+	}
+
+	if hostToUse == nil {
+		if len(possibleHosts) != 0 {
+			log.Println("Failed to randomly choose a host, Dan loses")
+		}
+		return p.getRoundRobin()
+	}
+
+	if hostToUse.dead {
+		hostToUse.willRetryHost(p.maxRetryInterval)
+	}
+	return hostToUse.host
+}
+
+func (h *hostEntry) canTryHost(now time.Time) bool {
+	if !h.dead {
+		return true
+	}
+	if h.nextRetry.Before(now) {
+		return true
+	}
+	return false
+}
+
+func (h *hostEntry) willRetryHost(maxRetryInterval time.Duration) {
+	h.retryCount += 1
+	newDelay := h.retryDelay * 2
+	if newDelay < maxRetryInterval {
+		h.retryDelay = newDelay
+	} else {
+		h.retryDelay = maxRetryInterval
+	}
+	h.nextRetry = time.Now().Add(h.retryDelay)
+}
+
+func (h *hostEntry) getWeightedAverageResponseTime() float64 {
+	var value float64
+	var lastValue float64
+
+	// start at 1 so we start with the oldest entry
+	for i := 1; i <= epsilonBuckets; i += 1 {
+		pos := (h.epsilonIndex + i) % epsilonBuckets
+		bucketCount := h.epsilonCounts[pos]
+		// Changing the line below to what I think it should be to get the weights right
+		weight := float64(i) / float64(epsilonBuckets)
+		if bucketCount > 0 {
+			currentValue := float64(h.epsilonValues[pos]) / float64(bucketCount)
+			value += currentValue * weight
+			lastValue = currentValue
+		} else {
+			value += lastValue * weight
+		}
+	}
+	return value
+}
+
+func (p *standardHostPool) ResetAll() {
+	p.Lock()
+	defer p.Unlock()
+	p.doResetAll()
+}
+
+// this actually performs the logic to reset,
+// and should only be called when the lock has
+// already been acquired
+func (p *standardHostPool) doResetAll() {
+	for _, h := range p.hosts {
+		h.dead = false
+	}
+}
+
+func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
+	host := hostR.Host()
+	p.Lock()
+	defer p.Unlock()
+
+	h, ok := p.hosts[host]
+	if !ok {
+		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
+	}
+	h.dead = false
+}
+
+func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
+	// first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
+	p.standardHostPool.markSuccess(hostR)
+	eHostR, ok := hostR.(*epsilonHostPoolResponse)
+	if !ok {
+		log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
+		return
+	}
+	host := eHostR.host
+	duration := p.between(eHostR.started, eHostR.ended)
+
+	p.Lock()
+	defer p.Unlock()
+	h, ok := p.hosts[host]
+	if !ok {
+		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
+	}
+	h.epsilonCounts[h.epsilonIndex]++
+	h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
+}
+
+func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
+	host := hostR.Host()
+	p.Lock()
+	defer p.Unlock()
+	h, ok := p.hosts[host]
+	if !ok {
+		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
+	}
+	if !h.dead {
+		h.dead = true
+		h.retryCount = 0
+		h.retryDelay = p.initialRetryDelay
+		h.nextRetry = time.Now().Add(h.retryDelay)
+	}
+
+}
+func (p *standardHostPool) Hosts() []string {
+	hosts := make([]string, len(p.hosts))
+	for host, _ := range p.hosts {
+		hosts = append(hosts, host)
+	}
+	return hosts
+}
+
+// -------- Epsilon Value Calculators ----------
+
+func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
+	return 1.0 / v
+}
+
+func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
+	return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
+}
+
+func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
+	return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.exp)
+}

+ 122 - 0
hostpool_test.go

@@ -0,0 +1,122 @@
+package hostpool
+
+import (
+	"github.com/bmizerany/assert"
+	"io/ioutil"
+	"log"
+	"math/rand"
+	"os"
+	"testing"
+	"time"
+	"errors"
+)
+
+func TestHostPool(t *testing.T) {
+	log.SetOutput(ioutil.Discard)
+	defer log.SetOutput(os.Stdout)
+
+	dummyErr := errors.New("Dummy Error")
+
+	p := New([]string{"a", "b", "c"})
+	assert.Equal(t, p.Get().Host(), "a")
+	assert.Equal(t, p.Get().Host(), "b")
+	assert.Equal(t, p.Get().Host(), "c")
+	respA := p.Get()
+	assert.Equal(t, respA.Host(), "a")
+
+	respA.Mark(dummyErr)
+	respB := p.Get()
+	respB.Mark(dummyErr)
+	respC := p.Get()
+	assert.Equal(t, respC.Host(), "c")
+	respC.Mark(nil)
+	// get again, and verify that it's still c
+	assert.Equal(t, p.Get().Host(), "c")
+	// now try to mark b as success; should fail because already marked
+	respB.Mark(nil)
+	assert.Equal(t, p.Get().Host(), "c") // would be b if it were not dead
+	// now restore a
+	respA = &standardHostPoolResponse{host: "a", pool: p}
+	respA.Mark(nil)
+	assert.Equal(t, p.Get().Host(), "a")
+	assert.Equal(t, p.Get().Host(), "c")
+
+	// ensure that we get *something* back when all hosts fail
+	for _, host := range []string{"a", "b", "c"} {
+		response := &standardHostPoolResponse{host: host, pool: p}
+		response.Mark(dummyErr)
+	}
+	resp := p.Get()
+	assert.NotEqual(t, resp, nil)
+}
+
+type mockTimer struct {
+	t int // the time it will always return
+}
+
+func (t *mockTimer) between(start time.Time, end time.Time) time.Duration {
+	return time.Duration(t.t) * time.Millisecond
+}
+
+func TestEpsilonGreedy(t *testing.T) {
+	log.SetOutput(ioutil.Discard)
+	defer log.SetOutput(os.Stdout)
+
+	rand.Seed(10)
+
+	iterations := 12000
+	p := NewEpsilonGreedy([]string{"a", "b"}, 0, &LinearEpsilonValueCalculator{})
+
+	timings := make(map[string]int64)
+	timings["a"] = 200
+	timings["b"] = 300
+
+	hitCounts := make(map[string]int)
+	hitCounts["a"] = 0
+	hitCounts["b"] = 0
+
+	log.Printf("starting first run (a, b)")
+
+	for i := 0; i < iterations; i += 1 {
+		if i != 0 && i%100 == 0 {
+			p.performEpsilonGreedyDecay()
+		}
+		hostR := p.Get()
+		host := hostR.Host()
+		hitCounts[host]++
+		timing := timings[host]
+		p.timer = &mockTimer{t: int(timing)}
+		hostR.Mark(nil)
+	}
+
+	for host, _ := range hitCounts {
+		log.Printf("host %s hit %d times (%0.2f percent)", host, hitCounts[host], (float64(hitCounts[host])/float64(iterations))*100.0)
+	}
+
+	assert.Equal(t, hitCounts["a"] > hitCounts["b"], true)
+
+	hitCounts["a"] = 0
+	hitCounts["b"] = 0
+	log.Printf("starting second run (b, a)")
+	timings["a"] = 500
+	timings["b"] = 100
+
+	for i := 0; i < iterations; i += 1 {
+		if i != 0 && i%100 == 0 {
+			p.performEpsilonGreedyDecay()
+		}
+		hostR := p.Get()
+		host := hostR.Host()
+		hitCounts[host]++
+		timing := timings[host]
+		p.timer = &mockTimer{t: int(timing)}
+		hostR.Mark(nil)
+	}
+
+	for host, _ := range hitCounts {
+		log.Printf("host %s hit %d times (%0.2f percent)", host, hitCounts[host], (float64(hitCounts[host])/float64(iterations))*100.0)
+	}
+
+	assert.Equal(t, hitCounts["b"] > hitCounts["a"], true)
+
+}