grpc1.7-health.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package balancer
  15. import (
  16. "context"
  17. "errors"
  18. "io/ioutil"
  19. "net/url"
  20. "strings"
  21. "sync"
  22. "time"
  23. "google.golang.org/grpc"
  24. "google.golang.org/grpc/codes"
  25. "google.golang.org/grpc/grpclog"
  26. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  27. "google.golang.org/grpc/status"
  28. )
// lg is the package logger. All three writers handed to NewLoggerV2 are
// ioutil.Discard, so every message logged through it is currently dropped.
// TODO: replace with something better
var lg = grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard)
const (
	// minHealthRetryDuration is the lower bound NewGRPC17Health applies to
	// the health check timeout it is given.
	minHealthRetryDuration = 3 * time.Second
	// unknownService is the status message grpcHealthCheck looks for on an
	// Unavailable error; a server answering with it (etcd < v3.3.0) is
	// reported as healthy.
	unknownService = "unknown service grpc.health.v1.Health"
)
// ErrNoAddrAvilable is returned by Get() when the balancer does not have
// any active connection to endpoints at the time.
// This error is returned only when opts.BlockingWait is false (fail-fast
// RPCs); with BlockingWait set, Get() waits for an address to be pinned
// instead of returning this error.
var ErrNoAddrAvilable = status.Error(codes.Unavailable, "there is no address available")
// NotifyMsg tells notifyAddrs how to publish addresses on the notify channel.
type NotifyMsg int

const (
	// NotifyReset publishes the current set of live addresses.
	NotifyReset NotifyMsg = iota
	// NotifyNext first publishes an empty address list and then the current
	// set of live addresses.
	NotifyNext
)
// GRPC17Health does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type GRPC17Health struct {
	// addrs are the client's endpoint addresses for grpc; the methods in this
	// file read and replace it while holding mu.
	addrs []grpc.Address

	// eps holds the raw endpoints from the client
	eps []string

	// notifyCh notifies grpc of the set of addresses for connecting
	notifyCh chan []grpc.Address

	// readyc closes once the first connection is up
	readyc chan struct{}
	// readyOnce guards the single close of readyc.
	readyOnce sync.Once

	// healthCheck checks an endpoint's health.
	healthCheck func(ep string) (bool, error)
	// healthCheckTimeout is how long a host:port stays in unhealthyHostPorts
	// before cleanupUnhealthy drops it; it is also the period of the
	// updateUnhealthy loop.
	healthCheckTimeout time.Duration

	// unhealthyMu protects unhealthyHostPorts.
	unhealthyMu sync.RWMutex
	// unhealthyHostPorts maps a host:port to the time it was marked unhealthy.
	unhealthyHostPorts map[string]time.Time

	// mu protects all fields below.
	mu sync.RWMutex

	// upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
	upc chan struct{}

	// downc closes when grpc calls down() on pinAddr
	downc chan struct{}

	// stopc is closed to signal updateNotifyLoop should stop.
	stopc chan struct{}
	// stopOnce guards the single close of stopc.
	stopOnce sync.Once
	// wg tracks the updateUnhealthy goroutine; Close waits on it.
	wg sync.WaitGroup

	// donec closes when all goroutines are exited
	donec chan struct{}

	// updateAddrsC notifies updateNotifyLoop to update addrs.
	updateAddrsC chan NotifyMsg

	// grpc issues TLS cert checks using the string passed into dial so
	// that string must be the host. To recover the full scheme://host URL,
	// have a map from hosts to the original endpoint.
	hostPort2ep map[string]string

	// pinAddr is the currently pinned address; set to the empty string on
	// initialization and shutdown.
	pinAddr string

	// closed is set by Close; once set, Up refuses to pin and Get returns
	// grpc.ErrClientConnClosing.
	closed bool
}
// DialFunc defines gRPC dial function. The balancer calls it with an
// endpoint's host:port to open the connection used by the gRPC health check.
type DialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)
  86. // NewGRPC17Health returns a new health balancer with gRPC v1.7.
  87. func NewGRPC17Health(
  88. eps []string,
  89. timeout time.Duration,
  90. dialFunc DialFunc,
  91. ) *GRPC17Health {
  92. notifyCh := make(chan []grpc.Address)
  93. addrs := eps2addrs(eps)
  94. hb := &GRPC17Health{
  95. addrs: addrs,
  96. eps: eps,
  97. notifyCh: notifyCh,
  98. readyc: make(chan struct{}),
  99. healthCheck: func(ep string) (bool, error) { return grpcHealthCheck(ep, dialFunc) },
  100. unhealthyHostPorts: make(map[string]time.Time),
  101. upc: make(chan struct{}),
  102. stopc: make(chan struct{}),
  103. downc: make(chan struct{}),
  104. donec: make(chan struct{}),
  105. updateAddrsC: make(chan NotifyMsg),
  106. hostPort2ep: getHostPort2ep(eps),
  107. }
  108. if timeout < minHealthRetryDuration {
  109. timeout = minHealthRetryDuration
  110. }
  111. hb.healthCheckTimeout = timeout
  112. close(hb.downc)
  113. go hb.updateNotifyLoop()
  114. hb.wg.Add(1)
  115. go func() {
  116. defer hb.wg.Done()
  117. hb.updateUnhealthy()
  118. }()
  119. return hb
  120. }
  121. func (b *GRPC17Health) Start(target string, config grpc.BalancerConfig) error { return nil }
  122. func (b *GRPC17Health) ConnectNotify() <-chan struct{} {
  123. b.mu.Lock()
  124. defer b.mu.Unlock()
  125. return b.upc
  126. }
  127. func (b *GRPC17Health) UpdateAddrsC() chan NotifyMsg { return b.updateAddrsC }
  128. func (b *GRPC17Health) StopC() chan struct{} { return b.stopc }
  129. func (b *GRPC17Health) Ready() <-chan struct{} { return b.readyc }
  130. func (b *GRPC17Health) Endpoint(hostPort string) string {
  131. b.mu.RLock()
  132. defer b.mu.RUnlock()
  133. return b.hostPort2ep[hostPort]
  134. }
  135. func (b *GRPC17Health) Pinned() string {
  136. b.mu.RLock()
  137. defer b.mu.RUnlock()
  138. return b.pinAddr
  139. }
  140. func (b *GRPC17Health) HostPortError(hostPort string, err error) {
  141. if b.Endpoint(hostPort) == "" {
  142. lg.Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
  143. return
  144. }
  145. b.unhealthyMu.Lock()
  146. b.unhealthyHostPorts[hostPort] = time.Now()
  147. b.unhealthyMu.Unlock()
  148. lg.Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
  149. }
  150. func (b *GRPC17Health) removeUnhealthy(hostPort, msg string) {
  151. if b.Endpoint(hostPort) == "" {
  152. lg.Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
  153. return
  154. }
  155. b.unhealthyMu.Lock()
  156. delete(b.unhealthyHostPorts, hostPort)
  157. b.unhealthyMu.Unlock()
  158. lg.Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
  159. }
  160. func (b *GRPC17Health) countUnhealthy() (count int) {
  161. b.unhealthyMu.RLock()
  162. count = len(b.unhealthyHostPorts)
  163. b.unhealthyMu.RUnlock()
  164. return count
  165. }
  166. func (b *GRPC17Health) isUnhealthy(hostPort string) (unhealthy bool) {
  167. b.unhealthyMu.RLock()
  168. _, unhealthy = b.unhealthyHostPorts[hostPort]
  169. b.unhealthyMu.RUnlock()
  170. return unhealthy
  171. }
  172. func (b *GRPC17Health) cleanupUnhealthy() {
  173. b.unhealthyMu.Lock()
  174. for k, v := range b.unhealthyHostPorts {
  175. if time.Since(v) > b.healthCheckTimeout {
  176. delete(b.unhealthyHostPorts, k)
  177. lg.Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout)
  178. }
  179. }
  180. b.unhealthyMu.Unlock()
  181. }
  182. func (b *GRPC17Health) liveAddrs() ([]grpc.Address, map[string]struct{}) {
  183. unhealthyCnt := b.countUnhealthy()
  184. b.mu.RLock()
  185. defer b.mu.RUnlock()
  186. hbAddrs := b.addrs
  187. if len(b.addrs) == 1 || unhealthyCnt == 0 || unhealthyCnt == len(b.addrs) {
  188. liveHostPorts := make(map[string]struct{}, len(b.hostPort2ep))
  189. for k := range b.hostPort2ep {
  190. liveHostPorts[k] = struct{}{}
  191. }
  192. return hbAddrs, liveHostPorts
  193. }
  194. addrs := make([]grpc.Address, 0, len(b.addrs)-unhealthyCnt)
  195. liveHostPorts := make(map[string]struct{}, len(addrs))
  196. for _, addr := range b.addrs {
  197. if !b.isUnhealthy(addr.Addr) {
  198. addrs = append(addrs, addr)
  199. liveHostPorts[addr.Addr] = struct{}{}
  200. }
  201. }
  202. return addrs, liveHostPorts
  203. }
  204. func (b *GRPC17Health) updateUnhealthy() {
  205. for {
  206. select {
  207. case <-time.After(b.healthCheckTimeout):
  208. b.cleanupUnhealthy()
  209. pinned := b.Pinned()
  210. if pinned == "" || b.isUnhealthy(pinned) {
  211. select {
  212. case b.updateAddrsC <- NotifyNext:
  213. case <-b.stopc:
  214. return
  215. }
  216. }
  217. case <-b.stopc:
  218. return
  219. }
  220. }
  221. }
  222. // NeedUpdate returns true if all connections are down or
  223. // addresses do not include current pinned address.
  224. func (b *GRPC17Health) NeedUpdate() bool {
  225. // updating notifyCh can trigger new connections,
  226. // need update addrs if all connections are down
  227. // or addrs does not include pinAddr.
  228. b.mu.RLock()
  229. update := !hasAddr(b.addrs, b.pinAddr)
  230. b.mu.RUnlock()
  231. return update
  232. }
  233. func (b *GRPC17Health) UpdateAddrs(eps ...string) {
  234. np := getHostPort2ep(eps)
  235. b.mu.Lock()
  236. defer b.mu.Unlock()
  237. match := len(np) == len(b.hostPort2ep)
  238. if match {
  239. for k, v := range np {
  240. if b.hostPort2ep[k] != v {
  241. match = false
  242. break
  243. }
  244. }
  245. }
  246. if match {
  247. // same endpoints, so no need to update address
  248. return
  249. }
  250. b.hostPort2ep = np
  251. b.addrs, b.eps = eps2addrs(eps), eps
  252. b.unhealthyMu.Lock()
  253. b.unhealthyHostPorts = make(map[string]time.Time)
  254. b.unhealthyMu.Unlock()
  255. }
  256. func (b *GRPC17Health) Next() {
  257. b.mu.RLock()
  258. downc := b.downc
  259. b.mu.RUnlock()
  260. select {
  261. case b.updateAddrsC <- NotifyNext:
  262. case <-b.stopc:
  263. }
  264. // wait until disconnect so new RPCs are not issued on old connection
  265. select {
  266. case <-downc:
  267. case <-b.stopc:
  268. }
  269. }
// updateNotifyLoop feeds notifyCh with the address set grpc should connect
// to. It runs until stopc is closed and closes donec on return.
//
// Each iteration snapshots upc, downc and pinAddr and dispatches on which of
// the two channels is already closed: a closed downc means no address is
// pinned, a closed upc means one is.
func (b *GRPC17Health) updateNotifyLoop() {
	defer close(b.donec)

	for {
		b.mu.RLock()
		upc, downc, addr := b.upc, b.downc, b.pinAddr
		b.mu.RUnlock()
		// downc or upc should be closed
		// A local is set to nil when its channel is found closed, so the
		// switch below can tell the states apart.
		select {
		case <-downc:
			downc = nil
		default:
		}
		select {
		case <-upc:
			upc = nil
		default:
		}
		switch {
		case downc == nil && upc == nil:
			// stale
			// Both channels were closed in the snapshot; only check for
			// stop and take a fresh snapshot on the next iteration.
			select {
			case <-b.stopc:
				return
			default:
			}
		case downc == nil:
			// Not pinned: publish the live addresses, then wait for a pin,
			// an explicit update request, or stop.
			b.notifyAddrs(NotifyReset)
			select {
			case <-upc:
			case msg := <-b.updateAddrsC:
				b.notifyAddrs(msg)
			case <-b.stopc:
				return
			}
		case upc == nil:
			// Pinned: narrow grpc down to the pinned address only.
			select {
			// close connections that are not the pinned address
			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
			case <-downc:
			case <-b.stopc:
				return
			}
			// Then wait for the pinned connection to go down (republish the
			// live set), an explicit update request, or stop.
			select {
			case <-downc:
				b.notifyAddrs(NotifyReset)
			case msg := <-b.updateAddrsC:
				b.notifyAddrs(msg)
			case <-b.stopc:
				return
			}
		}
	}
}
  323. func (b *GRPC17Health) notifyAddrs(msg NotifyMsg) {
  324. if msg == NotifyNext {
  325. select {
  326. case b.notifyCh <- []grpc.Address{}:
  327. case <-b.stopc:
  328. return
  329. }
  330. }
  331. b.mu.RLock()
  332. pinAddr := b.pinAddr
  333. downc := b.downc
  334. b.mu.RUnlock()
  335. addrs, hostPorts := b.liveAddrs()
  336. var waitDown bool
  337. if pinAddr != "" {
  338. _, ok := hostPorts[pinAddr]
  339. waitDown = !ok
  340. }
  341. select {
  342. case b.notifyCh <- addrs:
  343. if waitDown {
  344. select {
  345. case <-downc:
  346. case <-b.stopc:
  347. }
  348. }
  349. case <-b.stopc:
  350. }
  351. }
  352. func (b *GRPC17Health) Up(addr grpc.Address) func(error) {
  353. if !b.mayPin(addr) {
  354. return func(err error) {}
  355. }
  356. b.mu.Lock()
  357. defer b.mu.Unlock()
  358. // gRPC might call Up after it called Close. We add this check
  359. // to "fix" it up at application layer. Otherwise, will panic
  360. // if b.upc is already closed.
  361. if b.closed {
  362. return func(err error) {}
  363. }
  364. // gRPC might call Up on a stale address.
  365. // Prevent updating pinAddr with a stale address.
  366. if !hasAddr(b.addrs, addr.Addr) {
  367. return func(err error) {}
  368. }
  369. if b.pinAddr != "" {
  370. lg.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
  371. return func(err error) {}
  372. }
  373. // notify waiting Get()s and pin first connected address
  374. close(b.upc)
  375. b.downc = make(chan struct{})
  376. b.pinAddr = addr.Addr
  377. lg.Infof("clientv3/balancer: pin %q", addr.Addr)
  378. // notify client that a connection is up
  379. b.readyOnce.Do(func() { close(b.readyc) })
  380. return func(err error) {
  381. // If connected to a black hole endpoint or a killed server, the gRPC ping
  382. // timeout will induce a network I/O error, and retrying until success;
  383. // finding healthy endpoint on retry could take several timeouts and redials.
  384. // To avoid wasting retries, gray-list unhealthy endpoints.
  385. b.HostPortError(addr.Addr, err)
  386. b.mu.Lock()
  387. b.upc = make(chan struct{})
  388. close(b.downc)
  389. b.pinAddr = ""
  390. b.mu.Unlock()
  391. lg.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
  392. }
  393. }
  394. func (b *GRPC17Health) mayPin(addr grpc.Address) bool {
  395. if b.Endpoint(addr.Addr) == "" { // stale host:port
  396. return false
  397. }
  398. b.unhealthyMu.RLock()
  399. unhealthyCnt := len(b.unhealthyHostPorts)
  400. failedTime, bad := b.unhealthyHostPorts[addr.Addr]
  401. b.unhealthyMu.RUnlock()
  402. b.mu.RLock()
  403. skip := len(b.addrs) == 1 || unhealthyCnt == 0 || len(b.addrs) == unhealthyCnt
  404. b.mu.RUnlock()
  405. if skip || !bad {
  406. return true
  407. }
  408. // prevent isolated member's endpoint from being infinitely retried, as follows:
  409. // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm
  410. // 2. balancer 'Up' unpins with grpc: failed with network I/O error
  411. // 3. grpc-healthcheck still SERVING, thus retry to pin
  412. // instead, return before grpc-healthcheck if failed within healthcheck timeout
  413. if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout {
  414. lg.Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
  415. return false
  416. }
  417. if ok, _ := b.healthCheck(addr.Addr); ok {
  418. b.removeUnhealthy(addr.Addr, "health check success")
  419. return true
  420. }
  421. b.HostPortError(addr.Addr, errors.New("health check failed"))
  422. return false
  423. }
  424. func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
  425. var (
  426. addr string
  427. closed bool
  428. )
  429. // If opts.BlockingWait is false (for fail-fast RPCs), it should return
  430. // an address it has notified via Notify immediately instead of blocking.
  431. if !opts.BlockingWait {
  432. b.mu.RLock()
  433. closed = b.closed
  434. addr = b.pinAddr
  435. b.mu.RUnlock()
  436. if closed {
  437. return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
  438. }
  439. if addr == "" {
  440. return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable
  441. }
  442. return grpc.Address{Addr: addr}, func() {}, nil
  443. }
  444. for {
  445. b.mu.RLock()
  446. ch := b.upc
  447. b.mu.RUnlock()
  448. select {
  449. case <-ch:
  450. case <-b.donec:
  451. return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
  452. case <-ctx.Done():
  453. return grpc.Address{Addr: ""}, nil, ctx.Err()
  454. }
  455. b.mu.RLock()
  456. closed = b.closed
  457. addr = b.pinAddr
  458. b.mu.RUnlock()
  459. // Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed.
  460. if closed {
  461. return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
  462. }
  463. if addr != "" {
  464. break
  465. }
  466. }
  467. return grpc.Address{Addr: addr}, func() {}, nil
  468. }
  469. func (b *GRPC17Health) Notify() <-chan []grpc.Address { return b.notifyCh }
  470. func (b *GRPC17Health) Close() error {
  471. b.mu.Lock()
  472. // In case gRPC calls close twice. TODO: remove the checking
  473. // when we are sure that gRPC wont call close twice.
  474. if b.closed {
  475. b.mu.Unlock()
  476. <-b.donec
  477. return nil
  478. }
  479. b.closed = true
  480. b.stopOnce.Do(func() { close(b.stopc) })
  481. b.pinAddr = ""
  482. // In the case of following scenario:
  483. // 1. upc is not closed; no pinned address
  484. // 2. client issues an RPC, calling invoke(), which calls Get(), enters for loop, blocks
  485. // 3. client.conn.Close() calls balancer.Close(); closed = true
  486. // 4. for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled
  487. // we must close upc so Get() exits from blocking on upc
  488. select {
  489. case <-b.upc:
  490. default:
  491. // terminate all waiting Get()s
  492. close(b.upc)
  493. }
  494. b.mu.Unlock()
  495. b.wg.Wait()
  496. // wait for updateNotifyLoop to finish
  497. <-b.donec
  498. close(b.notifyCh)
  499. return nil
  500. }
  501. func grpcHealthCheck(ep string, dialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)) (bool, error) {
  502. conn, err := dialFunc(ep)
  503. if err != nil {
  504. return false, err
  505. }
  506. defer conn.Close()
  507. cli := healthpb.NewHealthClient(conn)
  508. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  509. resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
  510. cancel()
  511. if err != nil {
  512. if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
  513. if s.Message() == unknownService { // etcd < v3.3.0
  514. return true, nil
  515. }
  516. }
  517. return false, err
  518. }
  519. return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
  520. }
  521. func hasAddr(addrs []grpc.Address, targetAddr string) bool {
  522. for _, addr := range addrs {
  523. if targetAddr == addr.Addr {
  524. return true
  525. }
  526. }
  527. return false
  528. }
  529. func getHost(ep string) string {
  530. url, uerr := url.Parse(ep)
  531. if uerr != nil || !strings.Contains(ep, "://") {
  532. return ep
  533. }
  534. return url.Host
  535. }
  536. func eps2addrs(eps []string) []grpc.Address {
  537. addrs := make([]grpc.Address, len(eps))
  538. for i := range eps {
  539. addrs[i].Addr = getHost(eps[i])
  540. }
  541. return addrs
  542. }
  543. func getHostPort2ep(eps []string) map[string]string {
  544. hm := make(map[string]string, len(eps))
  545. for i := range eps {
  546. _, host, _ := parseEndpoint(eps[i])
  547. hm[host] = eps[i]
  548. }
  549. return hm
  550. }
  551. func parseEndpoint(endpoint string) (proto string, host string, scheme string) {
  552. proto = "tcp"
  553. host = endpoint
  554. url, uerr := url.Parse(endpoint)
  555. if uerr != nil || !strings.Contains(endpoint, "://") {
  556. return proto, host, scheme
  557. }
  558. scheme = url.Scheme
  559. // strip scheme:// prefix since grpc dials by host
  560. host = url.Host
  561. switch url.Scheme {
  562. case "http", "https":
  563. case "unix", "unixs":
  564. proto = "unix"
  565. host = url.Host + url.Path
  566. default:
  567. proto, host = "", ""
  568. }
  569. return proto, host, scheme
  570. }