health_balancer.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clientv3
  15. import (
  16. "context"
  17. "errors"
  18. "net/url"
  19. "strings"
  20. "sync"
  21. "time"
  22. "google.golang.org/grpc"
  23. "google.golang.org/grpc/codes"
  24. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  25. "google.golang.org/grpc/status"
  26. )
const (
	// minHealthRetryDuration is the lower bound newHealthBalancer applies to
	// the health check timeout it is given.
	minHealthRetryDuration = 3 * time.Second
	// unknownService is the status message grpcHealthCheck compares against:
	// an Unavailable error carrying exactly this message is treated as a
	// healthy endpoint (etcd < v3.3.0).
	unknownService = "unknown service grpc.health.v1.Health"
)
// ErrNoAddrAvilable is returned by Get() when the balancer does not have
// any pinned address at the time.
// This error is returned only when opts.BlockingWait is false (fail-fast
// RPCs); with opts.BlockingWait set, Get() waits for an address instead.
var ErrNoAddrAvilable = status.Error(codes.Unavailable, "there is no address available")
// healthCheckFunc reports whether the endpoint ep is healthy.
type healthCheckFunc func(ep string) (bool, error)

// notifyMsg tells notifyAddrs how to publish addresses on notifyCh.
type notifyMsg int

const (
	// notifyReset publishes the current live addresses.
	notifyReset notifyMsg = iota
	// notifyNext first publishes an empty address list and then the current
	// live addresses.
	notifyNext
)
// healthBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type healthBalancer struct {
	// addrs are the client's endpoint addresses for grpc
	addrs []grpc.Address

	// eps holds the raw endpoints from the client
	eps []string

	// notifyCh notifies grpc of the set of addresses for connecting
	notifyCh chan []grpc.Address

	// readyc closes once the first connection is up
	readyc    chan struct{}
	readyOnce sync.Once

	// healthCheck checks an endpoint's health.
	healthCheck healthCheckFunc
	// healthCheckTimeout is the period of the updateUnhealthy loop, the age
	// after which cleanupUnhealthy drops an unhealthy entry, and the minimum
	// time mayPin requires after a failure before re-checking an address.
	healthCheckTimeout time.Duration

	// unhealthyMu protects unhealthyHostPorts.
	unhealthyMu sync.RWMutex
	// unhealthyHostPorts maps a host:port to the time it was marked unhealthy.
	unhealthyHostPorts map[string]time.Time

	// mu protects all fields below.
	mu sync.RWMutex

	// upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
	upc chan struct{}

	// downc closes when grpc calls down() on pinAddr
	downc chan struct{}

	// stopc is closed by Close to signal updateNotifyLoop and
	// updateUnhealthy to stop.
	stopc    chan struct{}
	stopOnce sync.Once
	// wg tracks the updateUnhealthy goroutine started by newHealthBalancer.
	wg sync.WaitGroup

	// donec is closed by updateNotifyLoop when it returns.
	donec chan struct{}

	// updateAddrsC notifies updateNotifyLoop to update addrs.
	updateAddrsC chan notifyMsg

	// grpc issues TLS cert checks using the string passed into dial so
	// that string must be the host. To recover the full scheme://host URL,
	// have a map from hosts to the original endpoint.
	hostPort2ep map[string]string

	// pinAddr is the currently pinned address; set to the empty string on
	// initialization and shutdown.
	pinAddr string

	// closed is set by Close; once true, Up and Get refuse further work.
	closed bool
}
  81. func newHealthBalancer(eps []string, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
  82. notifyCh := make(chan []grpc.Address)
  83. addrs := eps2addrs(eps)
  84. hb := &healthBalancer{
  85. addrs: addrs,
  86. eps: eps,
  87. notifyCh: notifyCh,
  88. readyc: make(chan struct{}),
  89. healthCheck: hc,
  90. unhealthyHostPorts: make(map[string]time.Time),
  91. upc: make(chan struct{}),
  92. stopc: make(chan struct{}),
  93. downc: make(chan struct{}),
  94. donec: make(chan struct{}),
  95. updateAddrsC: make(chan notifyMsg),
  96. hostPort2ep: getHostPort2ep(eps),
  97. }
  98. if timeout < minHealthRetryDuration {
  99. timeout = minHealthRetryDuration
  100. }
  101. hb.healthCheckTimeout = timeout
  102. close(hb.downc)
  103. go hb.updateNotifyLoop()
  104. hb.wg.Add(1)
  105. go func() {
  106. defer hb.wg.Done()
  107. hb.updateUnhealthy()
  108. }()
  109. return hb
  110. }
  111. func (b *healthBalancer) Start(target string, config grpc.BalancerConfig) error { return nil }
  112. func (b *healthBalancer) ConnectNotify() <-chan struct{} {
  113. b.mu.Lock()
  114. defer b.mu.Unlock()
  115. return b.upc
  116. }
  117. func (b *healthBalancer) ready() <-chan struct{} { return b.readyc }
  118. func (b *healthBalancer) endpoint(hostPort string) string {
  119. b.mu.RLock()
  120. defer b.mu.RUnlock()
  121. return b.hostPort2ep[hostPort]
  122. }
  123. func (b *healthBalancer) pinned() string {
  124. b.mu.RLock()
  125. defer b.mu.RUnlock()
  126. return b.pinAddr
  127. }
  128. func (b *healthBalancer) hostPortError(hostPort string, err error) {
  129. if b.endpoint(hostPort) == "" {
  130. lg.Lvl(4).Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
  131. return
  132. }
  133. b.unhealthyMu.Lock()
  134. b.unhealthyHostPorts[hostPort] = time.Now()
  135. b.unhealthyMu.Unlock()
  136. lg.Lvl(4).Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
  137. }
  138. func (b *healthBalancer) removeUnhealthy(hostPort, msg string) {
  139. if b.endpoint(hostPort) == "" {
  140. lg.Lvl(4).Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
  141. return
  142. }
  143. b.unhealthyMu.Lock()
  144. delete(b.unhealthyHostPorts, hostPort)
  145. b.unhealthyMu.Unlock()
  146. lg.Lvl(4).Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
  147. }
  148. func (b *healthBalancer) countUnhealthy() (count int) {
  149. b.unhealthyMu.RLock()
  150. count = len(b.unhealthyHostPorts)
  151. b.unhealthyMu.RUnlock()
  152. return count
  153. }
  154. func (b *healthBalancer) isUnhealthy(hostPort string) (unhealthy bool) {
  155. b.unhealthyMu.RLock()
  156. _, unhealthy = b.unhealthyHostPorts[hostPort]
  157. b.unhealthyMu.RUnlock()
  158. return unhealthy
  159. }
  160. func (b *healthBalancer) cleanupUnhealthy() {
  161. b.unhealthyMu.Lock()
  162. for k, v := range b.unhealthyHostPorts {
  163. if time.Since(v) > b.healthCheckTimeout {
  164. delete(b.unhealthyHostPorts, k)
  165. lg.Lvl(4).Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout)
  166. }
  167. }
  168. b.unhealthyMu.Unlock()
  169. }
  170. func (b *healthBalancer) liveAddrs() ([]grpc.Address, map[string]struct{}) {
  171. unhealthyCnt := b.countUnhealthy()
  172. b.mu.RLock()
  173. defer b.mu.RUnlock()
  174. hbAddrs := b.addrs
  175. if len(b.addrs) == 1 || unhealthyCnt == 0 || unhealthyCnt == len(b.addrs) {
  176. liveHostPorts := make(map[string]struct{}, len(b.hostPort2ep))
  177. for k := range b.hostPort2ep {
  178. liveHostPorts[k] = struct{}{}
  179. }
  180. return hbAddrs, liveHostPorts
  181. }
  182. addrs := make([]grpc.Address, 0, len(b.addrs)-unhealthyCnt)
  183. liveHostPorts := make(map[string]struct{}, len(addrs))
  184. for _, addr := range b.addrs {
  185. if !b.isUnhealthy(addr.Addr) {
  186. addrs = append(addrs, addr)
  187. liveHostPorts[addr.Addr] = struct{}{}
  188. }
  189. }
  190. return addrs, liveHostPorts
  191. }
// updateUnhealthy runs until stopc is closed. Every healthCheckTimeout it
// expires old unhealthy marks and, when no address is pinned or the pinned
// address is marked unhealthy, asks updateNotifyLoop (via updateAddrsC) to
// publish a fresh address list.
func (b *healthBalancer) updateUnhealthy() {
	for {
		select {
		case <-time.After(b.healthCheckTimeout):
			b.cleanupUnhealthy()
			pinned := b.pinned()
			if pinned == "" || b.isUnhealthy(pinned) {
				// updateAddrsC is unbuffered; give up on the send if the
				// balancer is stopping.
				select {
				case b.updateAddrsC <- notifyNext:
				case <-b.stopc:
					return
				}
			}
		case <-b.stopc:
			return
		}
	}
}
  210. func (b *healthBalancer) updateAddrs(eps ...string) {
  211. np := getHostPort2ep(eps)
  212. b.mu.Lock()
  213. defer b.mu.Unlock()
  214. match := len(np) == len(b.hostPort2ep)
  215. if match {
  216. for k, v := range np {
  217. if b.hostPort2ep[k] != v {
  218. match = false
  219. break
  220. }
  221. }
  222. }
  223. if match {
  224. // same endpoints, so no need to update address
  225. return
  226. }
  227. b.hostPort2ep = np
  228. b.addrs, b.eps = eps2addrs(eps), eps
  229. b.unhealthyMu.Lock()
  230. b.unhealthyHostPorts = make(map[string]time.Time)
  231. b.unhealthyMu.Unlock()
  232. }
// next asks updateNotifyLoop to publish a fresh address list (notifyNext)
// and then blocks until the downc observed on entry is closed. Both steps
// are abandoned when stopc is closed.
func (b *healthBalancer) next() {
	// Snapshot downc before sending, so the wait below is for the channel
	// that was current when next() was called.
	b.mu.RLock()
	downc := b.downc
	b.mu.RUnlock()
	select {
	case b.updateAddrsC <- notifyNext:
	case <-b.stopc:
	}
	// wait until disconnect so new RPCs are not issued on old connection
	select {
	case <-downc:
	case <-b.stopc:
	}
}
// updateNotifyLoop feeds address lists to gRPC through notifyCh until stopc
// is closed; donec is closed when it returns.
//
// Each iteration snapshots upc, downc and pinAddr and probes which of the
// two channels is already closed (a closed channel is replaced by nil in
// the local copy):
//   - downc closed only: no address is pinned; publish the live addresses
//     and wait for a pin (upc), an update request, or stop.
//   - upc closed only: an address is pinned; publish just that address and
//     wait for it to go down, an update request, or stop.
//   - both closed: the snapshot is stale; re-read unless stopping.
func (b *healthBalancer) updateNotifyLoop() {
	defer close(b.donec)

	for {
		b.mu.RLock()
		upc, downc, addr := b.upc, b.downc, b.pinAddr
		b.mu.RUnlock()
		// downc or upc should be closed
		select {
		case <-downc:
			downc = nil
		default:
		}
		select {
		case <-upc:
			upc = nil
		default:
		}
		switch {
		case downc == nil && upc == nil:
			// stale
			select {
			case <-b.stopc:
				return
			default:
			}
		case downc == nil:
			b.notifyAddrs(notifyReset)
			select {
			case <-upc:
			case msg := <-b.updateAddrsC:
				b.notifyAddrs(msg)
			case <-b.stopc:
				return
			}
		case upc == nil:
			select {
			// close connections that are not the pinned address
			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
			case <-downc:
			case <-b.stopc:
				return
			}
			select {
			case <-downc:
				b.notifyAddrs(notifyReset)
			case msg := <-b.updateAddrsC:
				b.notifyAddrs(msg)
			case <-b.stopc:
				return
			}
		}
	}
}
// notifyAddrs publishes the current live addresses on notifyCh. For
// notifyNext an empty address list is sent first. When the pinned address
// is not among the live addresses, it additionally waits for downc to close
// after publishing. Every blocking step is abandoned when stopc is closed.
func (b *healthBalancer) notifyAddrs(msg notifyMsg) {
	if msg == notifyNext {
		select {
		case b.notifyCh <- []grpc.Address{}:
		case <-b.stopc:
			return
		}
	}
	b.mu.RLock()
	pinAddr := b.pinAddr
	downc := b.downc
	b.mu.RUnlock()
	addrs, hostPorts := b.liveAddrs()

	// waitDown is set when an address is pinned but is not in the live set
	// about to be published.
	var waitDown bool
	if pinAddr != "" {
		_, ok := hostPorts[pinAddr]
		waitDown = !ok
	}

	select {
	case b.notifyCh <- addrs:
		if waitDown {
			select {
			case <-downc:
			case <-b.stopc:
			}
		}
	case <-b.stopc:
	}
}
// Up is called when a connection to addr is up. If addr passes mayPin, the
// balancer is not closed, addr is still in b.addrs and nothing is pinned
// yet, addr becomes the pinned address: upc is closed, a fresh downc is
// created and readyc is closed on the first pin. In every other case the
// call has no effect and a no-op function is returned.
//
// The returned function is the "down" callback: it marks addr unhealthy,
// replaces upc with a fresh channel, closes downc and clears pinAddr.
func (b *healthBalancer) Up(addr grpc.Address) func(error) {
	if !b.mayPin(addr) {
		return func(err error) {}
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	// gRPC might call Up after it called Close. We add this check
	// to "fix" it up at application layer. Otherwise, will panic
	// if b.upc is already closed.
	if b.closed {
		return func(err error) {}
	}

	// gRPC might call Up on a stale address.
	// Prevent updating pinAddr with a stale address.
	if !hasAddr(b.addrs, addr.Addr) {
		return func(err error) {}
	}

	if b.pinAddr != "" {
		lg.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
		return func(err error) {}
	}

	// notify waiting Get()s and pin first connected address
	close(b.upc)
	b.downc = make(chan struct{})
	b.pinAddr = addr.Addr
	lg.Lvl(4).Infof("clientv3/balancer: pin %q", addr.Addr)

	// notify client that a connection is up
	b.readyOnce.Do(func() { close(b.readyc) })

	return func(err error) {
		// If connected to a black hole endpoint or a killed server, the gRPC ping
		// timeout will induce a network I/O error, and retrying until success;
		// finding healthy endpoint on retry could take several timeouts and redials.
		// To avoid wasting retries, gray-list unhealthy endpoints.
		b.hostPortError(addr.Addr, err)

		b.mu.Lock()
		b.upc = make(chan struct{})
		close(b.downc)
		b.pinAddr = ""
		b.mu.Unlock()
		lg.Lvl(4).Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
	}
}
  371. func (b *healthBalancer) mayPin(addr grpc.Address) bool {
  372. if b.endpoint(addr.Addr) == "" { // stale host:port
  373. return false
  374. }
  375. b.unhealthyMu.RLock()
  376. unhealthyCnt := len(b.unhealthyHostPorts)
  377. failedTime, bad := b.unhealthyHostPorts[addr.Addr]
  378. b.unhealthyMu.RUnlock()
  379. b.mu.RLock()
  380. skip := len(b.addrs) == 1 || unhealthyCnt == 0 || len(b.addrs) == unhealthyCnt
  381. b.mu.RUnlock()
  382. if skip || !bad {
  383. return true
  384. }
  385. // prevent isolated member's endpoint from being infinitely retried, as follows:
  386. // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm
  387. // 2. balancer 'Up' unpins with grpc: failed with network I/O error
  388. // 3. grpc-healthcheck still SERVING, thus retry to pin
  389. // instead, return before grpc-healthcheck if failed within healthcheck timeout
  390. if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout {
  391. lg.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
  392. return false
  393. }
  394. if ok, _ := b.healthCheck(addr.Addr); ok {
  395. b.removeUnhealthy(addr.Addr, "health check success")
  396. return true
  397. }
  398. b.hostPortError(addr.Addr, errors.New("health check failed"))
  399. return false
  400. }
  401. func (b *healthBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
  402. var (
  403. addr string
  404. closed bool
  405. )
  406. // If opts.BlockingWait is false (for fail-fast RPCs), it should return
  407. // an address it has notified via Notify immediately instead of blocking.
  408. if !opts.BlockingWait {
  409. b.mu.RLock()
  410. closed = b.closed
  411. addr = b.pinAddr
  412. b.mu.RUnlock()
  413. if closed {
  414. return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
  415. }
  416. if addr == "" {
  417. return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable
  418. }
  419. return grpc.Address{Addr: addr}, func() {}, nil
  420. }
  421. for {
  422. b.mu.RLock()
  423. ch := b.upc
  424. b.mu.RUnlock()
  425. select {
  426. case <-ch:
  427. case <-b.donec:
  428. return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
  429. case <-ctx.Done():
  430. return grpc.Address{Addr: ""}, nil, ctx.Err()
  431. }
  432. b.mu.RLock()
  433. closed = b.closed
  434. addr = b.pinAddr
  435. b.mu.RUnlock()
  436. // Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed.
  437. if closed {
  438. return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
  439. }
  440. if addr != "" {
  441. break
  442. }
  443. }
  444. return grpc.Address{Addr: addr}, func() {}, nil
  445. }
  446. func (b *healthBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
// Close shuts the balancer down: it marks it closed, closes stopc, clears
// the pinned address, closes upc if still open, waits for the
// updateUnhealthy goroutine and for updateNotifyLoop to exit, and finally
// closes notifyCh. A repeated call only waits for donec. It always returns
// nil.
func (b *healthBalancer) Close() error {
	b.mu.Lock()
	// In case gRPC calls close twice. TODO: remove the checking
	// when we are sure that gRPC wont call close twice.
	if b.closed {
		b.mu.Unlock()
		<-b.donec
		return nil
	}
	b.closed = true
	b.stopOnce.Do(func() { close(b.stopc) })
	b.pinAddr = ""

	// In the case of following scenario:
	//	1. upc is not closed; no pinned address
	// 	2. client issues an RPC, calling invoke(), which calls Get(), enters for loop, blocks
	// 	3. client.conn.Close() calls balancer.Close(); closed = true
	// 	4. for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled
	// we must close upc so Get() exits from blocking on upc
	select {
	case <-b.upc:
	default:
		// terminate all waiting Get()s
		close(b.upc)
	}

	// Release b.mu before waiting; the goroutines being waited on take it.
	b.mu.Unlock()
	b.wg.Wait()

	// wait for updateNotifyLoop to finish
	<-b.donec
	close(b.notifyCh)

	return nil
}
  478. func grpcHealthCheck(client *Client, ep string) (bool, error) {
  479. conn, err := client.dial(ep)
  480. if err != nil {
  481. return false, err
  482. }
  483. defer conn.Close()
  484. cli := healthpb.NewHealthClient(conn)
  485. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  486. resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
  487. cancel()
  488. if err != nil {
  489. if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
  490. if s.Message() == unknownService { // etcd < v3.3.0
  491. return true, nil
  492. }
  493. }
  494. return false, err
  495. }
  496. return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
  497. }
  498. func hasAddr(addrs []grpc.Address, targetAddr string) bool {
  499. for _, addr := range addrs {
  500. if targetAddr == addr.Addr {
  501. return true
  502. }
  503. }
  504. return false
  505. }
  506. func getHost(ep string) string {
  507. url, uerr := url.Parse(ep)
  508. if uerr != nil || !strings.Contains(ep, "://") {
  509. return ep
  510. }
  511. return url.Host
  512. }
  513. func eps2addrs(eps []string) []grpc.Address {
  514. addrs := make([]grpc.Address, len(eps))
  515. for i := range eps {
  516. addrs[i].Addr = getHost(eps[i])
  517. }
  518. return addrs
  519. }
  520. func getHostPort2ep(eps []string) map[string]string {
  521. hm := make(map[string]string, len(eps))
  522. for i := range eps {
  523. _, host, _ := parseEndpoint(eps[i])
  524. hm[host] = eps[i]
  525. }
  526. return hm
  527. }