health_balancer.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clientv3
  15. import (
  16. "context"
  17. "sync"
  18. "time"
  19. "google.golang.org/grpc"
  20. "google.golang.org/grpc/codes"
  21. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  22. "google.golang.org/grpc/status"
  23. )
  24. const minHealthRetryDuration = 3 * time.Second
  25. const unknownService = "unknown service grpc.health.v1.Health"
  26. type healthCheckFunc func(ep string) (bool, error)
  27. // healthBalancer wraps a balancer so that it uses health checking
  28. // to choose its endpoints.
  29. type healthBalancer struct {
  30. *simpleBalancer
  31. // healthCheck checks an endpoint's health.
  32. healthCheck healthCheckFunc
  33. healthCheckTimeout time.Duration
  34. // mu protects addrs, eps, unhealthy map, and stopc.
  35. mu sync.RWMutex
  36. // addrs stores all grpc addresses associated with the balancer.
  37. addrs []grpc.Address
  38. // eps stores all client endpoints
  39. eps []string
  40. // unhealthy tracks the last unhealthy time of endpoints.
  41. unhealthy map[string]time.Time
  42. stopc chan struct{}
  43. stopOnce sync.Once
  44. hostPort2ep map[string]string
  45. wg sync.WaitGroup
  46. }
  47. func newHealthBalancer(b *simpleBalancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
  48. hb := &healthBalancer{
  49. simpleBalancer: b,
  50. healthCheck: hc,
  51. eps: b.endpoints(),
  52. addrs: eps2addrs(b.endpoints()),
  53. hostPort2ep: getHostPort2ep(b.endpoints()),
  54. unhealthy: make(map[string]time.Time),
  55. stopc: make(chan struct{}),
  56. }
  57. if timeout < minHealthRetryDuration {
  58. timeout = minHealthRetryDuration
  59. }
  60. hb.healthCheckTimeout = timeout
  61. hb.wg.Add(1)
  62. go func() {
  63. defer hb.wg.Done()
  64. hb.updateUnhealthy(timeout)
  65. }()
  66. return hb
  67. }
  68. func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
  69. f, used := hb.up(addr)
  70. if !used {
  71. return f
  72. }
  73. return func(err error) {
  74. // If connected to a black hole endpoint or a killed server, the gRPC ping
  75. // timeout will induce a network I/O error, and retrying until success;
  76. // finding healthy endpoint on retry could take several timeouts and redials.
  77. // To avoid wasting retries, gray-list unhealthy endpoints.
  78. hb.hostPortError(addr.Addr, err)
  79. f(err)
  80. }
  81. }
  82. func (hb *healthBalancer) up(addr grpc.Address) (func(error), bool) {
  83. if !hb.mayPin(addr) {
  84. return func(err error) {}, false
  85. }
  86. return hb.simpleBalancer.up(addr)
  87. }
  88. func (hb *healthBalancer) Close() error {
  89. hb.stopOnce.Do(func() { close(hb.stopc) })
  90. hb.wg.Wait()
  91. return hb.simpleBalancer.Close()
  92. }
  93. func (hb *healthBalancer) updateAddrs(eps ...string) {
  94. addrs, hostPort2ep := eps2addrs(eps), getHostPort2ep(eps)
  95. hb.mu.Lock()
  96. hb.addrs, hb.eps, hb.hostPort2ep = addrs, eps, hostPort2ep
  97. hb.unhealthy = make(map[string]time.Time)
  98. hb.mu.Unlock()
  99. hb.simpleBalancer.updateAddrs(eps...)
  100. }
  101. func (hb *healthBalancer) endpoint(host string) string {
  102. hb.mu.RLock()
  103. defer hb.mu.RUnlock()
  104. return hb.hostPort2ep[host]
  105. }
  106. func (hb *healthBalancer) endpoints() []string {
  107. hb.mu.RLock()
  108. defer hb.mu.RUnlock()
  109. return hb.eps
  110. }
  111. func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
  112. for {
  113. select {
  114. case <-time.After(timeout):
  115. hb.mu.Lock()
  116. for k, v := range hb.unhealthy {
  117. if time.Since(v) > timeout {
  118. delete(hb.unhealthy, k)
  119. if logger.V(4) {
  120. logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout)
  121. }
  122. }
  123. }
  124. hb.mu.Unlock()
  125. eps := []string{}
  126. for _, addr := range hb.liveAddrs() {
  127. eps = append(eps, hb.endpoint(addr.Addr))
  128. }
  129. hb.simpleBalancer.updateAddrs(eps...)
  130. case <-hb.stopc:
  131. return
  132. }
  133. }
  134. }
  135. func (hb *healthBalancer) liveAddrs() []grpc.Address {
  136. hb.mu.RLock()
  137. defer hb.mu.RUnlock()
  138. hbAddrs := hb.addrs
  139. if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) {
  140. return hbAddrs
  141. }
  142. addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy))
  143. for _, addr := range hb.addrs {
  144. if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy {
  145. addrs = append(addrs, addr)
  146. }
  147. }
  148. return addrs
  149. }
  150. func (hb *healthBalancer) hostPortError(hostPort string, err error) {
  151. hb.mu.Lock()
  152. if _, ok := hb.hostPort2ep[hostPort]; ok {
  153. hb.unhealthy[hostPort] = time.Now()
  154. if logger.V(4) {
  155. logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", hostPort, err.Error())
  156. }
  157. }
  158. hb.mu.Unlock()
  159. }
  160. func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
  161. hb.mu.RLock()
  162. if _, ok := hb.hostPort2ep[addr.Addr]; !ok { // stale host:port
  163. hb.mu.RUnlock()
  164. return false
  165. }
  166. skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy)
  167. failedTime, bad := hb.unhealthy[addr.Addr]
  168. dur := hb.healthCheckTimeout
  169. hb.mu.RUnlock()
  170. if skip || !bad {
  171. return true
  172. }
  173. // prevent isolated member's endpoint from being infinitely retried, as follows:
  174. // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm
  175. // 2. balancer 'Up' unpins with grpc: failed with network I/O error
  176. // 3. grpc-healthcheck still SERVING, thus retry to pin
  177. // instead, return before grpc-healthcheck if failed within healthcheck timeout
  178. if elapsed := time.Since(failedTime); elapsed < dur {
  179. if logger.V(4) {
  180. logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
  181. }
  182. return false
  183. }
  184. if ok, _ := hb.healthCheck(addr.Addr); ok {
  185. hb.mu.Lock()
  186. delete(hb.unhealthy, addr.Addr)
  187. hb.mu.Unlock()
  188. if logger.V(4) {
  189. logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr)
  190. }
  191. return true
  192. }
  193. hb.mu.Lock()
  194. hb.unhealthy[addr.Addr] = time.Now()
  195. hb.mu.Unlock()
  196. if logger.V(4) {
  197. logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr)
  198. }
  199. return false
  200. }
  201. func grpcHealthCheck(client *Client, ep string) (bool, error) {
  202. conn, err := client.dial(ep)
  203. if err != nil {
  204. return false, err
  205. }
  206. defer conn.Close()
  207. cli := healthpb.NewHealthClient(conn)
  208. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  209. resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
  210. cancel()
  211. if err != nil {
  212. if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
  213. if s.Message() == unknownService {
  214. // etcd < v3.3.0
  215. return true, nil
  216. }
  217. }
  218. return false, err
  219. }
  220. return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
  221. }