瀏覽代碼

new file for host_entry stuff, trying to do major refactor to lock-free

Dan Frank 13 年之前
父節點
當前提交
4430ba29f0
共有 2 個文件被更改,包括 165 次插入39 次删除
  1. 150 0
      host_entry.go
  2. 15 39
      hostpool.go

+ 150 - 0
host_entry.go

@@ -0,0 +1,150 @@
+package hostpool
+
+import (
+	"math"
+	"time"
+)
+
+type HostEntry interface {
+	IsDead() bool
+	Host() string
+	SetDead(bool)
+	canTryHost(time.Time) bool
+	willRetryHost()
+}
+
+// -- Requests
+
+type hostEntryRequest interface {
+	getRespChan() chan<- interface{}
+}
+
+type baseHostEntryRequest struct {
+	respChan chan interface{}
+}
+
+func (req *baseHostEntryRequest) getRespChan() chan<- interface{} {
+	return req.respChan
+}
+
+type isDeadRequest struct{ baseHostEntryRequest }
+
+type setDeadRequest struct {
+	baseHostEntryRequest
+	setDeadTo bool
+}
+
+type canTryRequest struct {
+	baseHostEntryRequest
+	atTime time.Time
+}
+
+type willRetryRequest struct{ baseHostEntryRequest }
+
+type hostEntry struct {
+	host              string
+	nextRetry         time.Time
+	retryDelay        time.Duration
+	initialRetryDelay time.Duration
+	maxRetryInterval  time.Duration
+	dead              bool
+	// epsilonCounts     []int64
+	// epsilonValues     []int64
+	// epsilonIndex      int
+	// epsilonValue      float64
+	// epsilonPercentage float64
+	incomingRequests chan hostEntryRequest
+}
+
+func (he *hostEntry) Host() string {
+	// This never changes, so we can safely return it
+	return he.host
+}
+
+func newHostEntry(host string, initialRetryDelay time.Duration, maxRetryInterval time.Duration) HostEntry {
+	he := &hostEntry{
+		host:              host,
+		retryDelay:        initialRetryDelay,
+		initialRetryDelay: initialRetryDelay,
+		maxRetryInterval:  maxRetryInterval,
+		incomingRequests:  make(chan hostEntryRequest),
+	}
+	go he.handleRequests()
+	return he
+}
+
+func (he *hostEntry) handleRequests() {
+	for req := range he.incomingRequests {
+		var resp interface{}
+		switch req.(type) {
+		case *isDeadRequest:
+			resp = he.dead
+		case *setDeadRequest:
+			newVal := req.(*setDeadRequest).setDeadTo
+			if newVal && !he.dead {
+				// Entering the deadpool - initialize retry
+				he.retryDelay = he.initialRetryDelay
+				he.nextRetry = time.Now().Add(he.retryDelay)
+			}
+			he.dead = newVal
+		case *canTryRequest:
+			resp = !he.dead || he.nextRetry.Before(req.(*canTryRequest).atTime)
+		case *willRetryRequest:
+			he.retryDelay = time.Duration(int64(math.Min(float64(he.retryDelay*2), float64(he.maxRetryInterval))))
+			he.nextRetry = time.Now().Add(he.retryDelay)
+		}
+		req.getRespChan() <- resp
+	}
+}
+
+func (he *hostEntry) IsDead() bool {
+	req := &isDeadRequest{
+		baseHostEntryRequest{
+			respChan: make(chan interface{}),
+		},
+	}
+	he.incomingRequests <- req
+	resp := <-req.respChan
+	isDeadResp, ok := resp.(bool)
+	if !ok {
+		// TODO
+	}
+	return isDeadResp
+}
+
+func (he *hostEntry) SetDead(newDeadVal bool) {
+	req := &setDeadRequest{
+		baseHostEntryRequest{
+			respChan: make(chan interface{}),
+		},
+		newDeadVal,
+	}
+	he.incomingRequests <- req
+	<-req.respChan
+}
+
+func (he *hostEntry) canTryHost(now time.Time) bool {
+	req := &canTryRequest{
+		baseHostEntryRequest{
+			respChan: make(chan interface{}),
+		},
+		now,
+	}
+	he.incomingRequests <- req
+	resp := <-req.respChan
+	canTryResp, ok := resp.(bool)
+	if !ok {
+		// TODO
+	}
+	return canTryResp
+}
+
+func (he *hostEntry) willRetryHost() {
+	req := &willRetryRequest{
+		baseHostEntryRequest{
+			respChan: make(chan interface{}),
+		},
+	}
+	he.incomingRequests <- req
+	<-req.respChan
+}

+ 15 - 39
hostpool.go

@@ -46,6 +46,7 @@ type HostPool interface {
 
 	ResetAll()
 	Hosts() []string
+	sync.Locker
 }
 
 type standardHostPool struct {
@@ -58,28 +59,13 @@ type standardHostPool struct {
 }
 
 type epsilonGreedyHostPool struct {
-	standardHostPool               // TODO - would be nifty if we could embed HostPool and Locker interfaces
+	HostPool
 	epsilon                float32 // this is our exploration factor
 	decayDuration          time.Duration
 	EpsilonValueCalculator // embed the epsilonValueCalculator
 	timer
 }
 
-// --- hostEntry - this is due to get upgraded
-
-type hostEntry struct {
-	host              string
-	nextRetry         time.Time
-	retryCount        int16
-	retryDelay        time.Duration
-	dead              bool
-	epsilonCounts     []int64
-	epsilonValues     []int64
-	epsilonIndex      int
-	epsilonValue      float64
-	epsilonPercentage float64
-}
-
 // --- Value Calculators -----------------
 
 type EpsilonValueCalculator interface {
@@ -110,10 +96,7 @@ func New(hosts []string) HostPool {
 	}
 
 	for i, h := range hosts {
-		e := &hostEntry{
-			host:       h,
-			retryDelay: p.initialRetryDelay,
-		}
+		e := newHostEntry(h, p.initialRetryDelay, p.maxRetryInterval)
 		p.hosts[h] = e
 		p.hostList[i] = e
 	}
@@ -173,9 +156,9 @@ func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonV
 	if decayDuration <= 0 {
 		decayDuration = defaultDecayDuration
 	}
-	stdHP := New(hosts).(*standardHostPool)
+	// stdHP := New(hosts).(*standardHostPool)
 	p := &epsilonGreedyHostPool{
-		standardHostPool:       *stdHP,
+		HostPool:               New(hosts),
 		epsilon:                float32(initialEpsilon),
 		decayDuration:          decayDuration,
 		EpsilonValueCalculator: calc,
@@ -247,14 +230,12 @@ func (p *standardHostPool) getRoundRobin() string {
 		currentIndex := (i + p.nextHostIndex) % hostCount
 
 		h := p.hostList[currentIndex]
-		if !h.dead {
-			p.nextHostIndex = currentIndex + 1
-			return h.host
-		}
-		if h.nextRetry.Before(now) {
-			h.willRetryHost(p.maxRetryInterval)
+		if h.canTryHost(now) {
+			if h.IsDead() {
+				h.willRetryHost()
+			}
 			p.nextHostIndex = currentIndex + 1
-			return h.host
+			return h.Host()
 		}
 	}
 
@@ -376,7 +357,7 @@ func (p *standardHostPool) ResetAll() {
 // already been acquired
 func (p *standardHostPool) doResetAll() {
 	for _, h := range p.hosts {
-		h.dead = false
+		h.SetDead(false)
 	}
 }
 
@@ -389,12 +370,12 @@ func (p *standardHostPool) markSuccess(hostR HostPoolResponse) {
 	if !ok {
 		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
 	}
-	h.dead = false
+	h.SetDead(false)
 }
 
 func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
 	// first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
-	p.standardHostPool.markSuccess(hostR)
+	p.HostPool.markSuccess(hostR)
 	eHostR, ok := hostR.(*epsilonHostPoolResponse)
 	if !ok {
 		log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
@@ -421,14 +402,9 @@ func (p *standardHostPool) markFailed(hostR HostPoolResponse) {
 	if !ok {
 		log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
 	}
-	if !h.dead {
-		h.dead = true
-		h.retryCount = 0
-		h.retryDelay = p.initialRetryDelay
-		h.nextRetry = time.Now().Add(h.retryDelay)
-	}
-
+	h.SetDead(true)
 }
+
 func (p *standardHostPool) Hosts() []string {
 	hosts := make([]string, len(p.hosts))
 	for host, _ := range p.hosts {