health_balancer.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clientv3
  15. import (
  16. "context"
  17. "sync"
  18. "time"
  19. "google.golang.org/grpc"
  20. "google.golang.org/grpc/codes"
  21. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  22. "google.golang.org/grpc/status"
  23. )
  24. const minHealthRetryDuration = 3 * time.Second
  25. const unknownService = "unknown service grpc.health.v1.Health"
  26. type healthCheckFunc func(ep string) (bool, error)
  27. // healthBalancer wraps a balancer so that it uses health checking
  28. // to choose its endpoints.
  29. type healthBalancer struct {
  30. balancer
  31. // healthCheck checks an endpoint's health.
  32. healthCheck healthCheckFunc
  33. // mu protects addrs, eps, unhealthy map, and stopc.
  34. mu sync.RWMutex
  35. // addrs stores all grpc addresses associated with the balancer.
  36. addrs []grpc.Address
  37. // eps stores all client endpoints
  38. eps []string
  39. // unhealthy tracks the last unhealthy time of endpoints.
  40. unhealthy map[string]time.Time
  41. stopc chan struct{}
  42. stopOnce sync.Once
  43. host2ep map[string]string
  44. wg sync.WaitGroup
  45. }
  46. func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
  47. hb := &healthBalancer{
  48. balancer: b,
  49. healthCheck: hc,
  50. eps: b.endpoints(),
  51. addrs: eps2addrs(b.endpoints()),
  52. host2ep: getHost2ep(b.endpoints()),
  53. unhealthy: make(map[string]time.Time),
  54. stopc: make(chan struct{}),
  55. }
  56. if timeout < minHealthRetryDuration {
  57. timeout = minHealthRetryDuration
  58. }
  59. hb.wg.Add(1)
  60. go func() {
  61. defer hb.wg.Done()
  62. hb.updateUnhealthy(timeout)
  63. }()
  64. return hb
  65. }
  66. func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
  67. f, used := hb.up(addr)
  68. if !used {
  69. return f
  70. }
  71. return func(err error) {
  72. // If connected to a black hole endpoint or a killed server, the gRPC ping
  73. // timeout will induce a network I/O error, and retrying until success;
  74. // finding healthy endpoint on retry could take several timeouts and redials.
  75. // To avoid wasting retries, gray-list unhealthy endpoints.
  76. hb.mu.Lock()
  77. hb.unhealthy[addr.Addr] = time.Now()
  78. hb.mu.Unlock()
  79. f(err)
  80. }
  81. }
  82. func (hb *healthBalancer) up(addr grpc.Address) (func(error), bool) {
  83. if !hb.mayPin(addr) {
  84. return func(err error) {}, false
  85. }
  86. return hb.balancer.up(addr)
  87. }
  88. func (hb *healthBalancer) Close() error {
  89. hb.stopOnce.Do(func() { close(hb.stopc) })
  90. hb.wg.Wait()
  91. return hb.balancer.Close()
  92. }
  93. func (hb *healthBalancer) updateAddrs(eps ...string) {
  94. addrs, host2ep := eps2addrs(eps), getHost2ep(eps)
  95. hb.mu.Lock()
  96. hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep
  97. hb.mu.Unlock()
  98. hb.balancer.updateAddrs(eps...)
  99. }
  100. func (hb *healthBalancer) endpoint(host string) string {
  101. hb.mu.RLock()
  102. defer hb.mu.RUnlock()
  103. return hb.host2ep[host]
  104. }
  105. func (hb *healthBalancer) endpoints() []string {
  106. hb.mu.RLock()
  107. defer hb.mu.RUnlock()
  108. return hb.eps
  109. }
  110. func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
  111. for {
  112. select {
  113. case <-time.After(timeout):
  114. hb.mu.Lock()
  115. for k, v := range hb.unhealthy {
  116. if time.Since(v) > timeout {
  117. delete(hb.unhealthy, k)
  118. }
  119. }
  120. hb.mu.Unlock()
  121. eps := []string{}
  122. for _, addr := range hb.liveAddrs() {
  123. eps = append(eps, hb.endpoint(addr.Addr))
  124. }
  125. hb.balancer.updateAddrs(eps...)
  126. case <-hb.stopc:
  127. return
  128. }
  129. }
  130. }
  131. func (hb *healthBalancer) liveAddrs() []grpc.Address {
  132. hb.mu.RLock()
  133. defer hb.mu.RUnlock()
  134. hbAddrs := hb.addrs
  135. if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) {
  136. return hbAddrs
  137. }
  138. addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy))
  139. for _, addr := range hb.addrs {
  140. if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy {
  141. addrs = append(addrs, addr)
  142. }
  143. }
  144. return addrs
  145. }
  146. func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
  147. hb.mu.RLock()
  148. skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0
  149. _, bad := hb.unhealthy[addr.Addr]
  150. hb.mu.RUnlock()
  151. if skip || !bad {
  152. return true
  153. }
  154. if ok, _ := hb.healthCheck(addr.Addr); ok {
  155. hb.mu.Lock()
  156. delete(hb.unhealthy, addr.Addr)
  157. hb.mu.Unlock()
  158. return true
  159. }
  160. hb.mu.Lock()
  161. hb.unhealthy[addr.Addr] = time.Now()
  162. hb.mu.Unlock()
  163. return false
  164. }
  165. func grpcHealthCheck(client *Client, ep string) (bool, error) {
  166. conn, err := client.dial(ep)
  167. if err != nil {
  168. return false, err
  169. }
  170. defer conn.Close()
  171. cli := healthpb.NewHealthClient(conn)
  172. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  173. resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
  174. cancel()
  175. if err != nil {
  176. if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
  177. if s.Message() == unknownService {
  178. // etcd < v3.3.0
  179. return true, nil
  180. }
  181. }
  182. return false, err
  183. }
  184. return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
  185. }