health_balancer.go

// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"errors"
	"net/url"
	"strings"
	"sync"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/status"
)
const (
	minHealthRetryDuration = 3 * time.Second
	unknownService         = "unknown service grpc.health.v1.Health"
)

// ErrNoAddrAvilable is returned by Get() when the balancer does not have
// any active connection to endpoints at the time.
// This error is returned only when opts.BlockingWait is true.
var ErrNoAddrAvilable = status.Error(codes.Unavailable, "there is no address available")

type healthCheckFunc func(ep string) (bool, error)

type notifyMsg int

const (
	notifyReset notifyMsg = iota
	notifyNext
)
// healthBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type healthBalancer struct {
	// addrs are the client's endpoint addresses for grpc
	addrs []grpc.Address

	// eps holds the raw endpoints from the client
	eps []string

	// notifyCh notifies grpc of the set of addresses for connecting
	notifyCh chan []grpc.Address

	// readyc closes once the first connection is up
	readyc    chan struct{}
	readyOnce sync.Once

	// healthCheck checks an endpoint's health.
	healthCheck        healthCheckFunc
	healthCheckTimeout time.Duration

	unhealthyMu        sync.RWMutex
	unhealthyHostPorts map[string]time.Time

	// mu protects all fields below.
	mu sync.RWMutex

	// upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
	upc chan struct{}

	// downc closes when grpc calls down() on pinAddr
	downc chan struct{}

	// stopc is closed to signal updateNotifyLoop should stop.
	stopc    chan struct{}
	stopOnce sync.Once
	wg       sync.WaitGroup

	// donec closes when all goroutines are exited
	donec chan struct{}

	// updateAddrsC notifies updateNotifyLoop to update addrs.
	updateAddrsC chan notifyMsg

	// grpc issues TLS cert checks using the string passed into dial so
	// that string must be the host. To recover the full scheme://host URL,
	// have a map from hosts to the original endpoint.
	hostPort2ep map[string]string

	// pinAddr is the currently pinned address; set to the empty string on
	// initialization and shutdown.
	pinAddr string

	closed bool
}
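// newHealthBalancer creates a healthBalancer for the given endpoints. It
// gray-lists endpoints that fail the supplied health check, keeping them
// unhealthy for at least minHealthRetryDuration (or the given timeout,
// whichever is larger), and starts the background update goroutines.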
func newHealthBalancer(eps []string, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
	notifyCh := make(chan []grpc.Address)
	addrs := eps2addrs(eps)
	hb := &healthBalancer{
		addrs:              addrs,
		eps:                eps,
		notifyCh:           notifyCh,
		readyc:             make(chan struct{}),
		healthCheck:        hc,
		unhealthyHostPorts: make(map[string]time.Time),
		upc:                make(chan struct{}),
		stopc:              make(chan struct{}),
		downc:              make(chan struct{}),
		donec:              make(chan struct{}),
		updateAddrsC:       make(chan notifyMsg),
		hostPort2ep:        getHostPort2ep(eps),
	}
	if timeout < minHealthRetryDuration {
		timeout = minHealthRetryDuration
	}
	hb.healthCheckTimeout = timeout

	close(hb.downc)
	go hb.updateNotifyLoop()
	hb.wg.Add(1)
	go func() {
		defer hb.wg.Done()
		hb.updateUnhealthy()
	}()
	return hb
}
func (b *healthBalancer) Start(target string, config grpc.BalancerConfig) error { return nil }

func (b *healthBalancer) ConnectNotify() <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.upc
}

func (b *healthBalancer) ready() <-chan struct{} { return b.readyc }

func (b *healthBalancer) endpoint(hostPort string) string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.hostPort2ep[hostPort]
}

func (b *healthBalancer) pinned() string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.pinAddr
}
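// hostPortError records hostPort as unhealthy along with the time of failure,
// unless the address is no longer one of the balancer's endpoints.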
func (b *healthBalancer) hostPortError(hostPort string, err error) {
	if b.endpoint(hostPort) == "" {
		if logger.V(4) {
			logger.Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
		}
		return
	}

	b.unhealthyMu.Lock()
	b.unhealthyHostPorts[hostPort] = time.Now()
	b.unhealthyMu.Unlock()
	if logger.V(4) {
		logger.Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
	}
}

func (b *healthBalancer) removeUnhealthy(hostPort, msg string) {
	if b.endpoint(hostPort) == "" {
		if logger.V(4) {
			logger.Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
		}
		return
	}

	b.unhealthyMu.Lock()
	delete(b.unhealthyHostPorts, hostPort)
	b.unhealthyMu.Unlock()
	if logger.V(4) {
		logger.Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
	}
}
func (b *healthBalancer) countUnhealthy() (count int) {
	b.unhealthyMu.RLock()
	count = len(b.unhealthyHostPorts)
	b.unhealthyMu.RUnlock()
	return count
}

func (b *healthBalancer) isUnhealthy(hostPort string) (unhealthy bool) {
	b.unhealthyMu.RLock()
	_, unhealthy = b.unhealthyHostPorts[hostPort]
	b.unhealthyMu.RUnlock()
	return unhealthy
}
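// cleanupUnhealthy drops entries from the unhealthy gray list once they have
// been there longer than the health-check timeout.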
func (b *healthBalancer) cleanupUnhealthy() {
	b.unhealthyMu.Lock()
	for k, v := range b.unhealthyHostPorts {
		if time.Since(v) > b.healthCheckTimeout {
			delete(b.unhealthyHostPorts, k)
			if logger.V(4) {
				logger.Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout)
			}
		}
	}
	b.unhealthyMu.Unlock()
}
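// liveAddrs returns the addresses that are not currently gray-listed, plus the
// matching set of host:port keys. If there is only one endpoint, none are
// unhealthy, or all are unhealthy, every endpoint is returned.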
func (b *healthBalancer) liveAddrs() ([]grpc.Address, map[string]struct{}) {
	unhealthyCnt := b.countUnhealthy()

	b.mu.RLock()
	defer b.mu.RUnlock()

	hbAddrs := b.addrs
	if len(b.addrs) == 1 || unhealthyCnt == 0 || unhealthyCnt == len(b.addrs) {
		liveHostPorts := make(map[string]struct{}, len(b.hostPort2ep))
		for k := range b.hostPort2ep {
			liveHostPorts[k] = struct{}{}
		}
		return hbAddrs, liveHostPorts
	}

	addrs := make([]grpc.Address, 0, len(b.addrs)-unhealthyCnt)
	liveHostPorts := make(map[string]struct{}, len(addrs))
	for _, addr := range b.addrs {
		if !b.isUnhealthy(addr.Addr) {
			addrs = append(addrs, addr)
			liveHostPorts[addr.Addr] = struct{}{}
		}
	}
	return addrs, liveHostPorts
}
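// updateUnhealthy periodically expires gray-listed endpoints and, when the
// pinned address is empty or unhealthy, asks updateNotifyLoop to move to the
// next endpoint. It runs until stopc is closed.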
func (b *healthBalancer) updateUnhealthy() {
	for {
		select {
		case <-time.After(b.healthCheckTimeout):
			b.cleanupUnhealthy()
			pinned := b.pinned()
			if pinned == "" || b.isUnhealthy(pinned) {
				select {
				case b.updateAddrsC <- notifyNext:
				case <-b.stopc:
					return
				}
			}
		case <-b.stopc:
			return
		}
	}
}
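// updateAddrs replaces the balancer's endpoints. If the new set maps to the
// same host:port pairs as the current one, it is a no-op; otherwise the
// unhealthy gray list is reset.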
func (b *healthBalancer) updateAddrs(eps ...string) {
	np := getHostPort2ep(eps)

	b.mu.Lock()
	defer b.mu.Unlock()

	match := len(np) == len(b.hostPort2ep)
	if match {
		for k, v := range np {
			if b.hostPort2ep[k] != v {
				match = false
				break
			}
		}
	}
	if match {
		// same endpoints, so no need to update address
		return
	}

	b.hostPort2ep = np
	b.addrs, b.eps = eps2addrs(eps), eps

	b.unhealthyMu.Lock()
	b.unhealthyHostPorts = make(map[string]time.Time)
	b.unhealthyMu.Unlock()
}
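// next asks updateNotifyLoop to rotate to another endpoint and blocks until
// the current connection goes down or the balancer stops.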
func (b *healthBalancer) next() {
	b.mu.RLock()
	downc := b.downc
	b.mu.RUnlock()
	select {
	case b.updateAddrsC <- notifyNext:
	case <-b.stopc:
	}
	// wait until disconnect so new RPCs are not issued on old connection
	select {
	case <-downc:
	case <-b.stopc:
	}
}
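// updateNotifyLoop drives the Notify channel: while no address is pinned it
// publishes the live address set so gRPC can connect; once an address is
// pinned it narrows the set to that single address until the connection goes
// down or the endpoints change.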
func (b *healthBalancer) updateNotifyLoop() {
	defer close(b.donec)

	for {
		b.mu.RLock()
		upc, downc, addr := b.upc, b.downc, b.pinAddr
		b.mu.RUnlock()
		// downc or upc should be closed
		select {
		case <-downc:
			downc = nil
		default:
		}
		select {
		case <-upc:
			upc = nil
		default:
		}
		switch {
		case downc == nil && upc == nil:
			// stale
			select {
			case <-b.stopc:
				return
			default:
			}
		case downc == nil:
			b.notifyAddrs(notifyReset)
			select {
			case <-upc:
			case msg := <-b.updateAddrsC:
				b.notifyAddrs(msg)
			case <-b.stopc:
				return
			}
		case upc == nil:
			select {
			// close connections that are not the pinned address
			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
			case <-downc:
			case <-b.stopc:
				return
			}
			select {
			case <-downc:
				b.notifyAddrs(notifyReset)
			case msg := <-b.updateAddrsC:
				b.notifyAddrs(msg)
			case <-b.stopc:
				return
			}
		}
	}
}
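// notifyAddrs publishes the current set of live addresses to gRPC. For
// notifyNext it first sends an empty set to force gRPC off the current
// connection; if the pinned address is no longer live, it waits for that
// connection to go down before returning.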
func (b *healthBalancer) notifyAddrs(msg notifyMsg) {
	if msg == notifyNext {
		select {
		case b.notifyCh <- []grpc.Address{}:
		case <-b.stopc:
			return
		}
	}
	b.mu.RLock()
	pinAddr := b.pinAddr
	downc := b.downc
	b.mu.RUnlock()
	addrs, hostPorts := b.liveAddrs()

	var waitDown bool
	if pinAddr != "" {
		_, ok := hostPorts[pinAddr]
		waitDown = !ok
	}

	select {
	case b.notifyCh <- addrs:
		if waitDown {
			select {
			case <-downc:
			case <-b.stopc:
			}
		}
	case <-b.stopc:
	}
}
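// Up is called by gRPC when a connection to addr comes up. If the address may
// be pinned, it becomes the pinned address and the returned function is the
// "down" callback gRPC invokes when that connection fails; otherwise a no-op
// callback is returned.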
func (b *healthBalancer) Up(addr grpc.Address) func(error) {
	if !b.mayPin(addr) {
		return func(err error) {}
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	// gRPC might call Up after it called Close. We add this check
	// to "fix" it up at the application layer. Otherwise, it will
	// panic if b.upc is already closed.
	if b.closed {
		return func(err error) {}
	}

	// gRPC might call Up on a stale address.
	// Prevent updating pinAddr with a stale address.
	if !hasAddr(b.addrs, addr.Addr) {
		return func(err error) {}
	}

	if b.pinAddr != "" {
		if logger.V(4) {
			logger.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
		}
		return func(err error) {}
	}

	// notify waiting Get()s and pin first connected address
	close(b.upc)
	b.downc = make(chan struct{})
	b.pinAddr = addr.Addr
	if logger.V(4) {
		logger.Infof("clientv3/balancer: pin %q", addr.Addr)
	}

	// notify client that a connection is up
	b.readyOnce.Do(func() { close(b.readyc) })

	return func(err error) {
		// If connected to a black-hole endpoint or a killed server, the gRPC ping
		// timeout will induce a network I/O error and gRPC will keep retrying;
		// finding a healthy endpoint on retry could take several timeouts and redials.
		// To avoid wasting retries, gray-list unhealthy endpoints.
		b.hostPortError(addr.Addr, err)

		b.mu.Lock()
		b.upc = make(chan struct{})
		close(b.downc)
		b.pinAddr = ""
		b.mu.Unlock()
		if logger.V(4) {
			logger.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
		}
	}
}
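// mayPin reports whether addr is eligible to be pinned: the address must be a
// current endpoint and, if it is gray-listed, enough time must have passed
// since the failure and a fresh health check must succeed.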
func (b *healthBalancer) mayPin(addr grpc.Address) bool {
	if b.endpoint(addr.Addr) == "" { // stale host:port
		return false
	}

	b.unhealthyMu.RLock()
	unhealthyCnt := len(b.unhealthyHostPorts)
	failedTime, bad := b.unhealthyHostPorts[addr.Addr]
	b.unhealthyMu.RUnlock()

	b.mu.RLock()
	skip := len(b.addrs) == 1 || unhealthyCnt == 0 || len(b.addrs) == unhealthyCnt
	b.mu.RUnlock()

	if skip || !bad {
		return true
	}

	// prevent isolated member's endpoint from being infinitely retried, as follows:
	//   1. keepalive pings detect GoAway with http2.ErrCodeEnhanceYourCalm
	//   2. balancer 'Up' unpins on "grpc: failed with network I/O error"
	//   3. the gRPC health check still reports SERVING, so the address gets pinned again
	// instead, return before the health check if the address failed within the health-check timeout
	if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout {
		if logger.V(4) {
			logger.Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
		}
		return false
	}

	if ok, _ := b.healthCheck(addr.Addr); ok {
		b.removeUnhealthy(addr.Addr, "health check success")
		return true
	}

	b.hostPortError(addr.Addr, errors.New("health check failed"))
	return false
}
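// Get returns the pinned address for an RPC. With opts.BlockingWait it waits
// until an address is pinned, the context is done, or the balancer closes;
// without it, it fails fast with ErrNoAddrAvilable when nothing is pinned.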
func (b *healthBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
	var (
		addr   string
		closed bool
	)

	// If opts.BlockingWait is false (for fail-fast RPCs), it should return
	// an address it has notified via Notify immediately instead of blocking.
	if !opts.BlockingWait {
		b.mu.RLock()
		closed = b.closed
		addr = b.pinAddr
		b.mu.RUnlock()
		if closed {
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		}
		if addr == "" {
			return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable
		}
		return grpc.Address{Addr: addr}, func() {}, nil
	}

	for {
		b.mu.RLock()
		ch := b.upc
		b.mu.RUnlock()
		select {
		case <-ch:
		case <-b.donec:
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		case <-ctx.Done():
			return grpc.Address{Addr: ""}, nil, ctx.Err()
		}
		b.mu.RLock()
		closed = b.closed
		addr = b.pinAddr
		b.mu.RUnlock()
		// Close(), which sets b.closed = true, can be called before Get();
		// Get() must exit if the balancer is closed.
		if closed {
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		}
		if addr != "" {
			break
		}
	}
	return grpc.Address{Addr: addr}, func() {}, nil
}
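// Notify returns the channel on which the balancer publishes address updates
// for gRPC to (re)connect to.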
func (b *healthBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
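// Close shuts the balancer down: it stops the background goroutines, unblocks
// any waiting Get() calls, and closes the Notify channel. It is safe to call
// more than once.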
func (b *healthBalancer) Close() error {
	b.mu.Lock()
	// In case gRPC calls Close twice. TODO: remove this check
	// once we are sure that gRPC won't call Close twice.
	if b.closed {
		b.mu.Unlock()
		<-b.donec
		return nil
	}
	b.closed = true
	b.stopOnce.Do(func() { close(b.stopc) })
	b.pinAddr = ""

	// In the following scenario:
	// 1. upc is not closed; no pinned address
	// 2. client issues an RPC, calling invoke(), which calls Get(), enters the for loop, and blocks
	// 3. client.conn.Close() calls balancer.Close(); closed = true
	// 4. the for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled
	// we must close upc so Get() exits from blocking on upc
	select {
	case <-b.upc:
	default:
		// terminate all waiting Get()s
		close(b.upc)
	}

	b.mu.Unlock()
	b.wg.Wait()

	// wait for updateNotifyLoop to finish
	<-b.donec
	close(b.notifyCh)

	return nil
}
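// grpcHealthCheck dials ep and queries the standard gRPC health service with a
// one-second timeout. Servers that predate the health service (etcd < v3.3.0)
// answer with "unknown service" and are treated as healthy.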
func grpcHealthCheck(client *Client, ep string) (bool, error) {
	conn, err := client.dial(ep)
	if err != nil {
		return false, err
	}
	defer conn.Close()
	cli := healthpb.NewHealthClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
	cancel()
	if err != nil {
		if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
			if s.Message() == unknownService { // etcd < v3.3.0
				return true, nil
			}
		}
		return false, err
	}
	return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
}
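// hasAddr reports whether targetAddr appears in addrs.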
func hasAddr(addrs []grpc.Address, targetAddr string) bool {
	for _, addr := range addrs {
		if targetAddr == addr.Addr {
			return true
		}
	}
	return false
}
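// getHost returns the host portion of an endpoint URL, or the endpoint itself
// when it has no scheme or fails to parse.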
func getHost(ep string) string {
	url, uerr := url.Parse(ep)
	if uerr != nil || !strings.Contains(ep, "://") {
		return ep
	}
	return url.Host
}
func eps2addrs(eps []string) []grpc.Address {
	addrs := make([]grpc.Address, len(eps))
	for i := range eps {
		addrs[i].Addr = getHost(eps[i])
	}
	return addrs
}
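// getHostPort2ep maps each endpoint's host:port to its original endpoint
// string so the full scheme://host URL can be recovered later.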
func getHostPort2ep(eps []string) map[string]string {
	hm := make(map[string]string, len(eps))
	for i := range eps {
		_, host, _ := parseEndpoint(eps[i])
		hm[host] = eps[i]
	}
	return hm
}