clientconn.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "errors"
  36. "fmt"
  37. "net"
  38. "strings"
  39. "sync"
  40. "time"
  41. "golang.org/x/net/context"
  42. "golang.org/x/net/trace"
  43. "google.golang.org/grpc/credentials"
  44. "google.golang.org/grpc/grpclog"
  45. "google.golang.org/grpc/transport"
  46. )
  47. var (
  48. // ErrClientConnClosing indicates that the operation is illegal because
  49. // the ClientConn is closing.
  50. ErrClientConnClosing = errors.New("grpc: the client connection is closing")
  51. // ErrClientConnTimeout indicates that the ClientConn cannot establish the
  52. // underlying connections within the specified timeout.
  53. ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
  54. // errNoTransportSecurity indicates that there is no transport security
  55. // being set for ClientConn. Users should either set one or explicitly
  56. // call WithInsecure DialOption to disable security.
  57. errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
  58. // errTransportCredentialsMissing indicates that users want to transmit security
  59. // information (e.g., oauth2 token) which requires secure connection on an insecure
  60. // connection.
  61. errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
  62. // errCredentialsConflict indicates that grpc.WithTransportCredentials()
  63. // and grpc.WithInsecure() are both called for a connection.
  64. errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
  65. // errNetworkIO indicates that the connection is down due to some network I/O error.
  66. errNetworkIO = errors.New("grpc: failed with network I/O error")
  67. // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
  68. errConnDrain = errors.New("grpc: the connection is drained")
  69. // errConnClosing indicates that the connection is closing.
  70. errConnClosing = errors.New("grpc: the connection is closing")
  71. // errConnUnavailable indicates that the connection is unavailable.
  72. errConnUnavailable = errors.New("grpc: the connection is unavailable")
  73. errNoAddr = errors.New("grpc: there is no address available to dial")
  74. // minimum time to give a connection to complete
  75. minConnectTimeout = 20 * time.Second
  76. )
  77. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  78. // values passed to Dial.
  79. type dialOptions struct {
  80. unaryInt UnaryClientInterceptor
  81. streamInt StreamClientInterceptor
  82. codec Codec
  83. cp Compressor
  84. dc Decompressor
  85. bs backoffStrategy
  86. balancer Balancer
  87. block bool
  88. insecure bool
  89. timeout time.Duration
  90. copts transport.ConnectOptions
  91. }
  92. // DialOption configures how we set up the connection.
  93. type DialOption func(*dialOptions)
  94. // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
  95. func WithCodec(c Codec) DialOption {
  96. return func(o *dialOptions) {
  97. o.codec = c
  98. }
  99. }
  100. // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
  101. // compressor.
  102. func WithCompressor(cp Compressor) DialOption {
  103. return func(o *dialOptions) {
  104. o.cp = cp
  105. }
  106. }
  107. // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
  108. // message decompressor.
  109. func WithDecompressor(dc Decompressor) DialOption {
  110. return func(o *dialOptions) {
  111. o.dc = dc
  112. }
  113. }
  114. // WithBalancer returns a DialOption which sets a load balancer.
  115. func WithBalancer(b Balancer) DialOption {
  116. return func(o *dialOptions) {
  117. o.balancer = b
  118. }
  119. }
  120. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  121. // when backing off after failed connection attempts.
  122. func WithBackoffMaxDelay(md time.Duration) DialOption {
  123. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  124. }
  125. // WithBackoffConfig configures the dialer to use the provided backoff
  126. // parameters after connection failures.
  127. //
  128. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  129. // for use.
  130. func WithBackoffConfig(b BackoffConfig) DialOption {
  131. // Set defaults to ensure that provided BackoffConfig is valid and
  132. // unexported fields get default values.
  133. setDefaults(&b)
  134. return withBackoff(b)
  135. }
  136. // withBackoff sets the backoff strategy used for retries after a
  137. // failed connection attempt.
  138. //
  139. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  140. func withBackoff(bs backoffStrategy) DialOption {
  141. return func(o *dialOptions) {
  142. o.bs = bs
  143. }
  144. }
  145. // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
  146. // connection is up. Without this, Dial returns immediately and connecting the server
  147. // happens in background.
  148. func WithBlock() DialOption {
  149. return func(o *dialOptions) {
  150. o.block = true
  151. }
  152. }
  153. // WithInsecure returns a DialOption which disables transport security for this ClientConn.
  154. // Note that transport security is required unless WithInsecure is set.
  155. func WithInsecure() DialOption {
  156. return func(o *dialOptions) {
  157. o.insecure = true
  158. }
  159. }
  160. // WithTransportCredentials returns a DialOption which configures a
  161. // connection level security credentials (e.g., TLS/SSL).
  162. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  163. return func(o *dialOptions) {
  164. o.copts.TransportCredentials = creds
  165. }
  166. }
  167. // WithPerRPCCredentials returns a DialOption which sets
  168. // credentials which will place auth state on each outbound RPC.
  169. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  170. return func(o *dialOptions) {
  171. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  172. }
  173. }
  174. // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
  175. // initially. This is valid if and only if WithBlock() is present.
  176. func WithTimeout(d time.Duration) DialOption {
  177. return func(o *dialOptions) {
  178. o.timeout = d
  179. }
  180. }
  181. // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
  182. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  183. return func(o *dialOptions) {
  184. o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
  185. if deadline, ok := ctx.Deadline(); ok {
  186. return f(addr, deadline.Sub(time.Now()))
  187. }
  188. return f(addr, 0)
  189. }
  190. }
  191. }
  192. // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
  193. func WithUserAgent(s string) DialOption {
  194. return func(o *dialOptions) {
  195. o.copts.UserAgent = s
  196. }
  197. }
  198. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
  199. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  200. return func(o *dialOptions) {
  201. o.unaryInt = f
  202. }
  203. }
  204. // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
  205. func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
  206. return func(o *dialOptions) {
  207. o.streamInt = f
  208. }
  209. }
  210. // Dial creates a client connection to the given target.
  211. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  212. return DialContext(context.Background(), target, opts...)
  213. }
  214. // DialContext creates a client connection to the given target. ctx can be used to
  215. // cancel or expire the pending connecting. Once this function returns, the
  216. // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
  217. // to terminate all the pending operations after this function returns.
  218. // This is the EXPERIMENTAL API.
  219. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  220. cc := &ClientConn{
  221. target: target,
  222. conns: make(map[Address]*addrConn),
  223. }
  224. cc.ctx, cc.cancel = context.WithCancel(context.Background())
  225. defer func() {
  226. select {
  227. case <-ctx.Done():
  228. conn, err = nil, ctx.Err()
  229. default:
  230. }
  231. if err != nil {
  232. cc.Close()
  233. }
  234. }()
  235. for _, opt := range opts {
  236. opt(&cc.dopts)
  237. }
  238. // Set defaults.
  239. if cc.dopts.codec == nil {
  240. cc.dopts.codec = protoCodec{}
  241. }
  242. if cc.dopts.bs == nil {
  243. cc.dopts.bs = DefaultBackoffConfig
  244. }
  245. creds := cc.dopts.copts.TransportCredentials
  246. if creds != nil && creds.Info().ServerName != "" {
  247. cc.authority = creds.Info().ServerName
  248. } else {
  249. colonPos := strings.LastIndex(target, ":")
  250. if colonPos == -1 {
  251. colonPos = len(target)
  252. }
  253. cc.authority = target[:colonPos]
  254. }
  255. var ok bool
  256. waitC := make(chan error, 1)
  257. go func() {
  258. var addrs []Address
  259. if cc.dopts.balancer == nil {
  260. // Connect to target directly if balancer is nil.
  261. addrs = append(addrs, Address{Addr: target})
  262. } else {
  263. var credsClone credentials.TransportCredentials
  264. if creds != nil {
  265. credsClone = creds.Clone()
  266. }
  267. config := BalancerConfig{
  268. DialCreds: credsClone,
  269. }
  270. if err := cc.dopts.balancer.Start(target, config); err != nil {
  271. waitC <- err
  272. return
  273. }
  274. ch := cc.dopts.balancer.Notify()
  275. if ch == nil {
  276. // There is no name resolver installed.
  277. addrs = append(addrs, Address{Addr: target})
  278. } else {
  279. addrs, ok = <-ch
  280. if !ok || len(addrs) == 0 {
  281. waitC <- errNoAddr
  282. return
  283. }
  284. }
  285. }
  286. for _, a := range addrs {
  287. if err := cc.resetAddrConn(a, false, nil); err != nil {
  288. waitC <- err
  289. return
  290. }
  291. }
  292. close(waitC)
  293. }()
  294. var timeoutCh <-chan time.Time
  295. if cc.dopts.timeout > 0 {
  296. timeoutCh = time.After(cc.dopts.timeout)
  297. }
  298. select {
  299. case <-ctx.Done():
  300. return nil, ctx.Err()
  301. case err := <-waitC:
  302. if err != nil {
  303. return nil, err
  304. }
  305. case <-timeoutCh:
  306. return nil, ErrClientConnTimeout
  307. }
  308. // If balancer is nil or balancer.Notify() is nil, ok will be false here.
  309. // The lbWatcher goroutine will not be created.
  310. if ok {
  311. go cc.lbWatcher()
  312. }
  313. return cc, nil
  314. }
  315. // ConnectivityState indicates the state of a client connection.
  316. type ConnectivityState int
  317. const (
  318. // Idle indicates the ClientConn is idle.
  319. Idle ConnectivityState = iota
  320. // Connecting indicates the ClienConn is connecting.
  321. Connecting
  322. // Ready indicates the ClientConn is ready for work.
  323. Ready
  324. // TransientFailure indicates the ClientConn has seen a failure but expects to recover.
  325. TransientFailure
  326. // Shutdown indicates the ClientConn has started shutting down.
  327. Shutdown
  328. )
  329. func (s ConnectivityState) String() string {
  330. switch s {
  331. case Idle:
  332. return "IDLE"
  333. case Connecting:
  334. return "CONNECTING"
  335. case Ready:
  336. return "READY"
  337. case TransientFailure:
  338. return "TRANSIENT_FAILURE"
  339. case Shutdown:
  340. return "SHUTDOWN"
  341. default:
  342. panic(fmt.Sprintf("unknown connectivity state: %d", s))
  343. }
  344. }
  345. // ClientConn represents a client connection to an RPC server.
  346. type ClientConn struct {
  347. ctx context.Context
  348. cancel context.CancelFunc
  349. target string
  350. authority string
  351. dopts dialOptions
  352. mu sync.RWMutex
  353. conns map[Address]*addrConn
  354. }
  355. func (cc *ClientConn) lbWatcher() {
  356. for addrs := range cc.dopts.balancer.Notify() {
  357. var (
  358. add []Address // Addresses need to setup connections.
  359. del []*addrConn // Connections need to tear down.
  360. )
  361. cc.mu.Lock()
  362. for _, a := range addrs {
  363. if _, ok := cc.conns[a]; !ok {
  364. add = append(add, a)
  365. }
  366. }
  367. for k, c := range cc.conns {
  368. var keep bool
  369. for _, a := range addrs {
  370. if k == a {
  371. keep = true
  372. break
  373. }
  374. }
  375. if !keep {
  376. del = append(del, c)
  377. delete(cc.conns, c.addr)
  378. }
  379. }
  380. cc.mu.Unlock()
  381. for _, a := range add {
  382. cc.resetAddrConn(a, true, nil)
  383. }
  384. for _, c := range del {
  385. c.tearDown(errConnDrain)
  386. }
  387. }
  388. }
  389. // resetAddrConn creates an addrConn for addr and adds it to cc.conns.
  390. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
  391. // If tearDownErr is nil, errConnDrain will be used instead.
  392. func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
  393. ac := &addrConn{
  394. cc: cc,
  395. addr: addr,
  396. dopts: cc.dopts,
  397. }
  398. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  399. ac.stateCV = sync.NewCond(&ac.mu)
  400. if EnableTracing {
  401. ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
  402. }
  403. if !ac.dopts.insecure {
  404. if ac.dopts.copts.TransportCredentials == nil {
  405. return errNoTransportSecurity
  406. }
  407. } else {
  408. if ac.dopts.copts.TransportCredentials != nil {
  409. return errCredentialsConflict
  410. }
  411. for _, cd := range ac.dopts.copts.PerRPCCredentials {
  412. if cd.RequireTransportSecurity() {
  413. return errTransportCredentialsMissing
  414. }
  415. }
  416. }
  417. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  418. cc.mu.Lock()
  419. if cc.conns == nil {
  420. cc.mu.Unlock()
  421. return ErrClientConnClosing
  422. }
  423. stale := cc.conns[ac.addr]
  424. cc.conns[ac.addr] = ac
  425. cc.mu.Unlock()
  426. if stale != nil {
  427. // There is an addrConn alive on ac.addr already. This could be due to
  428. // 1) a buggy Balancer notifies duplicated Addresses;
  429. // 2) goaway was received, a new ac will replace the old ac.
  430. // The old ac should be deleted from cc.conns, but the
  431. // underlying transport should drain rather than close.
  432. if tearDownErr == nil {
  433. // tearDownErr is nil if resetAddrConn is called by
  434. // 1) Dial
  435. // 2) lbWatcher
  436. // In both cases, the stale ac should drain, not close.
  437. stale.tearDown(errConnDrain)
  438. } else {
  439. stale.tearDown(tearDownErr)
  440. }
  441. }
  442. // skipWait may overwrite the decision in ac.dopts.block.
  443. if ac.dopts.block && !skipWait {
  444. if err := ac.resetTransport(false); err != nil {
  445. if err != errConnClosing {
  446. // Tear down ac and delete it from cc.conns.
  447. cc.mu.Lock()
  448. delete(cc.conns, ac.addr)
  449. cc.mu.Unlock()
  450. ac.tearDown(err)
  451. }
  452. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  453. return e.Origin()
  454. }
  455. return err
  456. }
  457. // Start to monitor the error status of transport.
  458. go ac.transportMonitor()
  459. } else {
  460. // Start a goroutine connecting to the server asynchronously.
  461. go func() {
  462. if err := ac.resetTransport(false); err != nil {
  463. grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
  464. if err != errConnClosing {
  465. // Keep this ac in cc.conns, to get the reason it's torn down.
  466. ac.tearDown(err)
  467. }
  468. return
  469. }
  470. ac.transportMonitor()
  471. }()
  472. }
  473. return nil
  474. }
  475. func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
  476. var (
  477. ac *addrConn
  478. ok bool
  479. put func()
  480. )
  481. if cc.dopts.balancer == nil {
  482. // If balancer is nil, there should be only one addrConn available.
  483. cc.mu.RLock()
  484. if cc.conns == nil {
  485. cc.mu.RUnlock()
  486. return nil, nil, toRPCErr(ErrClientConnClosing)
  487. }
  488. for _, ac = range cc.conns {
  489. // Break after the first iteration to get the first addrConn.
  490. ok = true
  491. break
  492. }
  493. cc.mu.RUnlock()
  494. } else {
  495. var (
  496. addr Address
  497. err error
  498. )
  499. addr, put, err = cc.dopts.balancer.Get(ctx, opts)
  500. if err != nil {
  501. return nil, nil, toRPCErr(err)
  502. }
  503. cc.mu.RLock()
  504. if cc.conns == nil {
  505. cc.mu.RUnlock()
  506. return nil, nil, toRPCErr(ErrClientConnClosing)
  507. }
  508. ac, ok = cc.conns[addr]
  509. cc.mu.RUnlock()
  510. }
  511. if !ok {
  512. if put != nil {
  513. put()
  514. }
  515. return nil, nil, errConnClosing
  516. }
  517. t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
  518. if err != nil {
  519. if put != nil {
  520. put()
  521. }
  522. return nil, nil, err
  523. }
  524. return t, put, nil
  525. }
  526. // Close tears down the ClientConn and all underlying connections.
  527. func (cc *ClientConn) Close() error {
  528. cc.cancel()
  529. cc.mu.Lock()
  530. if cc.conns == nil {
  531. cc.mu.Unlock()
  532. return ErrClientConnClosing
  533. }
  534. conns := cc.conns
  535. cc.conns = nil
  536. cc.mu.Unlock()
  537. if cc.dopts.balancer != nil {
  538. cc.dopts.balancer.Close()
  539. }
  540. for _, ac := range conns {
  541. ac.tearDown(ErrClientConnClosing)
  542. }
  543. return nil
  544. }
  545. // addrConn is a network connection to a given address.
  546. type addrConn struct {
  547. ctx context.Context
  548. cancel context.CancelFunc
  549. cc *ClientConn
  550. addr Address
  551. dopts dialOptions
  552. events trace.EventLog
  553. mu sync.Mutex
  554. state ConnectivityState
  555. stateCV *sync.Cond
  556. down func(error) // the handler called when a connection is down.
  557. // ready is closed and becomes nil when a new transport is up or failed
  558. // due to timeout.
  559. ready chan struct{}
  560. transport transport.ClientTransport
  561. // The reason this addrConn is torn down.
  562. tearDownErr error
  563. }
  564. // printf records an event in ac's event log, unless ac has been closed.
  565. // REQUIRES ac.mu is held.
  566. func (ac *addrConn) printf(format string, a ...interface{}) {
  567. if ac.events != nil {
  568. ac.events.Printf(format, a...)
  569. }
  570. }
  571. // errorf records an error in ac's event log, unless ac has been closed.
  572. // REQUIRES ac.mu is held.
  573. func (ac *addrConn) errorf(format string, a ...interface{}) {
  574. if ac.events != nil {
  575. ac.events.Errorf(format, a...)
  576. }
  577. }
  578. // getState returns the connectivity state of the Conn
  579. func (ac *addrConn) getState() ConnectivityState {
  580. ac.mu.Lock()
  581. defer ac.mu.Unlock()
  582. return ac.state
  583. }
  584. // waitForStateChange blocks until the state changes to something other than the sourceState.
  585. func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
  586. ac.mu.Lock()
  587. defer ac.mu.Unlock()
  588. if sourceState != ac.state {
  589. return ac.state, nil
  590. }
  591. done := make(chan struct{})
  592. var err error
  593. go func() {
  594. select {
  595. case <-ctx.Done():
  596. ac.mu.Lock()
  597. err = ctx.Err()
  598. ac.stateCV.Broadcast()
  599. ac.mu.Unlock()
  600. case <-done:
  601. }
  602. }()
  603. defer close(done)
  604. for sourceState == ac.state {
  605. ac.stateCV.Wait()
  606. if err != nil {
  607. return ac.state, err
  608. }
  609. }
  610. return ac.state, nil
  611. }
  612. func (ac *addrConn) resetTransport(closeTransport bool) error {
  613. for retries := 0; ; retries++ {
  614. ac.mu.Lock()
  615. ac.printf("connecting")
  616. if ac.state == Shutdown {
  617. // ac.tearDown(...) has been invoked.
  618. ac.mu.Unlock()
  619. return errConnClosing
  620. }
  621. if ac.down != nil {
  622. ac.down(downErrorf(false, true, "%v", errNetworkIO))
  623. ac.down = nil
  624. }
  625. ac.state = Connecting
  626. ac.stateCV.Broadcast()
  627. t := ac.transport
  628. ac.mu.Unlock()
  629. if closeTransport && t != nil {
  630. t.Close()
  631. }
  632. sleepTime := ac.dopts.bs.backoff(retries)
  633. timeout := minConnectTimeout
  634. if timeout < sleepTime {
  635. timeout = sleepTime
  636. }
  637. ctx, cancel := context.WithTimeout(ac.ctx, timeout)
  638. connectTime := time.Now()
  639. sinfo := transport.TargetInfo{
  640. Addr: ac.addr.Addr,
  641. Metadata: ac.addr.Metadata,
  642. }
  643. newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
  644. if err != nil {
  645. cancel()
  646. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  647. return err
  648. }
  649. grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
  650. ac.mu.Lock()
  651. if ac.state == Shutdown {
  652. // ac.tearDown(...) has been invoked.
  653. ac.mu.Unlock()
  654. return errConnClosing
  655. }
  656. ac.errorf("transient failure: %v", err)
  657. ac.state = TransientFailure
  658. ac.stateCV.Broadcast()
  659. if ac.ready != nil {
  660. close(ac.ready)
  661. ac.ready = nil
  662. }
  663. ac.mu.Unlock()
  664. closeTransport = false
  665. select {
  666. case <-time.After(sleepTime - time.Since(connectTime)):
  667. case <-ac.ctx.Done():
  668. return ac.ctx.Err()
  669. }
  670. continue
  671. }
  672. ac.mu.Lock()
  673. ac.printf("ready")
  674. if ac.state == Shutdown {
  675. // ac.tearDown(...) has been invoked.
  676. ac.mu.Unlock()
  677. newTransport.Close()
  678. return errConnClosing
  679. }
  680. ac.state = Ready
  681. ac.stateCV.Broadcast()
  682. ac.transport = newTransport
  683. if ac.ready != nil {
  684. close(ac.ready)
  685. ac.ready = nil
  686. }
  687. if ac.cc.dopts.balancer != nil {
  688. ac.down = ac.cc.dopts.balancer.Up(ac.addr)
  689. }
  690. ac.mu.Unlock()
  691. return nil
  692. }
  693. }
  694. // Run in a goroutine to track the error in transport and create the
  695. // new transport if an error happens. It returns when the channel is closing.
  696. func (ac *addrConn) transportMonitor() {
  697. for {
  698. ac.mu.Lock()
  699. t := ac.transport
  700. ac.mu.Unlock()
  701. select {
  702. // This is needed to detect the teardown when
  703. // the addrConn is idle (i.e., no RPC in flight).
  704. case <-ac.ctx.Done():
  705. select {
  706. case <-t.Error():
  707. t.Close()
  708. default:
  709. }
  710. return
  711. case <-t.GoAway():
  712. // If GoAway happens without any network I/O error, ac is closed without shutting down the
  713. // underlying transport (the transport will be closed when all the pending RPCs finished or
  714. // failed.).
  715. // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
  716. // are closed.
  717. // In both cases, a new ac is created.
  718. select {
  719. case <-t.Error():
  720. ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
  721. default:
  722. ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
  723. }
  724. return
  725. case <-t.Error():
  726. select {
  727. case <-ac.ctx.Done():
  728. t.Close()
  729. return
  730. case <-t.GoAway():
  731. ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
  732. return
  733. default:
  734. }
  735. ac.mu.Lock()
  736. if ac.state == Shutdown {
  737. // ac has been shutdown.
  738. ac.mu.Unlock()
  739. return
  740. }
  741. ac.state = TransientFailure
  742. ac.stateCV.Broadcast()
  743. ac.mu.Unlock()
  744. if err := ac.resetTransport(true); err != nil {
  745. ac.mu.Lock()
  746. ac.printf("transport exiting: %v", err)
  747. ac.mu.Unlock()
  748. grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
  749. if err != errConnClosing {
  750. // Keep this ac in cc.conns, to get the reason it's torn down.
  751. ac.tearDown(err)
  752. }
  753. return
  754. }
  755. }
  756. }
  757. }
  758. // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
  759. // iv) transport is in TransientFailure and there is a balancer/failfast is true.
  760. func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
  761. for {
  762. ac.mu.Lock()
  763. switch {
  764. case ac.state == Shutdown:
  765. if failfast || !hasBalancer {
  766. // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
  767. err := ac.tearDownErr
  768. ac.mu.Unlock()
  769. return nil, err
  770. }
  771. ac.mu.Unlock()
  772. return nil, errConnClosing
  773. case ac.state == Ready:
  774. ct := ac.transport
  775. ac.mu.Unlock()
  776. return ct, nil
  777. case ac.state == TransientFailure:
  778. if failfast || hasBalancer {
  779. ac.mu.Unlock()
  780. return nil, errConnUnavailable
  781. }
  782. }
  783. ready := ac.ready
  784. if ready == nil {
  785. ready = make(chan struct{})
  786. ac.ready = ready
  787. }
  788. ac.mu.Unlock()
  789. select {
  790. case <-ctx.Done():
  791. return nil, toRPCErr(ctx.Err())
  792. // Wait until the new transport is ready or failed.
  793. case <-ready:
  794. }
  795. }
  796. }
  797. // tearDown starts to tear down the addrConn.
  798. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  799. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  800. // tight loop.
  801. // tearDown doesn't remove ac from ac.cc.conns.
  802. func (ac *addrConn) tearDown(err error) {
  803. ac.cancel()
  804. ac.mu.Lock()
  805. defer ac.mu.Unlock()
  806. if ac.down != nil {
  807. ac.down(downErrorf(false, false, "%v", err))
  808. ac.down = nil
  809. }
  810. if err == errConnDrain && ac.transport != nil {
  811. // GracefulClose(...) may be executed multiple times when
  812. // i) receiving multiple GoAway frames from the server; or
  813. // ii) there are concurrent name resolver/Balancer triggered
  814. // address removal and GoAway.
  815. ac.transport.GracefulClose()
  816. }
  817. if ac.state == Shutdown {
  818. return
  819. }
  820. ac.state = Shutdown
  821. ac.tearDownErr = err
  822. ac.stateCV.Broadcast()
  823. if ac.events != nil {
  824. ac.events.Finish()
  825. ac.events = nil
  826. }
  827. if ac.ready != nil {
  828. close(ac.ready)
  829. ac.ready = nil
  830. }
  831. if ac.transport != nil && err != errConnDrain {
  832. ac.transport.Close()
  833. }
  834. return
  835. }