health_balancer.go

// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/status"
)

const (
	minHealthRetryDuration = 3 * time.Second
	unknownService         = "unknown service grpc.health.v1.Health"
)

type healthCheckFunc func(ep string) (bool, error)

// healthBalancer wraps a balancer so that it uses health checking
// to choose its endpoints.
type healthBalancer struct {
	balancer

	// healthCheck checks an endpoint's health.
	healthCheck        healthCheckFunc
	healthCheckTimeout time.Duration

	// mu protects addrs, eps, unhealthy map, and stopc.
	mu sync.RWMutex

	// addrs stores all grpc addresses associated with the balancer.
	addrs []grpc.Address

	// eps stores all client endpoints.
	eps []string

	// unhealthy tracks the last unhealthy time of endpoints.
	unhealthy map[string]time.Time

	stopc    chan struct{}
	stopOnce sync.Once

	hostPort2ep map[string]string

	wg sync.WaitGroup
}
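
// NOTE (editorial sketch, not part of the original file): healthBalancer is built
// on top of a concrete balancer implementation from this package. A minimal wiring,
// where `b` stands for that base balancer, `cli` for an existing *Client, and
// `dialTimeout` for the client's dial timeout (all three names are placeholders),
// might look like:
//
//	hc := func(ep string) (bool, error) { return grpcHealthCheck(cli, ep) }
//	hb := newHealthBalancer(b, dialTimeout, hc)
//
// Every method below either delegates to the embedded balancer or filters its view
// of the endpoints through the unhealthy gray list.
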
func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
	hb := &healthBalancer{
		balancer:    b,
		healthCheck: hc,
		eps:         b.endpoints(),
		addrs:       eps2addrs(b.endpoints()),
		hostPort2ep: getHostPort2ep(b.endpoints()),
		unhealthy:   make(map[string]time.Time),
		stopc:       make(chan struct{}),
	}
	if timeout < minHealthRetryDuration {
		timeout = minHealthRetryDuration
	}
	hb.healthCheckTimeout = timeout

	hb.wg.Add(1)
	go func() {
		defer hb.wg.Done()
		hb.updateUnhealthy(timeout)
	}()

	return hb
}
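
// NOTE (editorial): the constructor clamps the retry window, so a caller that
// passes, say, 1*time.Second still gets a 3-second healthCheckTimeout
// (minHealthRetryDuration); the background updateUnhealthy goroutine then runs
// on that same interval until Close fires stopc.
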
func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
	f, used := hb.up(addr)
	if !used {
		return f
	}
	return func(err error) {
		// If connected to a black-hole endpoint or a killed server, the gRPC ping
		// timeout will induce a network I/O error and the balancer keeps retrying
		// until it succeeds; finding a healthy endpoint on retry could take several
		// timeouts and redials. To avoid wasting retries, gray-list unhealthy endpoints.
		hb.mu.Lock()
		hb.unhealthy[addr.Addr] = time.Now()
		hb.mu.Unlock()
		f(err)
		if logger.V(4) {
			logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error())
		}
	}
}
func (hb *healthBalancer) up(addr grpc.Address) (func(error), bool) {
	if !hb.mayPin(addr) {
		return func(err error) {}, false
	}
	return hb.balancer.up(addr)
}

func (hb *healthBalancer) Close() error {
	hb.stopOnce.Do(func() { close(hb.stopc) })
	hb.wg.Wait()
	return hb.balancer.Close()
}
func (hb *healthBalancer) updateAddrs(eps ...string) {
	addrs, hostPort2ep := eps2addrs(eps), getHostPort2ep(eps)
	hb.mu.Lock()
	hb.addrs, hb.eps, hb.hostPort2ep = addrs, eps, hostPort2ep
	hb.unhealthy = make(map[string]time.Time)
	hb.mu.Unlock()
	hb.balancer.updateAddrs(eps...)
}

func (hb *healthBalancer) endpoint(host string) string {
	hb.mu.RLock()
	defer hb.mu.RUnlock()
	return hb.hostPort2ep[host]
}

func (hb *healthBalancer) endpoints() []string {
	hb.mu.RLock()
	defer hb.mu.RUnlock()
	return hb.eps
}
func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
	for {
		select {
		case <-time.After(timeout):
			hb.mu.Lock()
			for k, v := range hb.unhealthy {
				if _, ok := hb.hostPort2ep[k]; !ok {
					delete(hb.unhealthy, k)
					if logger.V(4) {
						logger.Infof("clientv3/health-balancer: removes stale host:port %q from unhealthy", k)
					}
					continue
				}
				if time.Since(v) > timeout {
					delete(hb.unhealthy, k)
					if logger.V(4) {
						logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout)
					}
				}
			}
			hb.mu.Unlock()

			eps := []string{}
			for _, addr := range hb.liveAddrs() {
				eps = append(eps, hb.endpoint(addr.Addr))
			}
			hb.balancer.updateAddrs(eps...)

		case <-hb.stopc:
			return
		}
	}
}
func (hb *healthBalancer) liveAddrs() []grpc.Address {
	hb.mu.RLock()
	defer hb.mu.RUnlock()
	hbAddrs := hb.addrs
	if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) {
		return hbAddrs
	}
	addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy))
	for _, addr := range hb.addrs {
		if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy {
			addrs = append(addrs, addr)
		}
	}
	return addrs
}
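
// NOTE (editorial): liveAddrs degrades gracefully. With three addresses and one
// gray-listed entry it returns the remaining two; if every address is gray-listed,
// or only one address exists, it returns the full list so the balancer always has
// something to dial.
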
func (hb *healthBalancer) hostPortError(hostPort string, err error) {
	hb.mu.Lock()
	hb.unhealthy[hostPort] = time.Now()
	hb.mu.Unlock()
	if logger.V(4) {
		logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", hostPort, err.Error())
	}
}
func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
	hb.mu.RLock()
	if _, ok := hb.hostPort2ep[addr.Addr]; !ok { // stale host:port
		hb.mu.RUnlock()
		return false
	}
	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy)
	failedTime, bad := hb.unhealthy[addr.Addr]
	dur := hb.healthCheckTimeout
	hb.mu.RUnlock()

	if skip || !bad {
		return true
	}

	// Prevent an isolated member's endpoint from being retried indefinitely:
	//  1. keepalive pings detect a GOAWAY with http2.ErrCodeEnhanceYourCalm
	//  2. the balancer 'Up' callback unpins with "grpc: failed with network I/O error"
	//  3. the gRPC health check still reports SERVING, so the balancer retries the pin
	// Instead, return before the gRPC health check if the endpoint failed within
	// the health-check timeout.
	if elapsed := time.Since(failedTime); elapsed < dur {
		if logger.V(4) {
			logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
		}
		return false
	}

	if ok, _ := hb.healthCheck(addr.Addr); ok {
		hb.mu.Lock()
		delete(hb.unhealthy, addr.Addr)
		hb.mu.Unlock()
		if logger.V(4) {
			logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr)
		}
		return true
	}

	hb.mu.Lock()
	hb.unhealthy[addr.Addr] = time.Now()
	hb.mu.Unlock()
	if logger.V(4) {
		logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr)
	}
	return false
}
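
// NOTE (editorial): with the default 3-second healthCheckTimeout, an address that
// failed 1 second ago is rejected by mayPin without issuing a health RPC; once the
// full window has elapsed, the address must pass hb.healthCheck before it is pinned
// again, otherwise it re-enters the gray list.
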
func grpcHealthCheck(client *Client, ep string) (bool, error) {
	conn, err := client.dial(ep)
	if err != nil {
		return false, err
	}
	defer conn.Close()

	cli := healthpb.NewHealthClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
	cancel()
	if err != nil {
		if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
			if s.Message() == unknownService {
				// etcd < v3.3.0 does not register the gRPC health service,
				// so treat "unknown service" as healthy.
				return true, nil
			}
		}
		return false, err
	}
	return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
}
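
// NOTE (editorial sketch, not part of the original file): grpcHealthCheck can be
// exercised directly; `c` is a placeholder for an already-constructed *Client and
// the endpoint string is illustrative only:
//
//	healthy, err := grpcHealthCheck(c, "http://127.0.0.1:2379")
//	if err != nil || !healthy {
//		// gray-list the endpoint via hostPortError, or skip it for this retry
//	}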