clientconn.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "errors"
  36. "fmt"
  37. "net"
  38. "strings"
  39. "sync"
  40. "time"
  41. "golang.org/x/net/context"
  42. "golang.org/x/net/trace"
  43. "google.golang.org/grpc/credentials"
  44. "google.golang.org/grpc/grpclog"
  45. "google.golang.org/grpc/transport"
  46. )
  var (
  	// ErrClientConnClosing indicates that the operation is illegal because
  	// the ClientConn is closing.
  	ErrClientConnClosing = errors.New("grpc: the client connection is closing")
  	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
  	// underlying connections within the specified timeout.
  	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")

  	// errNoTransportSecurity indicates that there is no transport security
  	// being set for ClientConn. Users should either set one or explicitly
  	// call WithInsecure DialOption to disable security.
  	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
  	// errTransportCredentialsMissing indicates that users want to transmit security
  	// information (e.g., oauth2 token) which requires secure connection on an insecure
  	// connection.
  	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
  	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
  	// and grpc.WithInsecure() are both called for a connection.
  	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
  	// errNetworkIO indicates that the connection is down due to some network I/O error.
  	errNetworkIO = errors.New("grpc: failed with network I/O error")
  	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
  	errConnDrain = errors.New("grpc: the connection is drained")
  	// errConnClosing indicates that the connection is closing.
  	errConnClosing = errors.New("grpc: the connection is closing")
  	// errConnUnavailable indicates that the connection is unavailable.
  	errConnUnavailable = errors.New("grpc: the connection is unavailable")
  	// errNoAddr indicates that the balancer's notification channel was closed
  	// or delivered an empty address list while dialing.
  	errNoAddr = errors.New("grpc: there is no address available to dial")
  	// minConnectTimeout is the minimum time to give a single connection
  	// attempt to complete; resetTransport uses it as the lower bound of the
  	// per-attempt timeout.
  	minConnectTimeout = 20 * time.Second
  )
  // dialOptions configure a Dial call. dialOptions are set by the DialOption
  // values passed to Dial.
  type dialOptions struct {
  	// codec is set by WithCodec; DialContext falls back to protoCodec{} when nil.
  	codec Codec
  	// cp is set by WithCompressor.
  	cp Compressor
  	// dc is set by WithDecompressor.
  	dc Decompressor
  	// bs is the backoff strategy consulted between connection attempts;
  	// DialContext falls back to DefaultBackoffConfig when nil.
  	bs backoffStrategy
  	// balancer is set by WithBalancer; nil means connect to the target directly.
  	balancer Balancer
  	// block is set by WithBlock and makes the initial connection synchronous.
  	block bool
  	// insecure is set by WithInsecure and disables transport security.
  	insecure bool
  	// timeout is set by WithTimeout; values <= 0 mean no dial timeout.
  	timeout time.Duration
  	// copts carries the transport-level options (transport credentials,
  	// per-RPC credentials, dialer and user agent).
  	copts transport.ConnectOptions
  }
  // DialOption configures how we set up the connection. Each option is a
  // function that mutates the dialOptions of the ClientConn being dialed.
  type DialOption func(*dialOptions)
  92. // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
  93. func WithCodec(c Codec) DialOption {
  94. return func(o *dialOptions) {
  95. o.codec = c
  96. }
  97. }
  98. // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
  99. // compressor.
  100. func WithCompressor(cp Compressor) DialOption {
  101. return func(o *dialOptions) {
  102. o.cp = cp
  103. }
  104. }
  105. // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
  106. // message decompressor.
  107. func WithDecompressor(dc Decompressor) DialOption {
  108. return func(o *dialOptions) {
  109. o.dc = dc
  110. }
  111. }
  112. // WithBalancer returns a DialOption which sets a load balancer.
  113. func WithBalancer(b Balancer) DialOption {
  114. return func(o *dialOptions) {
  115. o.balancer = b
  116. }
  117. }
  118. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  119. // when backing off after failed connection attempts.
  120. func WithBackoffMaxDelay(md time.Duration) DialOption {
  121. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  122. }
  123. // WithBackoffConfig configures the dialer to use the provided backoff
  124. // parameters after connection failures.
  125. //
  126. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  127. // for use.
  128. func WithBackoffConfig(b BackoffConfig) DialOption {
  129. // Set defaults to ensure that provided BackoffConfig is valid and
  130. // unexported fields get default values.
  131. setDefaults(&b)
  132. return withBackoff(b)
  133. }
  134. // withBackoff sets the backoff strategy used for retries after a
  135. // failed connection attempt.
  136. //
  137. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  138. func withBackoff(bs backoffStrategy) DialOption {
  139. return func(o *dialOptions) {
  140. o.bs = bs
  141. }
  142. }
  143. // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
  144. // connection is up. Without this, Dial returns immediately and connecting the server
  145. // happens in background.
  146. func WithBlock() DialOption {
  147. return func(o *dialOptions) {
  148. o.block = true
  149. }
  150. }
  151. // WithInsecure returns a DialOption which disables transport security for this ClientConn.
  152. // Note that transport security is required unless WithInsecure is set.
  153. func WithInsecure() DialOption {
  154. return func(o *dialOptions) {
  155. o.insecure = true
  156. }
  157. }
  158. // WithTransportCredentials returns a DialOption which configures a
  159. // connection level security credentials (e.g., TLS/SSL).
  160. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  161. return func(o *dialOptions) {
  162. o.copts.TransportCredentials = creds
  163. }
  164. }
  165. // WithPerRPCCredentials returns a DialOption which sets
  166. // credentials which will place auth state on each outbound RPC.
  167. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  168. return func(o *dialOptions) {
  169. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  170. }
  171. }
  172. // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
  173. // initially. This is valid if and only if WithBlock() is present.
  174. func WithTimeout(d time.Duration) DialOption {
  175. return func(o *dialOptions) {
  176. o.timeout = d
  177. }
  178. }
  179. // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
  180. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  181. return func(o *dialOptions) {
  182. o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
  183. if deadline, ok := ctx.Deadline(); ok {
  184. return f(addr, deadline.Sub(time.Now()))
  185. }
  186. return f(addr, 0)
  187. }
  188. }
  189. }
  190. // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
  191. func WithUserAgent(s string) DialOption {
  192. return func(o *dialOptions) {
  193. o.copts.UserAgent = s
  194. }
  195. }
  196. // Dial creates a client connection to the given target.
  197. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  198. return DialContext(context.Background(), target, opts...)
  199. }
  200. // DialContext creates a client connection to the given target
  201. // using the supplied context.
  202. func DialContext(ctx context.Context, target string, opts ...DialOption) (*ClientConn, error) {
  203. cc := &ClientConn{
  204. target: target,
  205. conns: make(map[Address]*addrConn),
  206. }
  207. cc.ctx, cc.cancel = context.WithCancel(ctx)
  208. for _, opt := range opts {
  209. opt(&cc.dopts)
  210. }
  211. // Set defaults.
  212. if cc.dopts.codec == nil {
  213. cc.dopts.codec = protoCodec{}
  214. }
  215. if cc.dopts.bs == nil {
  216. cc.dopts.bs = DefaultBackoffConfig
  217. }
  218. var (
  219. ok bool
  220. addrs []Address
  221. )
  222. if cc.dopts.balancer == nil {
  223. // Connect to target directly if balancer is nil.
  224. addrs = append(addrs, Address{Addr: target})
  225. } else {
  226. if err := cc.dopts.balancer.Start(target); err != nil {
  227. return nil, err
  228. }
  229. ch := cc.dopts.balancer.Notify()
  230. if ch == nil {
  231. // There is no name resolver installed.
  232. addrs = append(addrs, Address{Addr: target})
  233. } else {
  234. addrs, ok = <-ch
  235. if !ok || len(addrs) == 0 {
  236. return nil, errNoAddr
  237. }
  238. }
  239. }
  240. waitC := make(chan error, 1)
  241. go func() {
  242. for _, a := range addrs {
  243. if err := cc.resetAddrConn(a, false, nil); err != nil {
  244. waitC <- err
  245. return
  246. }
  247. }
  248. close(waitC)
  249. }()
  250. var timeoutCh <-chan time.Time
  251. if cc.dopts.timeout > 0 {
  252. timeoutCh = time.After(cc.dopts.timeout)
  253. }
  254. select {
  255. case err := <-waitC:
  256. if err != nil {
  257. cc.Close()
  258. return nil, err
  259. }
  260. case <-cc.ctx.Done():
  261. cc.Close()
  262. return nil, cc.ctx.Err()
  263. case <-timeoutCh:
  264. cc.Close()
  265. return nil, ErrClientConnTimeout
  266. }
  267. // If balancer is nil or balancer.Notify() is nil, ok will be false here.
  268. // The lbWatcher goroutine will not be created.
  269. if ok {
  270. go cc.lbWatcher()
  271. }
  272. colonPos := strings.LastIndex(target, ":")
  273. if colonPos == -1 {
  274. colonPos = len(target)
  275. }
  276. cc.authority = target[:colonPos]
  277. return cc, nil
  278. }
  // ConnectivityState indicates the state of a client connection.
  type ConnectivityState int

  const (
  	// Idle indicates the ClientConn is idle.
  	Idle ConnectivityState = iota
  	// Connecting indicates the ClientConn is connecting.
  	Connecting
  	// Ready indicates the ClientConn is ready for work.
  	Ready
  	// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
  	TransientFailure
  	// Shutdown indicates the ClientConn has started shutting down.
  	Shutdown
  )

  // String returns the upper-case name of s (for example "READY"). A value
  // outside the declared constants is rendered as "UNKNOWN(<n>)" rather than
  // panicking, so formatting or logging a corrupted state cannot crash the
  // process.
  func (s ConnectivityState) String() string {
  	switch s {
  	case Idle:
  		return "IDLE"
  	case Connecting:
  		return "CONNECTING"
  	case Ready:
  		return "READY"
  	case TransientFailure:
  		return "TRANSIENT_FAILURE"
  	case Shutdown:
  		return "SHUTDOWN"
  	default:
  		// Convert to int so fmt formats the raw number and never re-enters
  		// this method.
  		return fmt.Sprintf("UNKNOWN(%d)", int(s))
  	}
  }
  // ClientConn represents a client connection to an RPC server.
  type ClientConn struct {
  	// ctx is derived from the context passed to DialContext; cancel cancels
  	// it and is invoked by Close.
  	ctx    context.Context
  	cancel context.CancelFunc

  	// target is the string passed to Dial/DialContext.
  	target string
  	// authority is target up to (not including) its last colon, or the whole
  	// target when it contains no colon.
  	authority string
  	// dopts holds the options accumulated from the DialOptions.
  	dopts dialOptions

  	// mu guards conns.
  	mu sync.RWMutex
  	// conns maps each address to its addrConn. Close sets it to nil, which
  	// the other methods treat as "the ClientConn is closing".
  	conns map[Address]*addrConn
  }
  319. func (cc *ClientConn) lbWatcher() {
  320. for addrs := range cc.dopts.balancer.Notify() {
  321. var (
  322. add []Address // Addresses need to setup connections.
  323. del []*addrConn // Connections need to tear down.
  324. )
  325. cc.mu.Lock()
  326. for _, a := range addrs {
  327. if _, ok := cc.conns[a]; !ok {
  328. add = append(add, a)
  329. }
  330. }
  331. for k, c := range cc.conns {
  332. var keep bool
  333. for _, a := range addrs {
  334. if k == a {
  335. keep = true
  336. break
  337. }
  338. }
  339. if !keep {
  340. del = append(del, c)
  341. delete(cc.conns, c.addr)
  342. }
  343. }
  344. cc.mu.Unlock()
  345. for _, a := range add {
  346. cc.resetAddrConn(a, true, nil)
  347. }
  348. for _, c := range del {
  349. c.tearDown(errConnDrain)
  350. }
  351. }
  352. }
  353. // resetAddrConn creates an addrConn for addr and adds it to cc.conns.
  354. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
  355. // If tearDownErr is nil, errConnDrain will be used instead.
  356. func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
  357. ac := &addrConn{
  358. cc: cc,
  359. addr: addr,
  360. dopts: cc.dopts,
  361. }
  362. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  363. ac.stateCV = sync.NewCond(&ac.mu)
  364. if EnableTracing {
  365. ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
  366. }
  367. if !ac.dopts.insecure {
  368. if ac.dopts.copts.TransportCredentials == nil {
  369. return errNoTransportSecurity
  370. }
  371. } else {
  372. if ac.dopts.copts.TransportCredentials != nil {
  373. return errCredentialsConflict
  374. }
  375. for _, cd := range ac.dopts.copts.PerRPCCredentials {
  376. if cd.RequireTransportSecurity() {
  377. return errTransportCredentialsMissing
  378. }
  379. }
  380. }
  381. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  382. cc.mu.Lock()
  383. if cc.conns == nil {
  384. cc.mu.Unlock()
  385. return ErrClientConnClosing
  386. }
  387. stale := cc.conns[ac.addr]
  388. cc.conns[ac.addr] = ac
  389. cc.mu.Unlock()
  390. if stale != nil {
  391. // There is an addrConn alive on ac.addr already. This could be due to
  392. // 1) a buggy Balancer notifies duplicated Addresses;
  393. // 2) goaway was received, a new ac will replace the old ac.
  394. // The old ac should be deleted from cc.conns, but the
  395. // underlying transport should drain rather than close.
  396. if tearDownErr == nil {
  397. // tearDownErr is nil if resetAddrConn is called by
  398. // 1) Dial
  399. // 2) lbWatcher
  400. // In both cases, the stale ac should drain, not close.
  401. stale.tearDown(errConnDrain)
  402. } else {
  403. stale.tearDown(tearDownErr)
  404. }
  405. }
  406. // skipWait may overwrite the decision in ac.dopts.block.
  407. if ac.dopts.block && !skipWait {
  408. if err := ac.resetTransport(false); err != nil {
  409. if err != errConnClosing {
  410. // Tear down ac and delete it from cc.conns.
  411. cc.mu.Lock()
  412. delete(cc.conns, ac.addr)
  413. cc.mu.Unlock()
  414. ac.tearDown(err)
  415. }
  416. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  417. return e.Origin()
  418. }
  419. return err
  420. }
  421. // Start to monitor the error status of transport.
  422. go ac.transportMonitor()
  423. } else {
  424. // Start a goroutine connecting to the server asynchronously.
  425. go func() {
  426. if err := ac.resetTransport(false); err != nil {
  427. grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
  428. if err != errConnClosing {
  429. // Keep this ac in cc.conns, to get the reason it's torn down.
  430. ac.tearDown(err)
  431. }
  432. return
  433. }
  434. ac.transportMonitor()
  435. }()
  436. }
  437. return nil
  438. }
  439. func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
  440. var (
  441. ac *addrConn
  442. ok bool
  443. put func()
  444. )
  445. if cc.dopts.balancer == nil {
  446. // If balancer is nil, there should be only one addrConn available.
  447. cc.mu.RLock()
  448. if cc.conns == nil {
  449. cc.mu.RUnlock()
  450. return nil, nil, toRPCErr(ErrClientConnClosing)
  451. }
  452. for _, ac = range cc.conns {
  453. // Break after the first iteration to get the first addrConn.
  454. ok = true
  455. break
  456. }
  457. cc.mu.RUnlock()
  458. } else {
  459. var (
  460. addr Address
  461. err error
  462. )
  463. addr, put, err = cc.dopts.balancer.Get(ctx, opts)
  464. if err != nil {
  465. return nil, nil, toRPCErr(err)
  466. }
  467. cc.mu.RLock()
  468. if cc.conns == nil {
  469. cc.mu.RUnlock()
  470. return nil, nil, toRPCErr(ErrClientConnClosing)
  471. }
  472. ac, ok = cc.conns[addr]
  473. cc.mu.RUnlock()
  474. }
  475. if !ok {
  476. if put != nil {
  477. put()
  478. }
  479. return nil, nil, errConnClosing
  480. }
  481. t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
  482. if err != nil {
  483. if put != nil {
  484. put()
  485. }
  486. return nil, nil, err
  487. }
  488. return t, put, nil
  489. }
  490. // Close tears down the ClientConn and all underlying connections.
  491. func (cc *ClientConn) Close() error {
  492. cc.cancel()
  493. cc.mu.Lock()
  494. if cc.conns == nil {
  495. cc.mu.Unlock()
  496. return ErrClientConnClosing
  497. }
  498. conns := cc.conns
  499. cc.conns = nil
  500. cc.mu.Unlock()
  501. if cc.dopts.balancer != nil {
  502. cc.dopts.balancer.Close()
  503. }
  504. for _, ac := range conns {
  505. ac.tearDown(ErrClientConnClosing)
  506. }
  507. return nil
  508. }
  // addrConn is a network connection to a given address.
  type addrConn struct {
  	// ctx is derived from the owning ClientConn's context; cancel cancels it
  	// and is invoked by tearDown.
  	ctx    context.Context
  	cancel context.CancelFunc

  	// cc is the ClientConn that owns this addrConn.
  	cc *ClientConn
  	// addr is the address this addrConn connects to.
  	addr Address
  	// dopts is a copy of the owning ClientConn's dial options.
  	dopts dialOptions
  	// events is the trace event log, created only when EnableTracing is set
  	// and reset to nil by tearDown.
  	events trace.EventLog

  	// mu guards the fields below.
  	mu sync.Mutex
  	// state is the current connectivity state.
  	state ConnectivityState
  	// stateCV is a condition variable bound to mu and broadcast on every
  	// state change.
  	stateCV *sync.Cond
  	down    func(error) // the handler called when a connection is down.
  	// ready is closed and becomes nil when a new transport is up or failed
  	// due to timeout.
  	ready chan struct{}
  	// transport is the most recently established transport, if any.
  	transport transport.ClientTransport

  	// The reason this addrConn is torn down.
  	tearDownErr error
  }
  528. // printf records an event in ac's event log, unless ac has been closed.
  529. // REQUIRES ac.mu is held.
  530. func (ac *addrConn) printf(format string, a ...interface{}) {
  531. if ac.events != nil {
  532. ac.events.Printf(format, a...)
  533. }
  534. }
  535. // errorf records an error in ac's event log, unless ac has been closed.
  536. // REQUIRES ac.mu is held.
  537. func (ac *addrConn) errorf(format string, a ...interface{}) {
  538. if ac.events != nil {
  539. ac.events.Errorf(format, a...)
  540. }
  541. }
  542. // getState returns the connectivity state of the Conn
  543. func (ac *addrConn) getState() ConnectivityState {
  544. ac.mu.Lock()
  545. defer ac.mu.Unlock()
  546. return ac.state
  547. }
  548. // waitForStateChange blocks until the state changes to something other than the sourceState.
  549. func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
  550. ac.mu.Lock()
  551. defer ac.mu.Unlock()
  552. if sourceState != ac.state {
  553. return ac.state, nil
  554. }
  555. done := make(chan struct{})
  556. var err error
  557. go func() {
  558. select {
  559. case <-ctx.Done():
  560. ac.mu.Lock()
  561. err = ctx.Err()
  562. ac.stateCV.Broadcast()
  563. ac.mu.Unlock()
  564. case <-done:
  565. }
  566. }()
  567. defer close(done)
  568. for sourceState == ac.state {
  569. ac.stateCV.Wait()
  570. if err != nil {
  571. return ac.state, err
  572. }
  573. }
  574. return ac.state, nil
  575. }
  576. func (ac *addrConn) resetTransport(closeTransport bool) error {
  577. for retries := 0; ; retries++ {
  578. ac.mu.Lock()
  579. ac.printf("connecting")
  580. if ac.state == Shutdown {
  581. // ac.tearDown(...) has been invoked.
  582. ac.mu.Unlock()
  583. return errConnClosing
  584. }
  585. if ac.down != nil {
  586. ac.down(downErrorf(false, true, "%v", errNetworkIO))
  587. ac.down = nil
  588. }
  589. ac.state = Connecting
  590. ac.stateCV.Broadcast()
  591. t := ac.transport
  592. ac.mu.Unlock()
  593. if closeTransport && t != nil {
  594. t.Close()
  595. }
  596. sleepTime := ac.dopts.bs.backoff(retries)
  597. timeout := minConnectTimeout
  598. if timeout < sleepTime {
  599. timeout = sleepTime
  600. }
  601. ctx, cancel := context.WithTimeout(ac.ctx, timeout)
  602. connectTime := time.Now()
  603. newTransport, err := transport.NewClientTransport(ctx, ac.addr.Addr, ac.dopts.copts)
  604. if err != nil {
  605. cancel()
  606. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  607. return err
  608. }
  609. grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
  610. ac.mu.Lock()
  611. if ac.state == Shutdown {
  612. // ac.tearDown(...) has been invoked.
  613. ac.mu.Unlock()
  614. return errConnClosing
  615. }
  616. ac.errorf("transient failure: %v", err)
  617. ac.state = TransientFailure
  618. ac.stateCV.Broadcast()
  619. if ac.ready != nil {
  620. close(ac.ready)
  621. ac.ready = nil
  622. }
  623. ac.mu.Unlock()
  624. closeTransport = false
  625. select {
  626. case <-time.After(sleepTime - time.Since(connectTime)):
  627. case <-ac.ctx.Done():
  628. return ac.ctx.Err()
  629. }
  630. continue
  631. }
  632. ac.mu.Lock()
  633. ac.printf("ready")
  634. if ac.state == Shutdown {
  635. // ac.tearDown(...) has been invoked.
  636. ac.mu.Unlock()
  637. newTransport.Close()
  638. return errConnClosing
  639. }
  640. ac.state = Ready
  641. ac.stateCV.Broadcast()
  642. ac.transport = newTransport
  643. if ac.ready != nil {
  644. close(ac.ready)
  645. ac.ready = nil
  646. }
  647. if ac.cc.dopts.balancer != nil {
  648. ac.down = ac.cc.dopts.balancer.Up(ac.addr)
  649. }
  650. ac.mu.Unlock()
  651. return nil
  652. }
  653. }
  654. // Run in a goroutine to track the error in transport and create the
  655. // new transport if an error happens. It returns when the channel is closing.
  656. func (ac *addrConn) transportMonitor() {
  657. for {
  658. ac.mu.Lock()
  659. t := ac.transport
  660. ac.mu.Unlock()
  661. select {
  662. // This is needed to detect the teardown when
  663. // the addrConn is idle (i.e., no RPC in flight).
  664. case <-ac.ctx.Done():
  665. select {
  666. case <-t.Error():
  667. t.Close()
  668. default:
  669. }
  670. return
  671. case <-t.GoAway():
  672. // If GoAway happens without any network I/O error, ac is closed without shutting down the
  673. // underlying transport (the transport will be closed when all the pending RPCs finished or
  674. // failed.).
  675. // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
  676. // are closed.
  677. // In both cases, a new ac is created.
  678. select {
  679. case <-t.Error():
  680. ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
  681. default:
  682. ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
  683. }
  684. return
  685. case <-t.Error():
  686. select {
  687. case <-ac.ctx.Done():
  688. t.Close()
  689. return
  690. case <-t.GoAway():
  691. ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
  692. return
  693. default:
  694. }
  695. ac.mu.Lock()
  696. if ac.state == Shutdown {
  697. // ac has been shutdown.
  698. ac.mu.Unlock()
  699. return
  700. }
  701. ac.state = TransientFailure
  702. ac.stateCV.Broadcast()
  703. ac.mu.Unlock()
  704. if err := ac.resetTransport(true); err != nil {
  705. ac.mu.Lock()
  706. ac.printf("transport exiting: %v", err)
  707. ac.mu.Unlock()
  708. grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
  709. if err != errConnClosing {
  710. // Keep this ac in cc.conns, to get the reason it's torn down.
  711. ac.tearDown(err)
  712. }
  713. return
  714. }
  715. }
  716. }
  717. }
  718. // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
  719. // iv) transport is in TransientFailure and there's no balancer/failfast is true.
  720. func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
  721. for {
  722. ac.mu.Lock()
  723. switch {
  724. case ac.state == Shutdown:
  725. if failfast || !hasBalancer {
  726. // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
  727. err := ac.tearDownErr
  728. ac.mu.Unlock()
  729. return nil, err
  730. }
  731. ac.mu.Unlock()
  732. return nil, errConnClosing
  733. case ac.state == Ready:
  734. ct := ac.transport
  735. ac.mu.Unlock()
  736. return ct, nil
  737. case ac.state == TransientFailure:
  738. if failfast || hasBalancer {
  739. ac.mu.Unlock()
  740. return nil, errConnUnavailable
  741. }
  742. }
  743. ready := ac.ready
  744. if ready == nil {
  745. ready = make(chan struct{})
  746. ac.ready = ready
  747. }
  748. ac.mu.Unlock()
  749. select {
  750. case <-ctx.Done():
  751. return nil, toRPCErr(ctx.Err())
  752. // Wait until the new transport is ready or failed.
  753. case <-ready:
  754. }
  755. }
  756. }
  757. // tearDown starts to tear down the addrConn.
  758. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  759. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  760. // tight loop.
  761. // tearDown doesn't remove ac from ac.cc.conns.
  762. func (ac *addrConn) tearDown(err error) {
  763. ac.cancel()
  764. ac.mu.Lock()
  765. defer ac.mu.Unlock()
  766. if ac.down != nil {
  767. ac.down(downErrorf(false, false, "%v", err))
  768. ac.down = nil
  769. }
  770. if err == errConnDrain && ac.transport != nil {
  771. // GracefulClose(...) may be executed multiple times when
  772. // i) receiving multiple GoAway frames from the server; or
  773. // ii) there are concurrent name resolver/Balancer triggered
  774. // address removal and GoAway.
  775. ac.transport.GracefulClose()
  776. }
  777. if ac.state == Shutdown {
  778. return
  779. }
  780. ac.state = Shutdown
  781. ac.tearDownErr = err
  782. ac.stateCV.Broadcast()
  783. if ac.events != nil {
  784. ac.events.Finish()
  785. ac.events = nil
  786. }
  787. if ac.ready != nil {
  788. close(ac.ready)
  789. ac.ready = nil
  790. }
  791. if ac.transport != nil && err != errConnDrain {
  792. ac.transport.Close()
  793. }
  794. return
  795. }