瀏覽代碼

refactor to selector interface

Dan Frank 12 年之前
父節點
當前提交
d94a633b9e
共有 5 個文件被更改,包括 211 次插入205 次删除
  1. 49 51
      epsilon_greedy.go
  2. 2 2
      example_test.go
  3. 16 126
      hostpool.go
  4. 26 26
      hostpool_test.go
  5. 118 0
      selector.go

+ 49 - 51
epsilon_greedy.go

@@ -10,21 +10,21 @@ import (
 
 type epsilonHostPoolResponse struct {
 	HostPoolResponse
-	started time.Time
-	ended   time.Time
-	pool    *epsilonGreedyHostPool
+	started  time.Time
+	ended    time.Time
+	selector *epsilonGreedySelector
 }
 
 func (r *epsilonHostPoolResponse) Mark(err error) {
 	if err == nil {
 		r.ended = time.Now()
-		r.pool.recordTiming(r)
+		r.selector.recordTiming(r)
 	}
 	r.HostPoolResponse.Mark(err)
 }
 
-type epsilonGreedyHostPool struct {
-	HostPool
+type epsilonGreedySelector struct {
+	Selector
 	sync.Locker
 	epsilon                float32 // this is our exploration factor
 	decayDuration          time.Duration
@@ -32,7 +32,7 @@ type epsilonGreedyHostPool struct {
 	timer
 }
 
-// Construct an Epsilon Greedy HostPool
+// Construct an Epsilon Greedy Selector
 //
 // Epsilon Greedy is an algorithm that allows HostPool not only to track failure state, 
 // but also to learn about "better" options in terms of speed, and to pick from available hosts
@@ -46,73 +46,71 @@ type epsilonGreedyHostPool struct {
 // To compute the weighting scores, we perform a weighted average of recent response times, over the course of
 // `decayDuration`. decayDuration may be set to 0 to use the default value of 5 minutes
 // We then use the supplied EpsilonValueCalculator to calculate a score from that weighted average response time.
-func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
+func NewEpsilonGreedy(decayDuration time.Duration, calc EpsilonValueCalculator) Selector {
 
 	if decayDuration <= 0 {
 		decayDuration = defaultDecayDuration
 	}
-	stdHP := New(hosts).(*standardHostPool)
-	p := &epsilonGreedyHostPool{
-		HostPool:               stdHP,
-		Locker:                 stdHP,
+	ss := &standardSelector{}
+	s := &epsilonGreedySelector{
+		Selector:               ss,
+		Locker:                 ss,
 		epsilon:                float32(initialEpsilon),
 		decayDuration:          decayDuration,
 		EpsilonValueCalculator: calc,
 		timer:                  &realTimer{},
 	}
 
+	return s
+}
+
+func (s *epsilonGreedySelector) Init(hosts []string) {
+	s.Selector.Init(hosts)
 	// allocate structures
-	for _, h := range stdHP.hostList {
+	for _, h := range s.Selector.(*standardSelector).hostList {
 		h.epsilonCounts = make([]int64, epsilonBuckets)
 		h.epsilonValues = make([]int64, epsilonBuckets)
 	}
-	go p.epsilonGreedyDecay()
-	return p
-}
-
-func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
-	p.Lock()
-	defer p.Unlock()
-	p.epsilon = newEpsilon
+	go s.epsilonGreedyDecay()
 }
 
-func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
-	durationPerBucket := p.decayDuration / epsilonBuckets
+func (s *epsilonGreedySelector) epsilonGreedyDecay() {
+	durationPerBucket := s.decayDuration / epsilonBuckets
 	ticker := time.Tick(durationPerBucket)
 	for {
 		<-ticker
-		p.performEpsilonGreedyDecay()
+		s.performEpsilonGreedyDecay()
 	}
 }
-func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
-	p.Lock()
-	for _, h := range p.HostPool.(*standardHostPool).hostList {
+func (s *epsilonGreedySelector) performEpsilonGreedyDecay() {
+	s.Lock()
+	for _, h := range s.Selector.(*standardSelector).hostList {
 		h.epsilonIndex += 1
 		h.epsilonIndex = h.epsilonIndex % epsilonBuckets
 		h.epsilonCounts[h.epsilonIndex] = 0
 		h.epsilonValues[h.epsilonIndex] = 0
 	}
-	p.Unlock()
+	s.Unlock()
 }
 
-func (p *epsilonGreedyHostPool) ChooseNextHost() string {
-	p.Lock()
-	host, err := p.getEpsilonGreedy()
-	p.Unlock()
+func (s *epsilonGreedySelector) SelectNextHost() string {
+	s.Lock()
+	host, err := s.getEpsilonGreedy()
+	s.Unlock()
 	if err != nil {
-		host = p.HostPool.ChooseNextHost()
+		host = s.Selector.SelectNextHost()
 	}
 	return host
 }
 
-func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) {
+func (s *epsilonGreedySelector) getEpsilonGreedy() (string, error) {
 	var hostToUse *hostEntry
 
 	// this is our exploration phase
-	if rand.Float32() < p.epsilon {
-		p.epsilon = p.epsilon * epsilonDecay
-		if p.epsilon < minEpsilon {
-			p.epsilon = minEpsilon
+	if rand.Float32() < s.epsilon {
+		s.epsilon = s.epsilon * epsilonDecay
+		if s.epsilon < minEpsilon {
+			s.epsilon = minEpsilon
 		}
 		return "", errors.New("Exploration")
 	}
@@ -121,11 +119,11 @@ func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) {
 	var possibleHosts []*hostEntry
 	now := time.Now()
 	var sumValues float64
-	for _, h := range p.HostPool.(*standardHostPool).hostList {
+	for _, h := range s.Selector.(*standardSelector).hostList {
 		if h.canTryHost(now) {
 			v := h.getWeightedAverageResponseTime()
 			if v > 0 {
-				ev := p.CalcValueFromAvgResponseTime(v)
+				ev := s.CalcValueFromAvgResponseTime(v)
 				h.epsilonValue = ev
 				sumValues += ev
 				possibleHosts = append(possibleHosts, h)
@@ -160,32 +158,32 @@ func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) {
 	return hostToUse.host, nil
 }
 
-func (p *epsilonGreedyHostPool) recordTiming(eHostR *epsilonHostPoolResponse) {
+func (s *epsilonGreedySelector) recordTiming(eHostR *epsilonHostPoolResponse) {
 	host := eHostR.Host()
-	duration := p.between(eHostR.started, eHostR.ended)
+	duration := s.between(eHostR.started, eHostR.ended)
 
-	p.Lock()
-	defer p.Unlock()
-	h, ok := p.HostPool.(*standardHostPool).hosts[host]
+	s.Lock()
+	defer s.Unlock()
+	h, ok := s.Selector.(*standardSelector).hosts[host]
 	if !ok {
-		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
+		log.Fatalf("host %s not in HostPool", host)
 	}
 	h.epsilonCounts[h.epsilonIndex]++
 	h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
 }
 
-func (p *epsilonGreedyHostPool) DeliverHostResponse(host string) HostPoolResponse {
-	resp := p.HostPool.DeliverHostResponse(host)
-	return p.toEpsilonHostPootResponse(resp)
+func (s *epsilonGreedySelector) MakeHostResponse(host string) HostPoolResponse {
+	resp := s.Selector.MakeHostResponse(host)
+	return s.toEpsilonHostPoolResponse(resp)
 }
 
 // Convert regular response to one equipped for EG. Doesn't require lock, for now
-func (p *epsilonGreedyHostPool) toEpsilonHostPootResponse(resp HostPoolResponse) *epsilonHostPoolResponse {
+func (s *epsilonGreedySelector) toEpsilonHostPoolResponse(resp HostPoolResponse) *epsilonHostPoolResponse {
 	started := time.Now()
 	return &epsilonHostPoolResponse{
 		HostPoolResponse: resp,
 		started:          started,
-		pool:             p,
+		selector:         s,
 	}
 }
 

+ 2 - 2
example_test.go

@@ -5,8 +5,8 @@ import (
 )
 
 func ExampleNewEpsilonGreedy() {
-	hp := NewEpsilonGreedy([]string{"a", "b"}, 0, &LinearEpsilonValueCalculator{})
-	hostResponse := Get(hp)
+	hp := NewWithSelector([]string{"a", "b"}, NewEpsilonGreedy(0, &LinearEpsilonValueCalculator{}))
+	hostResponse := hp.Get()
 	hostname := hostResponse.Host()
 	err := errors.New("I am your http error from " + hostname) // (make a request with hostname)
 	hostResponse.Mark(err)

+ 16 - 126
hostpool.go

@@ -4,8 +4,6 @@
 package hostpool
 
 import (
-	"log"
-	"sync"
 	"time"
 )
 
@@ -27,7 +25,7 @@ type HostPoolResponse interface {
 
 type standardHostPoolResponse struct {
 	host string
-	pool *standardHostPool
+	ss   *standardSelector
 }
 
 // --- HostPool structs and interfaces ----
@@ -36,22 +34,14 @@ type standardHostPoolResponse struct {
 // allow you to Get a HostPoolResponse (which includes a hostname to use),
 // get the list of all Hosts, and use ResetAll to reset state.
 type HostPool interface {
-	// Get() HostPoolResponse
-
+	Get() HostPoolResponse
 	ResetAll()
 	Hosts() []string
-
-	ChooseNextHost() string
-	DeliverHostResponse(string) HostPoolResponse
 }
 
 type standardHostPool struct {
-	sync.RWMutex
-	hosts             map[string]*hostEntry
-	hostList          []*hostEntry
-	initialRetryDelay time.Duration
-	maxRetryInterval  time.Duration
-	nextHostIndex     int
+	hosts []string
+	Selector
 }
 
 // ------ constants -------------------
@@ -64,23 +54,15 @@ const defaultDecayDuration = time.Duration(5) * time.Minute
 
 // Construct a basic HostPool using the hostnames provided
 func New(hosts []string) HostPool {
-	p := &standardHostPool{
-		hosts:             make(map[string]*hostEntry, len(hosts)),
-		hostList:          make([]*hostEntry, len(hosts)),
-		initialRetryDelay: time.Duration(30) * time.Second,
-		maxRetryInterval:  time.Duration(900) * time.Second,
-	}
+	return NewWithSelector(hosts, &standardSelector{})
+}
 
-	for i, h := range hosts {
-		e := &hostEntry{
-			host:       h,
-			retryDelay: p.initialRetryDelay,
-		}
-		p.hosts[h] = e
-		p.hostList[i] = e
+func NewWithSelector(hosts []string, s Selector) HostPool {
+	s.Init(hosts)
+	return &standardHostPool{
+		hosts,
+		s,
 	}
-
-	return p
 }
 
 func (r *standardHostPoolResponse) Host() string {
@@ -88,107 +70,15 @@ func (r *standardHostPoolResponse) Host() string {
 }
 
 func (r *standardHostPoolResponse) Mark(err error) {
-	if err == nil {
-		r.pool.markSuccess(r)
-	} else {
-		r.pool.markFailed(r)
-	}
+	r.ss.MarkHost(r.host, err)
 }
 
 // return an entry from the HostPool
-func Get(p HostPool) HostPoolResponse {
-	host := p.ChooseNextHost()
-	return p.DeliverHostResponse(host)
-}
-
-func (p *standardHostPool) ChooseNextHost() string {
-	p.Lock()
-	host := p.getRoundRobin()
-	p.Unlock()
-	return host
-}
-
-func (p *standardHostPool) getRoundRobin() string {
-	now := time.Now()
-	hostCount := len(p.hostList)
-	for i := range p.hostList {
-		// iterate via sequenece from where we last iterated
-		currentIndex := (i + p.nextHostIndex) % hostCount
-
-		h := p.hostList[currentIndex]
-		if h.canTryHost(now) {
-			p.nextHostIndex = currentIndex + 1
-			return h.host
-		}
-	}
-
-	// all hosts are down. re-add them
-	p.doResetAll()
-	p.nextHostIndex = 0
-	return p.hostList[0].host
-}
-
-func (p *standardHostPool) ResetAll() {
-	p.Lock()
-	defer p.Unlock()
-	p.doResetAll()
-}
-
-// this actually performs the logic to reset,
-// and should only be called when the lock has
-// already been acquired
-func (p *standardHostPool) doResetAll() {
-	for _, h := range p.hosts {
-		h.dead = false
-	}
-}
-
-func (p *standardHostPool) markSuccess(hostR *standardHostPoolResponse) {
-	host := hostR.Host()
-	p.Lock()
-	defer p.Unlock()
-
-	h, ok := p.hosts[host]
-	if !ok {
-		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
-	}
-	h.dead = false
+func (p *standardHostPool) Get() HostPoolResponse {
+	host := p.SelectNextHost()
+	return p.MakeHostResponse(host)
 }
 
-func (p *standardHostPool) markFailed(hostR *standardHostPoolResponse) {
-	host := hostR.Host()
-	p.Lock()
-	defer p.Unlock()
-	h, ok := p.hosts[host]
-	if !ok {
-		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
-	}
-	if !h.dead {
-		h.dead = true
-		h.retryCount = 0
-		h.retryDelay = p.initialRetryDelay
-		h.nextRetry = time.Now().Add(h.retryDelay)
-	}
-
-}
 func (p *standardHostPool) Hosts() []string {
-	hosts := make([]string, len(p.hosts))
-	for host, _ := range p.hosts {
-		hosts = append(hosts, host)
-	}
-	return hosts
-}
-
-func (p *standardHostPool) DeliverHostResponse(host string) HostPoolResponse {
-	p.Lock()
-	defer p.Unlock()
-	h, ok := p.hosts[host]
-	if !ok {
-		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
-	}
-	now := time.Now()
-	if h.dead && h.nextRetry.Before(now) {
-		h.willRetryHost(p.maxRetryInterval)
-	}
-	return &standardHostPoolResponse{host: host, pool: p}
+	return p.hosts
 }

+ 26 - 26
hostpool_test.go

@@ -18,33 +18,33 @@ func TestHostPool(t *testing.T) {
 	dummyErr := errors.New("Dummy Error")
 
 	p := New([]string{"a", "b", "c"}).(*standardHostPool)
-	assert.Equal(t, Get(p).Host(), "a")
-	assert.Equal(t, Get(p).Host(), "b")
-	assert.Equal(t, Get(p).Host(), "c")
-	respA := Get(p)
+	assert.Equal(t, p.Get().Host(), "a")
+	assert.Equal(t, p.Get().Host(), "b")
+	assert.Equal(t, p.Get().Host(), "c")
+	respA := p.Get()
 	assert.Equal(t, respA.Host(), "a")
 
 	respA.Mark(dummyErr)
-	respB := Get(p)
+	respB := p.Get()
 	respB.Mark(dummyErr)
-	respC := Get(p)
+	respC := p.Get()
 	assert.Equal(t, respC.Host(), "c")
 	respC.Mark(nil)
 	// get again, and verify that it's still c
-	assert.Equal(t, Get(p).Host(), "c")
-	assert.Equal(t, Get(p).Host(), "c") // would be b if it were not dead
+	assert.Equal(t, p.Get().Host(), "c")
+	assert.Equal(t, p.Get().Host(), "c") // would be b if it were not dead
 	// now restore a
-	respA = &standardHostPoolResponse{host: "a", pool: p}
+	respA = &standardHostPoolResponse{host: "a", ss: p.Selector.(*standardSelector)}
 	respA.Mark(nil)
-	assert.Equal(t, Get(p).Host(), "a")
-	assert.Equal(t, Get(p).Host(), "c")
+	assert.Equal(t, p.Get().Host(), "a")
+	assert.Equal(t, p.Get().Host(), "c")
 
 	// ensure that we get *something* back when all hosts fail
 	for _, host := range []string{"a", "b", "c"} {
-		response := &standardHostPoolResponse{host: host, pool: p}
+		response := &standardHostPoolResponse{host: host, ss: p.Selector.(*standardSelector)}
 		response.Mark(dummyErr)
 	}
-	resp := Get(p)
+	resp := p.Get()
 	assert.NotEqual(t, resp, nil)
 }
 
@@ -57,13 +57,13 @@ func (t *mockTimer) between(start time.Time, end time.Time) time.Duration {
 }
 
 func TestEpsilonGreedy(t *testing.T) {
-	log.SetOutput(ioutil.Discard)
-	defer log.SetOutput(os.Stdout)
+	// log.SetOutput(ioutil.Discard)
+	// defer log.SetOutput(os.Stdout)
 
 	rand.Seed(10)
 
 	iterations := 12000
-	p := NewEpsilonGreedy([]string{"a", "b"}, 0, &LinearEpsilonValueCalculator{}).(*epsilonGreedyHostPool)
+	p := NewWithSelector([]string{"a", "b"}, NewEpsilonGreedy(0, &LinearEpsilonValueCalculator{})).(*standardHostPool)
 
 	timings := make(map[string]int64)
 	timings["a"] = 200
@@ -77,13 +77,13 @@ func TestEpsilonGreedy(t *testing.T) {
 
 	for i := 0; i < iterations; i += 1 {
 		if i != 0 && i%100 == 0 {
-			p.performEpsilonGreedyDecay()
+			p.Selector.(*epsilonGreedySelector).performEpsilonGreedyDecay()
 		}
-		hostR := Get(p)
+		hostR := p.Get()
 		host := hostR.Host()
 		hitCounts[host]++
 		timing := timings[host]
-		p.timer = &mockTimer{t: int(timing)}
+		p.Selector.(*epsilonGreedySelector).timer = &mockTimer{t: int(timing)}
 		hostR.Mark(nil)
 	}
 
@@ -101,13 +101,13 @@ func TestEpsilonGreedy(t *testing.T) {
 
 	for i := 0; i < iterations; i += 1 {
 		if i != 0 && i%100 == 0 {
-			p.performEpsilonGreedyDecay()
+			p.Selector.(*epsilonGreedySelector).performEpsilonGreedyDecay()
 		}
-		hostR := Get(p)
+		hostR := p.Get()
 		host := hostR.Host()
 		hitCounts[host]++
 		timing := timings[host]
-		p.timer = &mockTimer{t: int(timing)}
+		p.Selector.(*epsilonGreedySelector).timer = &mockTimer{t: int(timing)}
 		hostR.Mark(nil)
 	}
 
@@ -129,15 +129,15 @@ func BenchmarkEpsilonGreedy(b *testing.B) {
 	}
 
 	// Make the hostpool with a few hosts
-	p := NewEpsilonGreedy([]string{"a", "b"}, 0, &LinearEpsilonValueCalculator{}).(*epsilonGreedyHostPool)
+	p := NewWithSelector([]string{"a", "b"}, NewEpsilonGreedy(0, &LinearEpsilonValueCalculator{})).(*standardHostPool)
 
 	b.StartTimer()
 	for i := 0; i < b.N; i++ {
 		if i != 0 && i%100 == 0 {
-			p.performEpsilonGreedyDecay()
+			p.Selector.(*epsilonGreedySelector).performEpsilonGreedyDecay()
 		}
-		hostR := Get(p)
-		p.timer = &mockTimer{t: int(timings[i])}
+		hostR := p.Get()
+		p.Selector.(*epsilonGreedySelector).timer = &mockTimer{t: int(timings[i])}
 		hostR.Mark(nil)
 	}
 }

+ 118 - 0
selector.go

@@ -0,0 +1,118 @@
+package hostpool
+
+import (
+	"log"
+	"sync"
+	"time"
+)
+
+type Selector interface {
+	Init([]string)
+	SelectNextHost() string
+	MakeHostResponse(string) HostPoolResponse
+	MarkHost(string, error)
+	ResetAll()
+}
+
+type standardSelector struct {
+	sync.RWMutex
+	hosts             map[string]*hostEntry
+	hostList          []*hostEntry
+	initialRetryDelay time.Duration
+	maxRetryInterval  time.Duration
+	nextHostIndex     int
+}
+
+func (s *standardSelector) Init(hosts []string) {
+	s.hosts = make(map[string]*hostEntry, len(hosts))
+	s.hostList = make([]*hostEntry, len(hosts))
+	s.initialRetryDelay = time.Duration(30) * time.Second
+	s.maxRetryInterval = time.Duration(900) * time.Second
+
+	for i, h := range hosts {
+		e := &hostEntry{
+			host:       h,
+			retryDelay: s.initialRetryDelay,
+		}
+		s.hosts[h] = e
+		s.hostList[i] = e
+	}
+}
+
+func (s *standardSelector) SelectNextHost() string {
+	s.Lock()
+	host := s.getRoundRobin()
+	s.Unlock()
+	return host
+}
+
+func (s *standardSelector) getRoundRobin() string {
+	now := time.Now()
+	hostCount := len(s.hostList)
+	for i := range s.hostList {
+		// iterate via sequenece from where we last iterated
+		currentIndex := (i + s.nextHostIndex) % hostCount
+
+		h := s.hostList[currentIndex]
+		if h.canTryHost(now) {
+			s.nextHostIndex = currentIndex + 1
+			return h.host
+		}
+	}
+
+	// all hosts are down. re-add them
+	s.doResetAll()
+	s.nextHostIndex = 0
+	return s.hostList[0].host
+}
+
+func (s *standardSelector) MakeHostResponse(host string) HostPoolResponse {
+	s.Lock()
+	defer s.Unlock()
+	h, ok := s.hosts[host]
+	if !ok {
+		log.Fatalf("host %s not in HostPool", host)
+	}
+	now := time.Now()
+	if h.dead && h.nextRetry.Before(now) {
+		h.willRetryHost(s.maxRetryInterval)
+	}
+	return &standardHostPoolResponse{host: host, ss: s}
+}
+
+func (s *standardSelector) MarkHost(host string, err error) {
+	s.Lock()
+	defer s.Unlock()
+
+	h, ok := s.hosts[host]
+	if !ok {
+		log.Fatalf("host %s not in HostPool", host)
+	}
+	if err == nil {
+		// success - mark host alive
+		h.dead = false
+	} else {
+		// failure - mark host dead
+		if !h.dead {
+			h.dead = true
+			h.retryCount = 0
+			h.retryDelay = s.initialRetryDelay
+			h.nextRetry = time.Now().Add(h.retryDelay)
+		}
+	}
+}
+
+func (s *standardSelector) ResetAll() {
+	s.Lock()
+	defer s.Unlock()
+	s.doResetAll()
+}
+
+// this actually performs the logic to reset,
+// and should only be called when the lock has
+// already been acquired
+func (s *standardSelector) doResetAll() {
+	for _, h := range s.hosts {
+		h.dead = false
+	}
+}