clientconn.go 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "errors"
  36. "fmt"
  37. "net"
  38. "strings"
  39. "sync"
  40. "time"
  41. "golang.org/x/net/context"
  42. "golang.org/x/net/trace"
  43. "google.golang.org/grpc/codes"
  44. "google.golang.org/grpc/credentials"
  45. "google.golang.org/grpc/grpclog"
  46. "google.golang.org/grpc/transport"
  47. )
  // Errors that are part of the public API.
  var (
  	// ErrClientConnClosing is returned when an operation is attempted on a
  	// ClientConn that is closing.
  	ErrClientConnClosing = errors.New("grpc: the client connection is closing")
  	// ErrClientConnTimeout is returned by Dial when the underlying
  	// connections could not be set up within the configured timeout.
  	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
  )

  // Package-internal sentinel errors.
  var (
  	// errNoTransportSecurity means neither transport credentials nor
  	// WithInsecure were supplied; callers must pick one explicitly.
  	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
  	// errTransportCredentialsMissing means per-RPC credentials that require
  	// transport security (e.g., an oauth2 token) were configured on an
  	// insecure connection.
  	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
  	// errCredentialsConflict means both grpc.WithTransportCredentials() and
  	// grpc.WithInsecure() were used for the same connection.
  	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
  	// errNetworkIO means the connection went down because of a network I/O
  	// error.
  	errNetworkIO = errors.New("grpc: failed with network I/O error")
  	// errConnDrain means the connection is being drained and no longer
  	// accepts new RPCs.
  	errConnDrain = errors.New("grpc: the connection is drained")
  	// errConnClosing means the connection is closing.
  	errConnClosing = errors.New("grpc: the connection is closing")
  	// errNoAddr means the balancer supplied no address to dial.
  	errNoAddr = errors.New("grpc: there is no address available to dial")
  )

  // minConnectTimeout is the minimum amount of time a single connection
  // attempt is given to complete.
  var minConnectTimeout = 20 * time.Second
  76. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  77. // values passed to Dial.
  78. type dialOptions struct {
  79. codec Codec
  80. cp Compressor
  81. dc Decompressor
  82. bs backoffStrategy
  83. balancer Balancer
  84. block bool
  85. insecure bool
  86. timeout time.Duration
  87. copts transport.ConnectOptions
  88. }
  89. // DialOption configures how we set up the connection.
  90. type DialOption func(*dialOptions)
  91. // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
  92. func WithCodec(c Codec) DialOption {
  93. return func(o *dialOptions) {
  94. o.codec = c
  95. }
  96. }
  97. // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
  98. // compressor.
  99. func WithCompressor(cp Compressor) DialOption {
  100. return func(o *dialOptions) {
  101. o.cp = cp
  102. }
  103. }
  104. // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
  105. // message decompressor.
  106. func WithDecompressor(dc Decompressor) DialOption {
  107. return func(o *dialOptions) {
  108. o.dc = dc
  109. }
  110. }
  111. // WithBalancer returns a DialOption which sets a load balancer.
  112. func WithBalancer(b Balancer) DialOption {
  113. return func(o *dialOptions) {
  114. o.balancer = b
  115. }
  116. }
  117. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  118. // when backing off after failed connection attempts.
  119. func WithBackoffMaxDelay(md time.Duration) DialOption {
  120. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  121. }
  122. // WithBackoffConfig configures the dialer to use the provided backoff
  123. // parameters after connection failures.
  124. //
  125. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  126. // for use.
  127. func WithBackoffConfig(b BackoffConfig) DialOption {
  128. // Set defaults to ensure that provided BackoffConfig is valid and
  129. // unexported fields get default values.
  130. setDefaults(&b)
  131. return withBackoff(b)
  132. }
  133. // withBackoff sets the backoff strategy used for retries after a
  134. // failed connection attempt.
  135. //
  136. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  137. func withBackoff(bs backoffStrategy) DialOption {
  138. return func(o *dialOptions) {
  139. o.bs = bs
  140. }
  141. }
  142. // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
  143. // connection is up. Without this, Dial returns immediately and connecting the server
  144. // happens in background.
  145. func WithBlock() DialOption {
  146. return func(o *dialOptions) {
  147. o.block = true
  148. }
  149. }
  150. // WithInsecure returns a DialOption which disables transport security for this ClientConn.
  151. // Note that transport security is required unless WithInsecure is set.
  152. func WithInsecure() DialOption {
  153. return func(o *dialOptions) {
  154. o.insecure = true
  155. }
  156. }
  157. // WithTransportCredentials returns a DialOption which configures a
  158. // connection level security credentials (e.g., TLS/SSL).
  159. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  160. return func(o *dialOptions) {
  161. o.copts.TransportCredentials = creds
  162. }
  163. }
  164. // WithPerRPCCredentials returns a DialOption which sets
  165. // credentials which will place auth state on each outbound RPC.
  166. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  167. return func(o *dialOptions) {
  168. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  169. }
  170. }
  171. // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
  172. // initially. This is valid if and only if WithBlock() is present.
  173. func WithTimeout(d time.Duration) DialOption {
  174. return func(o *dialOptions) {
  175. o.timeout = d
  176. }
  177. }
  178. // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
  179. func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) DialOption {
  180. return func(o *dialOptions) {
  181. o.copts.Dialer = f
  182. }
  183. }
  184. // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
  185. func WithUserAgent(s string) DialOption {
  186. return func(o *dialOptions) {
  187. o.copts.UserAgent = s
  188. }
  189. }
  190. // Dial creates a client connection the given target.
  191. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  192. cc := &ClientConn{
  193. target: target,
  194. conns: make(map[Address]*addrConn),
  195. }
  196. for _, opt := range opts {
  197. opt(&cc.dopts)
  198. }
  199. // Set defaults.
  200. if cc.dopts.codec == nil {
  201. cc.dopts.codec = protoCodec{}
  202. }
  203. if cc.dopts.bs == nil {
  204. cc.dopts.bs = DefaultBackoffConfig
  205. }
  206. if cc.dopts.balancer == nil {
  207. cc.dopts.balancer = RoundRobin(nil)
  208. }
  209. if err := cc.dopts.balancer.Start(target); err != nil {
  210. return nil, err
  211. }
  212. var (
  213. ok bool
  214. addrs []Address
  215. )
  216. ch := cc.dopts.balancer.Notify()
  217. if ch == nil {
  218. // There is no name resolver installed.
  219. addrs = append(addrs, Address{Addr: target})
  220. } else {
  221. addrs, ok = <-ch
  222. if !ok || len(addrs) == 0 {
  223. return nil, errNoAddr
  224. }
  225. }
  226. waitC := make(chan error, 1)
  227. go func() {
  228. for _, a := range addrs {
  229. if err := cc.newAddrConn(a, false); err != nil {
  230. waitC <- err
  231. return
  232. }
  233. }
  234. close(waitC)
  235. }()
  236. var timeoutCh <-chan time.Time
  237. if cc.dopts.timeout > 0 {
  238. timeoutCh = time.After(cc.dopts.timeout)
  239. }
  240. select {
  241. case err := <-waitC:
  242. if err != nil {
  243. cc.Close()
  244. return nil, err
  245. }
  246. case <-timeoutCh:
  247. cc.Close()
  248. return nil, ErrClientConnTimeout
  249. }
  250. if ok {
  251. go cc.lbWatcher()
  252. }
  253. colonPos := strings.LastIndex(target, ":")
  254. if colonPos == -1 {
  255. colonPos = len(target)
  256. }
  257. cc.authority = target[:colonPos]
  258. return cc, nil
  259. }
  // ConnectivityState indicates the state of a client connection.
  type ConnectivityState int

  const (
  	// Idle indicates the ClientConn is idle.
  	Idle ConnectivityState = iota
  	// Connecting indicates the ClientConn is connecting.
  	Connecting
  	// Ready indicates the ClientConn is ready for work.
  	Ready
  	// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
  	TransientFailure
  	// Shutdown indicates the ClientConn has started shutting down.
  	Shutdown
  )

  // String returns the upper-case name of the state. A value outside the
  // declared constants yields "INVALID_STATE(<n>)" instead of panicking, so
  // that formatting or logging an unexpected state cannot crash the process.
  func (s ConnectivityState) String() string {
  	switch s {
  	case Idle:
  		return "IDLE"
  	case Connecting:
  		return "CONNECTING"
  	case Ready:
  		return "READY"
  	case TransientFailure:
  		return "TRANSIENT_FAILURE"
  	case Shutdown:
  		return "SHUTDOWN"
  	default:
  		// %d formats the underlying integer and does not re-enter String.
  		return fmt.Sprintf("INVALID_STATE(%d)", int(s))
  	}
  }
  290. // ClientConn represents a client connection to an RPC server.
  291. type ClientConn struct {
  292. target string
  293. authority string
  294. dopts dialOptions
  295. mu sync.RWMutex
  296. conns map[Address]*addrConn
  297. }
  298. func (cc *ClientConn) lbWatcher() {
  299. for addrs := range cc.dopts.balancer.Notify() {
  300. var (
  301. add []Address // Addresses need to setup connections.
  302. del []*addrConn // Connections need to tear down.
  303. )
  304. cc.mu.Lock()
  305. for _, a := range addrs {
  306. if _, ok := cc.conns[a]; !ok {
  307. add = append(add, a)
  308. }
  309. }
  310. for k, c := range cc.conns {
  311. var keep bool
  312. for _, a := range addrs {
  313. if k == a {
  314. keep = true
  315. break
  316. }
  317. }
  318. if !keep {
  319. del = append(del, c)
  320. }
  321. }
  322. cc.mu.Unlock()
  323. for _, a := range add {
  324. cc.newAddrConn(a, true)
  325. }
  326. for _, c := range del {
  327. c.tearDown(errConnDrain)
  328. }
  329. }
  330. }
  331. func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
  332. ac := &addrConn{
  333. cc: cc,
  334. addr: addr,
  335. dopts: cc.dopts,
  336. shutdownChan: make(chan struct{}),
  337. }
  338. if EnableTracing {
  339. ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
  340. }
  341. if !ac.dopts.insecure {
  342. if ac.dopts.copts.TransportCredentials == nil {
  343. return errNoTransportSecurity
  344. }
  345. } else {
  346. if ac.dopts.copts.TransportCredentials != nil {
  347. return errCredentialsConflict
  348. }
  349. for _, cd := range ac.dopts.copts.PerRPCCredentials {
  350. if cd.RequireTransportSecurity() {
  351. return errTransportCredentialsMissing
  352. }
  353. }
  354. }
  355. // Insert ac into ac.cc.conns. This needs to be done before any getTransport(...) is called.
  356. ac.cc.mu.Lock()
  357. if ac.cc.conns == nil {
  358. ac.cc.mu.Unlock()
  359. return ErrClientConnClosing
  360. }
  361. stale := ac.cc.conns[ac.addr]
  362. ac.cc.conns[ac.addr] = ac
  363. ac.cc.mu.Unlock()
  364. if stale != nil {
  365. // There is an addrConn alive on ac.addr already. This could be due to
  366. // i) stale's Close is undergoing;
  367. // ii) a buggy Balancer notifies duplicated Addresses.
  368. stale.tearDown(errConnDrain)
  369. }
  370. ac.stateCV = sync.NewCond(&ac.mu)
  371. // skipWait may overwrite the decision in ac.dopts.block.
  372. if ac.dopts.block && !skipWait {
  373. if err := ac.resetTransport(false); err != nil {
  374. ac.tearDown(err)
  375. return err
  376. }
  377. // Start to monitor the error status of transport.
  378. go ac.transportMonitor()
  379. } else {
  380. // Start a goroutine connecting to the server asynchronously.
  381. go func() {
  382. if err := ac.resetTransport(false); err != nil {
  383. grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
  384. ac.tearDown(err)
  385. return
  386. }
  387. ac.transportMonitor()
  388. }()
  389. }
  390. return nil
  391. }
  392. func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
  393. addr, put, err := cc.dopts.balancer.Get(ctx, opts)
  394. if err != nil {
  395. return nil, nil, toRPCErr(err)
  396. }
  397. cc.mu.RLock()
  398. if cc.conns == nil {
  399. cc.mu.RUnlock()
  400. return nil, nil, toRPCErr(ErrClientConnClosing)
  401. }
  402. ac, ok := cc.conns[addr]
  403. cc.mu.RUnlock()
  404. if !ok {
  405. if put != nil {
  406. put()
  407. }
  408. return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
  409. }
  410. t, err := ac.wait(ctx, !opts.BlockingWait)
  411. if err != nil {
  412. if put != nil {
  413. put()
  414. }
  415. return nil, nil, err
  416. }
  417. return t, put, nil
  418. }
  419. // Close tears down the ClientConn and all underlying connections.
  420. func (cc *ClientConn) Close() error {
  421. cc.mu.Lock()
  422. if cc.conns == nil {
  423. cc.mu.Unlock()
  424. return ErrClientConnClosing
  425. }
  426. conns := cc.conns
  427. cc.conns = nil
  428. cc.mu.Unlock()
  429. cc.dopts.balancer.Close()
  430. for _, ac := range conns {
  431. ac.tearDown(ErrClientConnClosing)
  432. }
  433. return nil
  434. }
  435. // addrConn is a network connection to a given address.
  436. type addrConn struct {
  437. cc *ClientConn
  438. addr Address
  439. dopts dialOptions
  440. shutdownChan chan struct{}
  441. events trace.EventLog
  442. mu sync.Mutex
  443. state ConnectivityState
  444. stateCV *sync.Cond
  445. down func(error) // the handler called when a connection is down.
  446. // ready is closed and becomes nil when a new transport is up or failed
  447. // due to timeout.
  448. ready chan struct{}
  449. transport transport.ClientTransport
  450. }
  451. // printf records an event in ac's event log, unless ac has been closed.
  452. // REQUIRES ac.mu is held.
  453. func (ac *addrConn) printf(format string, a ...interface{}) {
  454. if ac.events != nil {
  455. ac.events.Printf(format, a...)
  456. }
  457. }
  458. // errorf records an error in ac's event log, unless ac has been closed.
  459. // REQUIRES ac.mu is held.
  460. func (ac *addrConn) errorf(format string, a ...interface{}) {
  461. if ac.events != nil {
  462. ac.events.Errorf(format, a...)
  463. }
  464. }
  465. // getState returns the connectivity state of the Conn
  466. func (ac *addrConn) getState() ConnectivityState {
  467. ac.mu.Lock()
  468. defer ac.mu.Unlock()
  469. return ac.state
  470. }
  471. // waitForStateChange blocks until the state changes to something other than the sourceState.
  472. func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
  473. ac.mu.Lock()
  474. defer ac.mu.Unlock()
  475. if sourceState != ac.state {
  476. return ac.state, nil
  477. }
  478. done := make(chan struct{})
  479. var err error
  480. go func() {
  481. select {
  482. case <-ctx.Done():
  483. ac.mu.Lock()
  484. err = ctx.Err()
  485. ac.stateCV.Broadcast()
  486. ac.mu.Unlock()
  487. case <-done:
  488. }
  489. }()
  490. defer close(done)
  491. for sourceState == ac.state {
  492. ac.stateCV.Wait()
  493. if err != nil {
  494. return ac.state, err
  495. }
  496. }
  497. return ac.state, nil
  498. }
  499. func (ac *addrConn) resetTransport(closeTransport bool) error {
  500. var retries int
  501. for {
  502. ac.mu.Lock()
  503. ac.printf("connecting")
  504. if ac.state == Shutdown {
  505. // ac.tearDown(...) has been invoked.
  506. ac.mu.Unlock()
  507. return errConnClosing
  508. }
  509. if ac.down != nil {
  510. ac.down(downErrorf(false, true, "%v", errNetworkIO))
  511. ac.down = nil
  512. }
  513. ac.state = Connecting
  514. ac.stateCV.Broadcast()
  515. t := ac.transport
  516. ac.mu.Unlock()
  517. if closeTransport && t != nil {
  518. t.Close()
  519. }
  520. sleepTime := ac.dopts.bs.backoff(retries)
  521. ac.dopts.copts.Timeout = sleepTime
  522. if sleepTime < minConnectTimeout {
  523. ac.dopts.copts.Timeout = minConnectTimeout
  524. }
  525. connectTime := time.Now()
  526. newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts)
  527. if err != nil {
  528. ac.mu.Lock()
  529. if ac.state == Shutdown {
  530. // ac.tearDown(...) has been invoked.
  531. ac.mu.Unlock()
  532. return errConnClosing
  533. }
  534. ac.errorf("transient failure: %v", err)
  535. ac.state = TransientFailure
  536. ac.stateCV.Broadcast()
  537. if ac.ready != nil {
  538. close(ac.ready)
  539. ac.ready = nil
  540. }
  541. ac.mu.Unlock()
  542. sleepTime -= time.Since(connectTime)
  543. if sleepTime < 0 {
  544. sleepTime = 0
  545. }
  546. closeTransport = false
  547. select {
  548. case <-time.After(sleepTime):
  549. case <-ac.shutdownChan:
  550. }
  551. retries++
  552. grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
  553. continue
  554. }
  555. ac.mu.Lock()
  556. ac.printf("ready")
  557. if ac.state == Shutdown {
  558. // ac.tearDown(...) has been invoked.
  559. ac.mu.Unlock()
  560. newTransport.Close()
  561. return errConnClosing
  562. }
  563. ac.state = Ready
  564. ac.stateCV.Broadcast()
  565. ac.transport = newTransport
  566. if ac.ready != nil {
  567. close(ac.ready)
  568. ac.ready = nil
  569. }
  570. ac.down = ac.cc.dopts.balancer.Up(ac.addr)
  571. ac.mu.Unlock()
  572. return nil
  573. }
  574. }
  575. // Run in a goroutine to track the error in transport and create the
  576. // new transport if an error happens. It returns when the channel is closing.
  577. func (ac *addrConn) transportMonitor() {
  578. for {
  579. ac.mu.Lock()
  580. t := ac.transport
  581. ac.mu.Unlock()
  582. select {
  583. // shutdownChan is needed to detect the teardown when
  584. // the addrConn is idle (i.e., no RPC in flight).
  585. case <-ac.shutdownChan:
  586. return
  587. case <-t.Error():
  588. ac.mu.Lock()
  589. if ac.state == Shutdown {
  590. // ac.tearDown(...) has been invoked.
  591. ac.mu.Unlock()
  592. return
  593. }
  594. ac.state = TransientFailure
  595. ac.stateCV.Broadcast()
  596. ac.mu.Unlock()
  597. if err := ac.resetTransport(true); err != nil {
  598. ac.mu.Lock()
  599. ac.printf("transport exiting: %v", err)
  600. ac.mu.Unlock()
  601. grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
  602. return
  603. }
  604. }
  605. }
  606. }
  607. // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
  608. // iv) transport is in TransientFailure and the RPC is fail-fast.
  609. func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) {
  610. for {
  611. ac.mu.Lock()
  612. switch {
  613. case ac.state == Shutdown:
  614. ac.mu.Unlock()
  615. return nil, errConnClosing
  616. case ac.state == Ready:
  617. ct := ac.transport
  618. ac.mu.Unlock()
  619. return ct, nil
  620. case ac.state == TransientFailure && failFast:
  621. ac.mu.Unlock()
  622. return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
  623. default:
  624. ready := ac.ready
  625. if ready == nil {
  626. ready = make(chan struct{})
  627. ac.ready = ready
  628. }
  629. ac.mu.Unlock()
  630. select {
  631. case <-ctx.Done():
  632. return nil, toRPCErr(ctx.Err())
  633. // Wait until the new transport is ready or failed.
  634. case <-ready:
  635. }
  636. }
  637. }
  638. }
  639. // tearDown starts to tear down the addrConn.
  640. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  641. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  642. // tight loop.
  643. func (ac *addrConn) tearDown(err error) {
  644. ac.mu.Lock()
  645. defer func() {
  646. ac.mu.Unlock()
  647. ac.cc.mu.Lock()
  648. if ac.cc.conns != nil {
  649. delete(ac.cc.conns, ac.addr)
  650. }
  651. ac.cc.mu.Unlock()
  652. }()
  653. if ac.state == Shutdown {
  654. return
  655. }
  656. ac.state = Shutdown
  657. if ac.down != nil {
  658. ac.down(downErrorf(false, false, "%v", err))
  659. ac.down = nil
  660. }
  661. ac.stateCV.Broadcast()
  662. if ac.events != nil {
  663. ac.events.Finish()
  664. ac.events = nil
  665. }
  666. if ac.ready != nil {
  667. close(ac.ready)
  668. ac.ready = nil
  669. }
  670. if ac.transport != nil {
  671. if err == errConnDrain {
  672. ac.transport.GracefulClose()
  673. } else {
  674. ac.transport.Close()
  675. }
  676. }
  677. if ac.shutdownChan != nil {
  678. close(ac.shutdownChan)
  679. }
  680. return
  681. }