clientconn.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "errors"
  36. "fmt"
  37. "net"
  38. "strings"
  39. "sync"
  40. "time"
  41. "golang.org/x/net/context"
  42. "golang.org/x/net/trace"
  43. "google.golang.org/grpc/credentials"
  44. "google.golang.org/grpc/grpclog"
  45. "google.golang.org/grpc/transport"
  46. )
  // Errors that may be returned to users of a ClientConn.
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	ErrClientConnClosing = errors.New("grpc: the client connection is closing")
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
)

// Errors reporting an invalid security configuration for a ClientConn.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., oauth2 token), which requires a secure
	// connection, over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

// Errors describing why a connection went away or could not be created.
var (
	// errNetworkIO indicates that the connection is down due to some network I/O error.
	errNetworkIO = errors.New("grpc: failed with network I/O error")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errNoAddr is returned by Dial when the balancer's first address update
	// is empty or its Notify channel is closed before delivering one.
	errNoAddr = errors.New("grpc: there is no address available to dial")
)

// minConnectTimeout is the minimum time given to each connection attempt;
// an attempt gets the current backoff interval instead when that is longer.
var minConnectTimeout = 20 * time.Second
  75. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  76. // values passed to Dial.
  77. type dialOptions struct {
  78. codec Codec
  79. cp Compressor
  80. dc Decompressor
  81. bs backoffStrategy
  82. balancer Balancer
  83. block bool
  84. insecure bool
  85. timeout time.Duration
  86. copts transport.ConnectOptions
  87. }
  88. // DialOption configures how we set up the connection.
  89. type DialOption func(*dialOptions)
  90. // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
  91. func WithCodec(c Codec) DialOption {
  92. return func(o *dialOptions) {
  93. o.codec = c
  94. }
  95. }
  96. // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
  97. // compressor.
  98. func WithCompressor(cp Compressor) DialOption {
  99. return func(o *dialOptions) {
  100. o.cp = cp
  101. }
  102. }
  103. // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
  104. // message decompressor.
  105. func WithDecompressor(dc Decompressor) DialOption {
  106. return func(o *dialOptions) {
  107. o.dc = dc
  108. }
  109. }
  110. // WithBalancer returns a DialOption which sets a load balancer.
  111. func WithBalancer(b Balancer) DialOption {
  112. return func(o *dialOptions) {
  113. o.balancer = b
  114. }
  115. }
  116. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  117. // when backing off after failed connection attempts.
  118. func WithBackoffMaxDelay(md time.Duration) DialOption {
  119. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  120. }
  121. // WithBackoffConfig configures the dialer to use the provided backoff
  122. // parameters after connection failures.
  123. //
  124. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  125. // for use.
  126. func WithBackoffConfig(b BackoffConfig) DialOption {
  127. // Set defaults to ensure that provided BackoffConfig is valid and
  128. // unexported fields get default values.
  129. setDefaults(&b)
  130. return withBackoff(b)
  131. }
  132. // withBackoff sets the backoff strategy used for retries after a
  133. // failed connection attempt.
  134. //
  135. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  136. func withBackoff(bs backoffStrategy) DialOption {
  137. return func(o *dialOptions) {
  138. o.bs = bs
  139. }
  140. }
  141. // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
  142. // connection is up. Without this, Dial returns immediately and connecting the server
  143. // happens in background.
  144. func WithBlock() DialOption {
  145. return func(o *dialOptions) {
  146. o.block = true
  147. }
  148. }
  149. // WithInsecure returns a DialOption which disables transport security for this ClientConn.
  150. // Note that transport security is required unless WithInsecure is set.
  151. func WithInsecure() DialOption {
  152. return func(o *dialOptions) {
  153. o.insecure = true
  154. }
  155. }
  156. // WithTransportCredentials returns a DialOption which configures a
  157. // connection level security credentials (e.g., TLS/SSL).
  158. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  159. return func(o *dialOptions) {
  160. o.copts.TransportCredentials = creds
  161. }
  162. }
  163. // WithPerRPCCredentials returns a DialOption which sets
  164. // credentials which will place auth state on each outbound RPC.
  165. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  166. return func(o *dialOptions) {
  167. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  168. }
  169. }
  170. // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
  171. // initially. This is valid if and only if WithBlock() is present.
  172. func WithTimeout(d time.Duration) DialOption {
  173. return func(o *dialOptions) {
  174. o.timeout = d
  175. }
  176. }
  177. // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
  178. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  179. return func(o *dialOptions) {
  180. o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
  181. if deadline, ok := ctx.Deadline(); ok {
  182. return f(addr, deadline.Sub(time.Now()))
  183. }
  184. return f(addr, 0)
  185. }
  186. }
  187. }
  188. // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
  189. func WithUserAgent(s string) DialOption {
  190. return func(o *dialOptions) {
  191. o.copts.UserAgent = s
  192. }
  193. }
  194. // Dial creates a client connection the given target.
  195. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  196. ctx := context.Background()
  197. cc := &ClientConn{
  198. target: target,
  199. conns: make(map[Address]*addrConn),
  200. }
  201. cc.ctx, cc.cancel = context.WithCancel(ctx)
  202. for _, opt := range opts {
  203. opt(&cc.dopts)
  204. }
  205. // Set defaults.
  206. if cc.dopts.codec == nil {
  207. cc.dopts.codec = protoCodec{}
  208. }
  209. if cc.dopts.bs == nil {
  210. cc.dopts.bs = DefaultBackoffConfig
  211. }
  212. var (
  213. ok bool
  214. addrs []Address
  215. )
  216. if cc.dopts.balancer == nil {
  217. // Connect to target directly if balancer is nil.
  218. addrs = append(addrs, Address{Addr: target})
  219. } else {
  220. if err := cc.dopts.balancer.Start(target); err != nil {
  221. return nil, err
  222. }
  223. ch := cc.dopts.balancer.Notify()
  224. if ch == nil {
  225. // There is no name resolver installed.
  226. addrs = append(addrs, Address{Addr: target})
  227. } else {
  228. addrs, ok = <-ch
  229. if !ok || len(addrs) == 0 {
  230. return nil, errNoAddr
  231. }
  232. }
  233. }
  234. waitC := make(chan error, 1)
  235. go func() {
  236. for _, a := range addrs {
  237. if err := cc.resetAddrConn(a, false, nil); err != nil {
  238. waitC <- err
  239. return
  240. }
  241. }
  242. close(waitC)
  243. }()
  244. var timeoutCh <-chan time.Time
  245. if cc.dopts.timeout > 0 {
  246. timeoutCh = time.After(cc.dopts.timeout)
  247. }
  248. select {
  249. case err := <-waitC:
  250. if err != nil {
  251. cc.Close()
  252. return nil, err
  253. }
  254. case <-cc.ctx.Done():
  255. cc.Close()
  256. return nil, cc.ctx.Err()
  257. case <-timeoutCh:
  258. cc.Close()
  259. return nil, ErrClientConnTimeout
  260. }
  261. // If balancer is nil or balancer.Notify() is nil, ok will be false here.
  262. // The lbWatcher goroutine will not be created.
  263. if ok {
  264. go cc.lbWatcher()
  265. }
  266. colonPos := strings.LastIndex(target, ":")
  267. if colonPos == -1 {
  268. colonPos = len(target)
  269. }
  270. cc.authority = target[:colonPos]
  271. return cc, nil
  272. }
  // ConnectivityState indicates the state of a client connection.
type ConnectivityState int

const (
	// Idle indicates the ClientConn is idle.
	Idle ConnectivityState = iota
	// Connecting indicates the ClientConn is connecting.
	Connecting
	// Ready indicates the ClientConn is ready for work.
	Ready
	// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
	TransientFailure
	// Shutdown indicates the ClientConn has started shutting down.
	Shutdown
)

// String returns the upper-case name of s (e.g. "TRANSIENT_FAILURE").
// It panics if s is not one of the declared ConnectivityState constants.
func (s ConnectivityState) String() string {
	names := [...]string{
		Idle:             "IDLE",
		Connecting:       "CONNECTING",
		Ready:            "READY",
		TransientFailure: "TRANSIENT_FAILURE",
		Shutdown:         "SHUTDOWN",
	}
	if s >= 0 && int(s) < len(names) {
		return names[s]
	}
	panic(fmt.Sprintf("unknown connectivity state: %d", s))
}
  303. // ClientConn represents a client connection to an RPC server.
  304. type ClientConn struct {
  305. ctx context.Context
  306. cancel context.CancelFunc
  307. target string
  308. authority string
  309. dopts dialOptions
  310. mu sync.RWMutex
  311. conns map[Address]*addrConn
  312. }
  313. func (cc *ClientConn) lbWatcher() {
  314. for addrs := range cc.dopts.balancer.Notify() {
  315. var (
  316. add []Address // Addresses need to setup connections.
  317. del []*addrConn // Connections need to tear down.
  318. )
  319. cc.mu.Lock()
  320. for _, a := range addrs {
  321. if _, ok := cc.conns[a]; !ok {
  322. add = append(add, a)
  323. }
  324. }
  325. for k, c := range cc.conns {
  326. var keep bool
  327. for _, a := range addrs {
  328. if k == a {
  329. keep = true
  330. break
  331. }
  332. }
  333. if !keep {
  334. del = append(del, c)
  335. delete(cc.conns, c.addr)
  336. }
  337. }
  338. cc.mu.Unlock()
  339. for _, a := range add {
  340. cc.resetAddrConn(a, true, nil)
  341. }
  342. for _, c := range del {
  343. c.tearDown(errConnDrain)
  344. }
  345. }
  346. }
  347. // resetAddrConn creates an addrConn for addr and adds it to cc.conns.
  348. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
  349. // If tearDownErr is nil, errConnDrain will be used instead.
  350. func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
  351. ac := &addrConn{
  352. cc: cc,
  353. addr: addr,
  354. dopts: cc.dopts,
  355. }
  356. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  357. ac.stateCV = sync.NewCond(&ac.mu)
  358. if EnableTracing {
  359. ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
  360. }
  361. if !ac.dopts.insecure {
  362. if ac.dopts.copts.TransportCredentials == nil {
  363. return errNoTransportSecurity
  364. }
  365. } else {
  366. if ac.dopts.copts.TransportCredentials != nil {
  367. return errCredentialsConflict
  368. }
  369. for _, cd := range ac.dopts.copts.PerRPCCredentials {
  370. if cd.RequireTransportSecurity() {
  371. return errTransportCredentialsMissing
  372. }
  373. }
  374. }
  375. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  376. cc.mu.Lock()
  377. if cc.conns == nil {
  378. cc.mu.Unlock()
  379. return ErrClientConnClosing
  380. }
  381. stale := cc.conns[ac.addr]
  382. cc.conns[ac.addr] = ac
  383. cc.mu.Unlock()
  384. if stale != nil {
  385. // There is an addrConn alive on ac.addr already. This could be due to
  386. // 1) a buggy Balancer notifies duplicated Addresses;
  387. // 2) goaway was received, a new ac will replace the old ac.
  388. // The old ac should be deleted from cc.conns, but the
  389. // underlying transport should drain rather than close.
  390. if tearDownErr == nil {
  391. // tearDownErr is nil if resetAddrConn is called by
  392. // 1) Dial
  393. // 2) lbWatcher
  394. // In both cases, the stale ac should drain, not close.
  395. stale.tearDown(errConnDrain)
  396. } else {
  397. stale.tearDown(tearDownErr)
  398. }
  399. }
  400. // skipWait may overwrite the decision in ac.dopts.block.
  401. if ac.dopts.block && !skipWait {
  402. if err := ac.resetTransport(false); err != nil {
  403. if err != errConnClosing {
  404. // Tear down ac and delete it from cc.conns.
  405. cc.mu.Lock()
  406. delete(cc.conns, ac.addr)
  407. cc.mu.Unlock()
  408. ac.tearDown(err)
  409. }
  410. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  411. return e.Origin()
  412. }
  413. return err
  414. }
  415. // Start to monitor the error status of transport.
  416. go ac.transportMonitor()
  417. } else {
  418. // Start a goroutine connecting to the server asynchronously.
  419. go func() {
  420. if err := ac.resetTransport(false); err != nil {
  421. grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
  422. if err != errConnClosing {
  423. // Keep this ac in cc.conns, to get the reason it's torn down.
  424. ac.tearDown(err)
  425. }
  426. return
  427. }
  428. ac.transportMonitor()
  429. }()
  430. }
  431. return nil
  432. }
  433. func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
  434. var (
  435. ac *addrConn
  436. ok bool
  437. put func()
  438. )
  439. if cc.dopts.balancer == nil {
  440. // If balancer is nil, there should be only one addrConn available.
  441. cc.mu.RLock()
  442. for _, ac = range cc.conns {
  443. // Break after the first iteration to get the first addrConn.
  444. ok = true
  445. break
  446. }
  447. cc.mu.RUnlock()
  448. } else {
  449. var (
  450. addr Address
  451. err error
  452. )
  453. addr, put, err = cc.dopts.balancer.Get(ctx, opts)
  454. if err != nil {
  455. return nil, nil, toRPCErr(err)
  456. }
  457. cc.mu.RLock()
  458. if cc.conns == nil {
  459. cc.mu.RUnlock()
  460. return nil, nil, toRPCErr(ErrClientConnClosing)
  461. }
  462. ac, ok = cc.conns[addr]
  463. cc.mu.RUnlock()
  464. }
  465. if !ok {
  466. if put != nil {
  467. put()
  468. }
  469. return nil, nil, errConnClosing
  470. }
  471. // ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
  472. // - If RPC is failfast, ac.wait should not block.
  473. // - If balancer is not nil, ac.wait should return errConnClosing on transient failure
  474. // so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
  475. t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
  476. if err != nil {
  477. if put != nil {
  478. put()
  479. }
  480. return nil, nil, err
  481. }
  482. return t, put, nil
  483. }
  484. // Close tears down the ClientConn and all underlying connections.
  485. func (cc *ClientConn) Close() error {
  486. cc.cancel()
  487. cc.mu.Lock()
  488. if cc.conns == nil {
  489. cc.mu.Unlock()
  490. return ErrClientConnClosing
  491. }
  492. conns := cc.conns
  493. cc.conns = nil
  494. cc.mu.Unlock()
  495. if cc.dopts.balancer != nil {
  496. cc.dopts.balancer.Close()
  497. }
  498. for _, ac := range conns {
  499. ac.tearDown(ErrClientConnClosing)
  500. }
  501. return nil
  502. }
  503. // addrConn is a network connection to a given address.
  504. type addrConn struct {
  505. ctx context.Context
  506. cancel context.CancelFunc
  507. cc *ClientConn
  508. addr Address
  509. dopts dialOptions
  510. events trace.EventLog
  511. mu sync.Mutex
  512. state ConnectivityState
  513. stateCV *sync.Cond
  514. down func(error) // the handler called when a connection is down.
  515. // ready is closed and becomes nil when a new transport is up or failed
  516. // due to timeout.
  517. ready chan struct{}
  518. transport transport.ClientTransport
  519. // The reason this addrConn is torn down.
  520. tearDownErr error
  521. }
  522. // printf records an event in ac's event log, unless ac has been closed.
  523. // REQUIRES ac.mu is held.
  524. func (ac *addrConn) printf(format string, a ...interface{}) {
  525. if ac.events != nil {
  526. ac.events.Printf(format, a...)
  527. }
  528. }
  529. // errorf records an error in ac's event log, unless ac has been closed.
  530. // REQUIRES ac.mu is held.
  531. func (ac *addrConn) errorf(format string, a ...interface{}) {
  532. if ac.events != nil {
  533. ac.events.Errorf(format, a...)
  534. }
  535. }
  536. // getState returns the connectivity state of the Conn
  537. func (ac *addrConn) getState() ConnectivityState {
  538. ac.mu.Lock()
  539. defer ac.mu.Unlock()
  540. return ac.state
  541. }
  542. // waitForStateChange blocks until the state changes to something other than the sourceState.
  543. func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
  544. ac.mu.Lock()
  545. defer ac.mu.Unlock()
  546. if sourceState != ac.state {
  547. return ac.state, nil
  548. }
  549. done := make(chan struct{})
  550. var err error
  551. go func() {
  552. select {
  553. case <-ctx.Done():
  554. ac.mu.Lock()
  555. err = ctx.Err()
  556. ac.stateCV.Broadcast()
  557. ac.mu.Unlock()
  558. case <-done:
  559. }
  560. }()
  561. defer close(done)
  562. for sourceState == ac.state {
  563. ac.stateCV.Wait()
  564. if err != nil {
  565. return ac.state, err
  566. }
  567. }
  568. return ac.state, nil
  569. }
  570. func (ac *addrConn) resetTransport(closeTransport bool) error {
  571. for retries := 0; ; retries++ {
  572. ac.mu.Lock()
  573. ac.printf("connecting")
  574. if ac.state == Shutdown {
  575. // ac.tearDown(...) has been invoked.
  576. ac.mu.Unlock()
  577. return errConnClosing
  578. }
  579. if ac.down != nil {
  580. ac.down(downErrorf(false, true, "%v", errNetworkIO))
  581. ac.down = nil
  582. }
  583. ac.state = Connecting
  584. ac.stateCV.Broadcast()
  585. t := ac.transport
  586. ac.mu.Unlock()
  587. if closeTransport && t != nil {
  588. t.Close()
  589. }
  590. sleepTime := ac.dopts.bs.backoff(retries)
  591. timeout := minConnectTimeout
  592. if timeout < sleepTime {
  593. timeout = sleepTime
  594. }
  595. ctx, cancel := context.WithTimeout(ac.ctx, timeout)
  596. connectTime := time.Now()
  597. newTransport, err := transport.NewClientTransport(ctx, ac.addr.Addr, ac.dopts.copts)
  598. if err != nil {
  599. cancel()
  600. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  601. return err
  602. }
  603. grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
  604. ac.mu.Lock()
  605. if ac.state == Shutdown {
  606. // ac.tearDown(...) has been invoked.
  607. ac.mu.Unlock()
  608. return errConnClosing
  609. }
  610. ac.errorf("transient failure: %v", err)
  611. ac.state = TransientFailure
  612. ac.stateCV.Broadcast()
  613. if ac.ready != nil {
  614. close(ac.ready)
  615. ac.ready = nil
  616. }
  617. ac.mu.Unlock()
  618. closeTransport = false
  619. select {
  620. case <-time.After(sleepTime - time.Since(connectTime)):
  621. case <-ac.ctx.Done():
  622. return ac.ctx.Err()
  623. }
  624. continue
  625. }
  626. ac.mu.Lock()
  627. ac.printf("ready")
  628. if ac.state == Shutdown {
  629. // ac.tearDown(...) has been invoked.
  630. ac.mu.Unlock()
  631. newTransport.Close()
  632. return errConnClosing
  633. }
  634. ac.state = Ready
  635. ac.stateCV.Broadcast()
  636. ac.transport = newTransport
  637. if ac.ready != nil {
  638. close(ac.ready)
  639. ac.ready = nil
  640. }
  641. if ac.cc.dopts.balancer != nil {
  642. ac.down = ac.cc.dopts.balancer.Up(ac.addr)
  643. }
  644. ac.mu.Unlock()
  645. return nil
  646. }
  647. }
  648. // transportMonitor watches ac's current transport and reacts to the first of: ac being
  649. // torn down (ac.ctx done), a GoAway, or a transport error. It is run in its own goroutine
  // (resetAddrConn starts it after the first successful resetTransport). It returns when ac is
  // torn down, when ac is replaced by a new addrConn after a GoAway, or when reconnecting fails.
  650. func (ac *addrConn) transportMonitor() {
  651. for {
  // Re-read the transport each iteration: a successful resetTransport below installs a new one.
  652. ac.mu.Lock()
  653. t := ac.transport
  654. ac.mu.Unlock()
  655. select {
  656. // This is needed to detect the teardown when
  657. // the addrConn is idle (i.e., no RPC in flight).
  658. case <-ac.ctx.Done():
  // Close t here only if it has already failed. Otherwise tearDown owns it: it closes t,
  // or gracefully closes it for errConnDrain.
  659. select {
  660. case <-t.Error():
  661. t.Close()
  662. default:
  663. }
  664. return
  665. case <-t.GoAway():
  666. // If GoAway happens without any network I/O error, ac is closed without shutting down the
  667. // underlying transport (the transport will be closed when all the pending RPCs finished or
  668. // failed).
  669. // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
  670. // are closed.
  671. // In both cases, a new ac is created.
  // resetAddrConn tears down this (now stale) ac with the given reason; its error is ignored.
  672. select {
  673. case <-t.Error():
  674. ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
  675. default:
  676. ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
  677. }
  678. return
  679. case <-t.Error():
  // A teardown or GoAway already pending alongside the error takes precedence over
  // reconnecting on this ac.
  680. select {
  681. case <-ac.ctx.Done():
  682. t.Close()
  683. return
  684. case <-t.GoAway():
  685. ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
  686. return
  687. default:
  688. }
  689. ac.mu.Lock()
  690. if ac.state == Shutdown {
  691. // ac has been shutdown.
  692. ac.mu.Unlock()
  693. return
  694. }
  695. ac.state = TransientFailure
  696. ac.stateCV.Broadcast()
  697. ac.mu.Unlock()
  // Reconnect, closing the failed transport first. On success, loop to monitor the new one.
  698. if err := ac.resetTransport(true); err != nil {
  699. ac.mu.Lock()
  700. ac.printf("transport exiting: %v", err)
  701. ac.mu.Unlock()
  702. grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
  703. if err != errConnClosing {
  704. // Keep this ac in cc.conns, to get the reason it's torn down.
  705. ac.tearDown(err)
  706. }
  707. return
  708. }
  709. }
  710. }
  711. }
  712. // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
  713. // iv) transport is in TransientFailure and blocking is false.
  714. func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
  715. for {
  716. ac.mu.Lock()
  717. switch {
  718. case ac.state == Shutdown:
  719. err := ac.tearDownErr
  720. ac.mu.Unlock()
  721. return nil, err
  722. case ac.state == Ready:
  723. ct := ac.transport
  724. ac.mu.Unlock()
  725. return ct, nil
  726. case ac.state == TransientFailure && !blocking:
  727. ac.mu.Unlock()
  728. return nil, errConnClosing
  729. default:
  730. ready := ac.ready
  731. if ready == nil {
  732. ready = make(chan struct{})
  733. ac.ready = ready
  734. }
  735. ac.mu.Unlock()
  736. select {
  737. case <-ctx.Done():
  738. return nil, toRPCErr(ctx.Err())
  739. // Wait until the new transport is ready or failed.
  740. case <-ready:
  741. }
  742. }
  743. }
  744. }
  745. // tearDown starts to tear down the addrConn.
  746. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  747. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  748. // tight loop.
  749. // tearDown doesn't remove ac from ac.cc.conns.
  750. func (ac *addrConn) tearDown(err error) {
  751. ac.cancel()
  752. ac.mu.Lock()
  753. defer ac.mu.Unlock()
  754. if ac.down != nil {
  755. ac.down(downErrorf(false, false, "%v", err))
  756. ac.down = nil
  757. }
  758. if err == errConnDrain && ac.transport != nil {
  759. // GracefulClose(...) may be executed multiple times when
  760. // i) receiving multiple GoAway frames from the server; or
  761. // ii) there are concurrent name resolver/Balancer triggered
  762. // address removal and GoAway.
  763. ac.transport.GracefulClose()
  764. }
  765. if ac.state == Shutdown {
  766. return
  767. }
  768. ac.state = Shutdown
  769. ac.tearDownErr = err
  770. ac.stateCV.Broadcast()
  771. if ac.events != nil {
  772. ac.events.Finish()
  773. ac.events = nil
  774. }
  775. if ac.ready != nil {
  776. close(ac.ready)
  777. ac.ready = nil
  778. }
  779. if ac.transport != nil && err != errConnDrain {
  780. ac.transport.Close()
  781. }
  782. return
  783. }