| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- package hostpool
- import (
- "log"
- "math"
- "time"
- )
// HostEntry is the handle through which a single host's state is queried and
// updated: its name, its dead/alive flag, and its retry bookkeeping.
type HostEntry interface {
	// Host returns the host string the entry was created with.
	Host() string

	// IsDead reports whether the host is currently flagged as dead.
	IsDead() bool
	// SetDead sets the host's dead flag to the given value.
	SetDead(bool)

	// canTryHost reports whether the host may be tried at the given time.
	canTryHost(time.Time) bool
	// willRetryHost records that the host is about to be retried.
	willRetryHost()

	// Close shuts the entry down.
	Close()
}
- // -- Requests
// hostEntryRequest is the common shape of every message handed to a host
// entry's request loop: each one says where its reply must be delivered.
type hostEntryRequest interface {
	// getRespChan returns the send-only channel on which the reply is written.
	getRespChan() chan<- interface{}
}
// baseHostEntryRequest holds the reply channel shared by all request kinds.
// Concrete request types embed it and thereby gain getRespChan.
type baseHostEntryRequest struct {
	// respChan receives the single reply produced for this request.
	respChan chan interface{}
}

// getRespChan returns the request's reply channel narrowed to send-only, so
// the side answering the request can write to it but not read from it.
func (r *baseHostEntryRequest) getRespChan() chan<- interface{} {
	var sendOnly chan<- interface{} = r.respChan
	return sendOnly
}
- type isDeadRequest struct{ baseHostEntryRequest }
- type setDeadRequest struct {
- baseHostEntryRequest
- setDeadTo bool
- }
- type canTryRequest struct {
- baseHostEntryRequest
- atTime time.Time
- }
- type willRetryRequest struct{ baseHostEntryRequest }
- type hostEntry struct {
- host string
- nextRetry time.Time
- retryDelay time.Duration
- initialRetryDelay time.Duration
- maxRetryInterval time.Duration
- dead bool
- incomingRequests chan hostEntryRequest
- }
- func (he *hostEntry) Host() string {
- // This never changes, so we can safely return it
- return he.host
- }
- func newHostEntry(host string, initialRetryDelay time.Duration, maxRetryInterval time.Duration) HostEntry {
- he := &hostEntry{
- host: host,
- retryDelay: initialRetryDelay,
- initialRetryDelay: initialRetryDelay,
- maxRetryInterval: maxRetryInterval,
- incomingRequests: make(chan hostEntryRequest),
- }
- go he.handleRequests()
- return he
- }
- func (he *hostEntry) handleRequests() {
- for req := range he.incomingRequests {
- var resp interface{}
- switch req.(type) {
- case *isDeadRequest:
- resp = he.dead
- case *setDeadRequest:
- newVal := req.(*setDeadRequest).setDeadTo
- if newVal && !he.dead {
- // Entering the deadpool - initialize retry
- he.retryDelay = he.initialRetryDelay
- he.nextRetry = time.Now().Add(he.retryDelay)
- }
- he.dead = newVal
- case *canTryRequest:
- resp = !he.dead || he.nextRetry.Before(req.(*canTryRequest).atTime)
- case *willRetryRequest:
- he.retryDelay = time.Duration(int64(math.Min(float64(he.retryDelay*2), float64(he.maxRetryInterval))))
- he.nextRetry = time.Now().Add(he.retryDelay)
- }
- req.getRespChan() <- resp
- }
- }
- func (he *hostEntry) IsDead() bool {
- req := &isDeadRequest{
- baseHostEntryRequest{
- respChan: make(chan interface{}),
- },
- }
- he.incomingRequests <- req
- resp := <-req.respChan
- isDeadResp, ok := resp.(bool)
- if !ok {
- log.Fatal("Got incorrect response type from host_entry muxer in IsDead")
- }
- return isDeadResp
- }
- func (he *hostEntry) SetDead(newDeadVal bool) {
- req := &setDeadRequest{
- baseHostEntryRequest{
- respChan: make(chan interface{}),
- },
- newDeadVal,
- }
- he.incomingRequests <- req
- <-req.respChan
- }
- func (he *hostEntry) canTryHost(now time.Time) bool {
- req := &canTryRequest{
- baseHostEntryRequest{
- respChan: make(chan interface{}),
- },
- now,
- }
- he.incomingRequests <- req
- resp := <-req.respChan
- canTryResp, ok := resp.(bool)
- if !ok {
- log.Fatal("Got incorrect response type from host_entry muxer in canTryHost")
- }
- return canTryResp
- }
- func (he *hostEntry) willRetryHost() {
- req := &willRetryRequest{
- baseHostEntryRequest{
- respChan: make(chan interface{}),
- },
- }
- he.incomingRequests <- req
- <-req.respChan
- }
- func (he *hostEntry) Close() {
- close(he.incomingRequests)
- }
|