|
|
@@ -36,7 +36,8 @@ type healthBalancer struct {
|
|
|
balancer
|
|
|
|
|
|
// healthCheck checks an endpoint's health.
|
|
|
- healthCheck healthCheckFunc
|
|
|
+ healthCheck healthCheckFunc
|
|
|
+ healthCheckTimeout time.Duration
|
|
|
|
|
|
// mu protects addrs, eps, unhealthy map, and stopc.
|
|
|
mu sync.RWMutex
|
|
|
@@ -71,6 +72,7 @@ func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *h
|
|
|
if timeout < minHealthRetryDuration {
|
|
|
timeout = minHealthRetryDuration
|
|
|
}
|
|
|
+ hb.healthCheckTimeout = timeout
|
|
|
|
|
|
hb.wg.Add(1)
|
|
|
go func() {
|
|
|
@@ -95,6 +97,9 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
|
|
|
hb.unhealthy[addr.Addr] = time.Now()
|
|
|
hb.mu.Unlock()
|
|
|
f(err)
|
|
|
+ if logger.V(4) {
|
|
|
+ logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -140,7 +145,7 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
|
|
|
if time.Since(v) > timeout {
|
|
|
delete(hb.unhealthy, k)
|
|
|
if logger.V(4) {
|
|
|
- logger.Infof("clientv3/balancer: removes %s from unhealthy after %v", k, timeout)
|
|
|
+ logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -175,17 +180,29 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
|
|
|
func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
|
|
|
hb.mu.RLock()
|
|
|
skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0
|
|
|
- _, bad := hb.unhealthy[addr.Addr]
|
|
|
+ failedTime, bad := hb.unhealthy[addr.Addr]
|
|
|
+ dur := hb.healthCheckTimeout
|
|
|
hb.mu.RUnlock()
|
|
|
if skip || !bad {
|
|
|
return true
|
|
|
}
|
|
|
+ // prevent isolated member's endpoint from being infinitely retried, as follows:
|
|
|
+ // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm
|
|
|
+ // 2. balancer 'Up' unpins with grpc: failed with network I/O error
|
|
|
+ // 3. grpc-healthcheck still SERVING, thus retry to pin
|
|
|
+ // instead, return before grpc-healthcheck if failed within healthcheck timeout
|
|
|
+ if elapsed := time.Since(failedTime); elapsed < dur {
|
|
|
+ if logger.V(4) {
|
|
|
+ logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
|
|
|
+ }
|
|
|
+ return false
|
|
|
+ }
|
|
|
if ok, _ := hb.healthCheck(addr.Addr); ok {
|
|
|
hb.mu.Lock()
|
|
|
delete(hb.unhealthy, addr.Addr)
|
|
|
hb.mu.Unlock()
|
|
|
if logger.V(4) {
|
|
|
- logger.Infof("clientv3/balancer: %s is healthy", addr.Addr)
|
|
|
+ logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr)
|
|
|
}
|
|
|
return true
|
|
|
}
|
|
|
@@ -193,7 +210,7 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
|
|
|
hb.unhealthy[addr.Addr] = time.Now()
|
|
|
hb.mu.Unlock()
|
|
|
if logger.V(4) {
|
|
|
- logger.Infof("clientv3/balancer: %s becomes unhealthy", addr.Addr)
|
|
|
+ logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr)
|
|
|
}
|
|
|
return false
|
|
|
}
|