clientconn.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "errors"
  36. "fmt"
  37. "net"
  38. "strings"
  39. "sync"
  40. "time"
  41. "golang.org/x/net/context"
  42. "golang.org/x/net/trace"
  43. "google.golang.org/grpc/credentials"
  44. "google.golang.org/grpc/grpclog"
  45. "google.golang.org/grpc/keepalive"
  46. "google.golang.org/grpc/stats"
  47. "google.golang.org/grpc/transport"
  48. )
  49. var (
  50. // ErrClientConnClosing indicates that the operation is illegal because
  51. // the ClientConn is closing.
  52. ErrClientConnClosing = errors.New("grpc: the client connection is closing")
  53. // ErrClientConnTimeout indicates that the ClientConn cannot establish the
  54. // underlying connections within the specified timeout.
  55. // DEPRECATED: Please use context.DeadlineExceeded instead.
  56. ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
  57. // errNoTransportSecurity indicates that there is no transport security
  58. // being set for ClientConn. Users should either set one or explicitly
  59. // call WithInsecure DialOption to disable security.
  60. errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
  61. // errTransportCredentialsMissing indicates that users want to transmit security
  62. // information (e.g., oauth2 token) which requires secure connection on an insecure
  63. // connection.
  64. errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
  65. // errCredentialsConflict indicates that grpc.WithTransportCredentials()
  66. // and grpc.WithInsecure() are both called for a connection.
  67. errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
  68. // errNetworkIO indicates that the connection is down due to some network I/O error.
  69. errNetworkIO = errors.New("grpc: failed with network I/O error")
  70. // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
  71. errConnDrain = errors.New("grpc: the connection is drained")
  72. // errConnClosing indicates that the connection is closing.
  73. errConnClosing = errors.New("grpc: the connection is closing")
  74. // errConnUnavailable indicates that the connection is unavailable.
  75. errConnUnavailable = errors.New("grpc: the connection is unavailable")
  76. // errBalancerClosed indicates that the balancer is closed.
  77. errBalancerClosed = errors.New("grpc: balancer is closed")
  78. // minimum time to give a connection to complete
  79. minConnectTimeout = 20 * time.Second
  80. )
  81. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  82. // values passed to Dial.
  83. type dialOptions struct {
  84. unaryInt UnaryClientInterceptor
  85. streamInt StreamClientInterceptor
  86. codec Codec
  87. cp Compressor
  88. dc Decompressor
  89. bs backoffStrategy
  90. balancer Balancer
  91. block bool
  92. insecure bool
  93. timeout time.Duration
  94. scChan <-chan ServiceConfig
  95. copts transport.ConnectOptions
  96. callOptions []CallOption
  97. }
  98. const (
  99. defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
  100. defaultClientMaxSendMessageSize = 1024 * 1024 * 4
  101. )
  102. // DialOption configures how we set up the connection.
  103. type DialOption func(*dialOptions)
  104. // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
  105. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  106. func WithInitialWindowSize(s int32) DialOption {
  107. return func(o *dialOptions) {
  108. o.copts.InitialWindowSize = s
  109. }
  110. }
  111. // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
  112. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  113. func WithInitialConnWindowSize(s int32) DialOption {
  114. return func(o *dialOptions) {
  115. o.copts.InitialConnWindowSize = s
  116. }
  117. }
  118. // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
  119. func WithMaxMsgSize(s int) DialOption {
  120. return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
  121. }
  122. // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
  123. func WithDefaultCallOptions(cos ...CallOption) DialOption {
  124. return func(o *dialOptions) {
  125. o.callOptions = append(o.callOptions, cos...)
  126. }
  127. }
  128. // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
  129. func WithCodec(c Codec) DialOption {
  130. return func(o *dialOptions) {
  131. o.codec = c
  132. }
  133. }
  134. // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
  135. // compressor.
  136. func WithCompressor(cp Compressor) DialOption {
  137. return func(o *dialOptions) {
  138. o.cp = cp
  139. }
  140. }
  141. // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
  142. // message decompressor.
  143. func WithDecompressor(dc Decompressor) DialOption {
  144. return func(o *dialOptions) {
  145. o.dc = dc
  146. }
  147. }
  148. // WithBalancer returns a DialOption which sets a load balancer.
  149. func WithBalancer(b Balancer) DialOption {
  150. return func(o *dialOptions) {
  151. o.balancer = b
  152. }
  153. }
  154. // WithServiceConfig returns a DialOption which has a channel to read the service configuration.
  155. func WithServiceConfig(c <-chan ServiceConfig) DialOption {
  156. return func(o *dialOptions) {
  157. o.scChan = c
  158. }
  159. }
  160. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  161. // when backing off after failed connection attempts.
  162. func WithBackoffMaxDelay(md time.Duration) DialOption {
  163. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  164. }
  165. // WithBackoffConfig configures the dialer to use the provided backoff
  166. // parameters after connection failures.
  167. //
  168. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  169. // for use.
  170. func WithBackoffConfig(b BackoffConfig) DialOption {
  171. // Set defaults to ensure that provided BackoffConfig is valid and
  172. // unexported fields get default values.
  173. setDefaults(&b)
  174. return withBackoff(b)
  175. }
  176. // withBackoff sets the backoff strategy used for retries after a
  177. // failed connection attempt.
  178. //
  179. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  180. func withBackoff(bs backoffStrategy) DialOption {
  181. return func(o *dialOptions) {
  182. o.bs = bs
  183. }
  184. }
  185. // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
  186. // connection is up. Without this, Dial returns immediately and connecting the server
  187. // happens in background.
  188. func WithBlock() DialOption {
  189. return func(o *dialOptions) {
  190. o.block = true
  191. }
  192. }
  193. // WithInsecure returns a DialOption which disables transport security for this ClientConn.
  194. // Note that transport security is required unless WithInsecure is set.
  195. func WithInsecure() DialOption {
  196. return func(o *dialOptions) {
  197. o.insecure = true
  198. }
  199. }
  200. // WithTransportCredentials returns a DialOption which configures a
  201. // connection level security credentials (e.g., TLS/SSL).
  202. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  203. return func(o *dialOptions) {
  204. o.copts.TransportCredentials = creds
  205. }
  206. }
  207. // WithPerRPCCredentials returns a DialOption which sets
  208. // credentials and places auth state on each outbound RPC.
  209. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  210. return func(o *dialOptions) {
  211. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  212. }
  213. }
  214. // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
  215. // initially. This is valid if and only if WithBlock() is present.
  216. func WithTimeout(d time.Duration) DialOption {
  217. return func(o *dialOptions) {
  218. o.timeout = d
  219. }
  220. }
  221. // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
  222. // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
  223. // Temporary() method to decide if it should try to reconnect to the network address.
  224. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  225. return func(o *dialOptions) {
  226. o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
  227. if deadline, ok := ctx.Deadline(); ok {
  228. return f(addr, deadline.Sub(time.Now()))
  229. }
  230. return f(addr, 0)
  231. }
  232. }
  233. }
  234. // WithStatsHandler returns a DialOption that specifies the stats handler
  235. // for all the RPCs and underlying network connections in this ClientConn.
  236. func WithStatsHandler(h stats.Handler) DialOption {
  237. return func(o *dialOptions) {
  238. o.copts.StatsHandler = h
  239. }
  240. }
  241. // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
  242. // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
  243. // address and won't try to reconnect.
  244. // The default value of FailOnNonTempDialError is false.
  245. // This is an EXPERIMENTAL API.
  246. func FailOnNonTempDialError(f bool) DialOption {
  247. return func(o *dialOptions) {
  248. o.copts.FailOnNonTempDialError = f
  249. }
  250. }
  251. // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
  252. func WithUserAgent(s string) DialOption {
  253. return func(o *dialOptions) {
  254. o.copts.UserAgent = s
  255. }
  256. }
  257. // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
  258. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
  259. return func(o *dialOptions) {
  260. o.copts.KeepaliveParams = kp
  261. }
  262. }
  263. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
  264. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  265. return func(o *dialOptions) {
  266. o.unaryInt = f
  267. }
  268. }
  269. // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
  270. func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
  271. return func(o *dialOptions) {
  272. o.streamInt = f
  273. }
  274. }
  275. // WithAuthority returns a DialOption that specifies the value to be used as
  276. // the :authority pseudo-header. This value only works with WithInsecure and
  277. // has no effect if TransportCredentials are present.
  278. func WithAuthority(a string) DialOption {
  279. return func(o *dialOptions) {
  280. o.copts.Authority = a
  281. }
  282. }
  283. // Dial creates a client connection to the given target.
  284. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  285. return DialContext(context.Background(), target, opts...)
  286. }
  287. // DialContext creates a client connection to the given target. ctx can be used to
  288. // cancel or expire the pending connection. Once this function returns, the
  289. // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
  290. // to terminate all the pending operations after this function returns.
  291. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  292. cc := &ClientConn{
  293. target: target,
  294. conns: make(map[Address]*addrConn),
  295. }
  296. cc.ctx, cc.cancel = context.WithCancel(context.Background())
  297. for _, opt := range opts {
  298. opt(&cc.dopts)
  299. }
  300. cc.mkp = cc.dopts.copts.KeepaliveParams
  301. if cc.dopts.copts.Dialer == nil {
  302. cc.dopts.copts.Dialer = newProxyDialer(
  303. func(ctx context.Context, addr string) (net.Conn, error) {
  304. return dialContext(ctx, "tcp", addr)
  305. },
  306. )
  307. }
  308. if cc.dopts.copts.UserAgent != "" {
  309. cc.dopts.copts.UserAgent += " " + grpcUA
  310. } else {
  311. cc.dopts.copts.UserAgent = grpcUA
  312. }
  313. if cc.dopts.timeout > 0 {
  314. var cancel context.CancelFunc
  315. ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
  316. defer cancel()
  317. }
  318. defer func() {
  319. select {
  320. case <-ctx.Done():
  321. conn, err = nil, ctx.Err()
  322. default:
  323. }
  324. if err != nil {
  325. cc.Close()
  326. }
  327. }()
  328. scSet := false
  329. if cc.dopts.scChan != nil {
  330. // Try to get an initial service config.
  331. select {
  332. case sc, ok := <-cc.dopts.scChan:
  333. if ok {
  334. cc.sc = sc
  335. scSet = true
  336. }
  337. default:
  338. }
  339. }
  340. // Set defaults.
  341. if cc.dopts.codec == nil {
  342. cc.dopts.codec = protoCodec{}
  343. }
  344. if cc.dopts.bs == nil {
  345. cc.dopts.bs = DefaultBackoffConfig
  346. }
  347. creds := cc.dopts.copts.TransportCredentials
  348. if creds != nil && creds.Info().ServerName != "" {
  349. cc.authority = creds.Info().ServerName
  350. } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
  351. cc.authority = cc.dopts.copts.Authority
  352. } else {
  353. cc.authority = target
  354. }
  355. waitC := make(chan error, 1)
  356. go func() {
  357. defer close(waitC)
  358. if cc.dopts.balancer == nil && cc.sc.LB != nil {
  359. cc.dopts.balancer = cc.sc.LB
  360. }
  361. if cc.dopts.balancer != nil {
  362. var credsClone credentials.TransportCredentials
  363. if creds != nil {
  364. credsClone = creds.Clone()
  365. }
  366. config := BalancerConfig{
  367. DialCreds: credsClone,
  368. Dialer: cc.dopts.copts.Dialer,
  369. }
  370. if err := cc.dopts.balancer.Start(target, config); err != nil {
  371. waitC <- err
  372. return
  373. }
  374. ch := cc.dopts.balancer.Notify()
  375. if ch != nil {
  376. if cc.dopts.block {
  377. doneChan := make(chan struct{})
  378. go cc.lbWatcher(doneChan)
  379. <-doneChan
  380. } else {
  381. go cc.lbWatcher(nil)
  382. }
  383. return
  384. }
  385. }
  386. // No balancer, or no resolver within the balancer. Connect directly.
  387. if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
  388. waitC <- err
  389. return
  390. }
  391. }()
  392. select {
  393. case <-ctx.Done():
  394. return nil, ctx.Err()
  395. case err := <-waitC:
  396. if err != nil {
  397. return nil, err
  398. }
  399. }
  400. if cc.dopts.scChan != nil && !scSet {
  401. // Blocking wait for the initial service config.
  402. select {
  403. case sc, ok := <-cc.dopts.scChan:
  404. if ok {
  405. cc.sc = sc
  406. }
  407. case <-ctx.Done():
  408. return nil, ctx.Err()
  409. }
  410. }
  411. if cc.dopts.scChan != nil {
  412. go cc.scWatcher()
  413. }
  414. return cc, nil
  415. }
  416. // ConnectivityState indicates the state of a client connection.
  417. type ConnectivityState int
  418. const (
  419. // Idle indicates the ClientConn is idle.
  420. Idle ConnectivityState = iota
  421. // Connecting indicates the ClienConn is connecting.
  422. Connecting
  423. // Ready indicates the ClientConn is ready for work.
  424. Ready
  425. // TransientFailure indicates the ClientConn has seen a failure but expects to recover.
  426. TransientFailure
  427. // Shutdown indicates the ClientConn has started shutting down.
  428. Shutdown
  429. )
  430. func (s ConnectivityState) String() string {
  431. switch s {
  432. case Idle:
  433. return "IDLE"
  434. case Connecting:
  435. return "CONNECTING"
  436. case Ready:
  437. return "READY"
  438. case TransientFailure:
  439. return "TRANSIENT_FAILURE"
  440. case Shutdown:
  441. return "SHUTDOWN"
  442. default:
  443. panic(fmt.Sprintf("unknown connectivity state: %d", s))
  444. }
  445. }
  446. // ClientConn represents a client connection to an RPC server.
  447. type ClientConn struct {
  448. ctx context.Context
  449. cancel context.CancelFunc
  450. target string
  451. authority string
  452. dopts dialOptions
  453. mu sync.RWMutex
  454. sc ServiceConfig
  455. conns map[Address]*addrConn
  456. // Keepalive parameter can be udated if a GoAway is received.
  457. mkp keepalive.ClientParameters
  458. }
  459. // lbWatcher watches the Notify channel of the balancer in cc and manages
  460. // connections accordingly. If doneChan is not nil, it is closed after the
  461. // first successfull connection is made.
  462. func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
  463. for addrs := range cc.dopts.balancer.Notify() {
  464. var (
  465. add []Address // Addresses need to setup connections.
  466. del []*addrConn // Connections need to tear down.
  467. )
  468. cc.mu.Lock()
  469. for _, a := range addrs {
  470. if _, ok := cc.conns[a]; !ok {
  471. add = append(add, a)
  472. }
  473. }
  474. for k, c := range cc.conns {
  475. var keep bool
  476. for _, a := range addrs {
  477. if k == a {
  478. keep = true
  479. break
  480. }
  481. }
  482. if !keep {
  483. del = append(del, c)
  484. delete(cc.conns, c.addr)
  485. }
  486. }
  487. cc.mu.Unlock()
  488. for _, a := range add {
  489. if doneChan != nil {
  490. err := cc.resetAddrConn(a, true, nil)
  491. if err == nil {
  492. close(doneChan)
  493. doneChan = nil
  494. }
  495. } else {
  496. cc.resetAddrConn(a, false, nil)
  497. }
  498. }
  499. for _, c := range del {
  500. c.tearDown(errConnDrain)
  501. }
  502. }
  503. }
  504. func (cc *ClientConn) scWatcher() {
  505. for {
  506. select {
  507. case sc, ok := <-cc.dopts.scChan:
  508. if !ok {
  509. return
  510. }
  511. cc.mu.Lock()
  512. // TODO: load balance policy runtime change is ignored.
  513. // We may revist this decision in the future.
  514. cc.sc = sc
  515. cc.mu.Unlock()
  516. case <-cc.ctx.Done():
  517. return
  518. }
  519. }
  520. }
  521. // resetAddrConn creates an addrConn for addr and adds it to cc.conns.
  522. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
  523. // If tearDownErr is nil, errConnDrain will be used instead.
  524. func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
  525. ac := &addrConn{
  526. cc: cc,
  527. addr: addr,
  528. dopts: cc.dopts,
  529. }
  530. cc.mu.RLock()
  531. ac.dopts.copts.KeepaliveParams = cc.mkp
  532. cc.mu.RUnlock()
  533. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  534. ac.stateCV = sync.NewCond(&ac.mu)
  535. if EnableTracing {
  536. ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
  537. }
  538. if !ac.dopts.insecure {
  539. if ac.dopts.copts.TransportCredentials == nil {
  540. return errNoTransportSecurity
  541. }
  542. } else {
  543. if ac.dopts.copts.TransportCredentials != nil {
  544. return errCredentialsConflict
  545. }
  546. for _, cd := range ac.dopts.copts.PerRPCCredentials {
  547. if cd.RequireTransportSecurity() {
  548. return errTransportCredentialsMissing
  549. }
  550. }
  551. }
  552. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  553. cc.mu.Lock()
  554. if cc.conns == nil {
  555. cc.mu.Unlock()
  556. return ErrClientConnClosing
  557. }
  558. stale := cc.conns[ac.addr]
  559. cc.conns[ac.addr] = ac
  560. cc.mu.Unlock()
  561. if stale != nil {
  562. // There is an addrConn alive on ac.addr already. This could be due to
  563. // 1) a buggy Balancer notifies duplicated Addresses;
  564. // 2) goaway was received, a new ac will replace the old ac.
  565. // The old ac should be deleted from cc.conns, but the
  566. // underlying transport should drain rather than close.
  567. if tearDownErr == nil {
  568. // tearDownErr is nil if resetAddrConn is called by
  569. // 1) Dial
  570. // 2) lbWatcher
  571. // In both cases, the stale ac should drain, not close.
  572. stale.tearDown(errConnDrain)
  573. } else {
  574. stale.tearDown(tearDownErr)
  575. }
  576. }
  577. if block {
  578. if err := ac.resetTransport(false); err != nil {
  579. if err != errConnClosing {
  580. // Tear down ac and delete it from cc.conns.
  581. cc.mu.Lock()
  582. delete(cc.conns, ac.addr)
  583. cc.mu.Unlock()
  584. ac.tearDown(err)
  585. }
  586. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  587. return e.Origin()
  588. }
  589. return err
  590. }
  591. // Start to monitor the error status of transport.
  592. go ac.transportMonitor()
  593. } else {
  594. // Start a goroutine connecting to the server asynchronously.
  595. go func() {
  596. if err := ac.resetTransport(false); err != nil {
  597. grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
  598. if err != errConnClosing {
  599. // Keep this ac in cc.conns, to get the reason it's torn down.
  600. ac.tearDown(err)
  601. }
  602. return
  603. }
  604. ac.transportMonitor()
  605. }()
  606. }
  607. return nil
  608. }
  609. // GetMethodConfig gets the method config of the input method.
  610. // If there's an exact match for input method (i.e. /service/method), we return
  611. // the corresponding MethodConfig.
  612. // If there isn't an exact match for the input method, we look for the default config
  613. // under the service (i.e /service/). If there is a default MethodConfig for
  614. // the serivce, we return it.
  615. // Otherwise, we return an empty MethodConfig.
  616. func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
  617. // TODO: Avoid the locking here.
  618. cc.mu.RLock()
  619. defer cc.mu.RUnlock()
  620. m, ok := cc.sc.Methods[method]
  621. if !ok {
  622. i := strings.LastIndex(method, "/")
  623. m, _ = cc.sc.Methods[method[:i+1]]
  624. }
  625. return m
  626. }
  627. func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
  628. var (
  629. ac *addrConn
  630. ok bool
  631. put func()
  632. )
  633. if cc.dopts.balancer == nil {
  634. // If balancer is nil, there should be only one addrConn available.
  635. cc.mu.RLock()
  636. if cc.conns == nil {
  637. cc.mu.RUnlock()
  638. return nil, nil, toRPCErr(ErrClientConnClosing)
  639. }
  640. for _, ac = range cc.conns {
  641. // Break after the first iteration to get the first addrConn.
  642. ok = true
  643. break
  644. }
  645. cc.mu.RUnlock()
  646. } else {
  647. var (
  648. addr Address
  649. err error
  650. )
  651. addr, put, err = cc.dopts.balancer.Get(ctx, opts)
  652. if err != nil {
  653. return nil, nil, toRPCErr(err)
  654. }
  655. cc.mu.RLock()
  656. if cc.conns == nil {
  657. cc.mu.RUnlock()
  658. return nil, nil, toRPCErr(ErrClientConnClosing)
  659. }
  660. ac, ok = cc.conns[addr]
  661. cc.mu.RUnlock()
  662. }
  663. if !ok {
  664. if put != nil {
  665. updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
  666. put()
  667. }
  668. return nil, nil, errConnClosing
  669. }
  670. t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
  671. if err != nil {
  672. if put != nil {
  673. updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
  674. put()
  675. }
  676. return nil, nil, err
  677. }
  678. return t, put, nil
  679. }
  680. // Close tears down the ClientConn and all underlying connections.
  681. func (cc *ClientConn) Close() error {
  682. cc.cancel()
  683. cc.mu.Lock()
  684. if cc.conns == nil {
  685. cc.mu.Unlock()
  686. return ErrClientConnClosing
  687. }
  688. conns := cc.conns
  689. cc.conns = nil
  690. cc.mu.Unlock()
  691. if cc.dopts.balancer != nil {
  692. cc.dopts.balancer.Close()
  693. }
  694. for _, ac := range conns {
  695. ac.tearDown(ErrClientConnClosing)
  696. }
  697. return nil
  698. }
  699. // addrConn is a network connection to a given address.
  700. type addrConn struct {
  701. ctx context.Context
  702. cancel context.CancelFunc
  703. cc *ClientConn
  704. addr Address
  705. dopts dialOptions
  706. events trace.EventLog
  707. mu sync.Mutex
  708. state ConnectivityState
  709. stateCV *sync.Cond
  710. down func(error) // the handler called when a connection is down.
  711. // ready is closed and becomes nil when a new transport is up or failed
  712. // due to timeout.
  713. ready chan struct{}
  714. transport transport.ClientTransport
  715. // The reason this addrConn is torn down.
  716. tearDownErr error
  717. }
  718. // adjustParams updates parameters used to create transports upon
  719. // receiving a GoAway.
  720. func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
  721. switch r {
  722. case transport.TooManyPings:
  723. v := 2 * ac.dopts.copts.KeepaliveParams.Time
  724. ac.cc.mu.Lock()
  725. if v > ac.cc.mkp.Time {
  726. ac.cc.mkp.Time = v
  727. }
  728. ac.cc.mu.Unlock()
  729. }
  730. }
  731. // printf records an event in ac's event log, unless ac has been closed.
  732. // REQUIRES ac.mu is held.
  733. func (ac *addrConn) printf(format string, a ...interface{}) {
  734. if ac.events != nil {
  735. ac.events.Printf(format, a...)
  736. }
  737. }
  738. // errorf records an error in ac's event log, unless ac has been closed.
  739. // REQUIRES ac.mu is held.
  740. func (ac *addrConn) errorf(format string, a ...interface{}) {
  741. if ac.events != nil {
  742. ac.events.Errorf(format, a...)
  743. }
  744. }
  745. // getState returns the connectivity state of the Conn
  746. func (ac *addrConn) getState() ConnectivityState {
  747. ac.mu.Lock()
  748. defer ac.mu.Unlock()
  749. return ac.state
  750. }
  751. // waitForStateChange blocks until the state changes to something other than the sourceState.
  752. func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
  753. ac.mu.Lock()
  754. defer ac.mu.Unlock()
  755. if sourceState != ac.state {
  756. return ac.state, nil
  757. }
  758. done := make(chan struct{})
  759. var err error
  760. go func() {
  761. select {
  762. case <-ctx.Done():
  763. ac.mu.Lock()
  764. err = ctx.Err()
  765. ac.stateCV.Broadcast()
  766. ac.mu.Unlock()
  767. case <-done:
  768. }
  769. }()
  770. defer close(done)
  771. for sourceState == ac.state {
  772. ac.stateCV.Wait()
  773. if err != nil {
  774. return ac.state, err
  775. }
  776. }
  777. return ac.state, nil
  778. }
  779. func (ac *addrConn) resetTransport(closeTransport bool) error {
  780. for retries := 0; ; retries++ {
  781. ac.mu.Lock()
  782. ac.printf("connecting")
  783. if ac.state == Shutdown {
  784. // ac.tearDown(...) has been invoked.
  785. ac.mu.Unlock()
  786. return errConnClosing
  787. }
  788. if ac.down != nil {
  789. ac.down(downErrorf(false, true, "%v", errNetworkIO))
  790. ac.down = nil
  791. }
  792. ac.state = Connecting
  793. ac.stateCV.Broadcast()
  794. t := ac.transport
  795. ac.mu.Unlock()
  796. if closeTransport && t != nil {
  797. t.Close()
  798. }
  799. sleepTime := ac.dopts.bs.backoff(retries)
  800. timeout := minConnectTimeout
  801. if timeout < sleepTime {
  802. timeout = sleepTime
  803. }
  804. ctx, cancel := context.WithTimeout(ac.ctx, timeout)
  805. connectTime := time.Now()
  806. sinfo := transport.TargetInfo{
  807. Addr: ac.addr.Addr,
  808. Metadata: ac.addr.Metadata,
  809. }
  810. newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
  811. // Don't call cancel in success path due to a race in Go 1.6:
  812. // https://github.com/golang/go/issues/15078.
  813. if err != nil {
  814. cancel()
  815. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  816. return err
  817. }
  818. grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
  819. ac.mu.Lock()
  820. if ac.state == Shutdown {
  821. // ac.tearDown(...) has been invoked.
  822. ac.mu.Unlock()
  823. return errConnClosing
  824. }
  825. ac.errorf("transient failure: %v", err)
  826. ac.state = TransientFailure
  827. ac.stateCV.Broadcast()
  828. if ac.ready != nil {
  829. close(ac.ready)
  830. ac.ready = nil
  831. }
  832. ac.mu.Unlock()
  833. closeTransport = false
  834. timer := time.NewTimer(sleepTime - time.Since(connectTime))
  835. select {
  836. case <-timer.C:
  837. case <-ac.ctx.Done():
  838. timer.Stop()
  839. return ac.ctx.Err()
  840. }
  841. timer.Stop()
  842. continue
  843. }
  844. ac.mu.Lock()
  845. ac.printf("ready")
  846. if ac.state == Shutdown {
  847. // ac.tearDown(...) has been invoked.
  848. ac.mu.Unlock()
  849. newTransport.Close()
  850. return errConnClosing
  851. }
  852. ac.state = Ready
  853. ac.stateCV.Broadcast()
  854. ac.transport = newTransport
  855. if ac.ready != nil {
  856. close(ac.ready)
  857. ac.ready = nil
  858. }
  859. if ac.cc.dopts.balancer != nil {
  860. ac.down = ac.cc.dopts.balancer.Up(ac.addr)
  861. }
  862. ac.mu.Unlock()
  863. return nil
  864. }
  865. }
  866. // Run in a goroutine to track the error in transport and create the
  867. // new transport if an error happens. It returns when the channel is closing.
  868. func (ac *addrConn) transportMonitor() {
  869. for {
  870. ac.mu.Lock()
  871. t := ac.transport
  872. ac.mu.Unlock()
  873. select {
  874. // This is needed to detect the teardown when
  875. // the addrConn is idle (i.e., no RPC in flight).
  876. case <-ac.ctx.Done():
  877. select {
  878. case <-t.Error():
  879. t.Close()
  880. default:
  881. }
  882. return
  883. case <-t.GoAway():
  884. ac.adjustParams(t.GetGoAwayReason())
  885. // If GoAway happens without any network I/O error, ac is closed without shutting down the
  886. // underlying transport (the transport will be closed when all the pending RPCs finished or
  887. // failed.).
  888. // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
  889. // are closed.
  890. // In both cases, a new ac is created.
  891. select {
  892. case <-t.Error():
  893. ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
  894. default:
  895. ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
  896. }
  897. return
  898. case <-t.Error():
  899. select {
  900. case <-ac.ctx.Done():
  901. t.Close()
  902. return
  903. case <-t.GoAway():
  904. ac.adjustParams(t.GetGoAwayReason())
  905. ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
  906. return
  907. default:
  908. }
  909. ac.mu.Lock()
  910. if ac.state == Shutdown {
  911. // ac has been shutdown.
  912. ac.mu.Unlock()
  913. return
  914. }
  915. ac.state = TransientFailure
  916. ac.stateCV.Broadcast()
  917. ac.mu.Unlock()
  918. if err := ac.resetTransport(true); err != nil {
  919. ac.mu.Lock()
  920. ac.printf("transport exiting: %v", err)
  921. ac.mu.Unlock()
  922. grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
  923. if err != errConnClosing {
  924. // Keep this ac in cc.conns, to get the reason it's torn down.
  925. ac.tearDown(err)
  926. }
  927. return
  928. }
  929. }
  930. }
  931. }
  932. // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
  933. // iv) transport is in TransientFailure and there is a balancer/failfast is true.
  934. func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
  935. for {
  936. ac.mu.Lock()
  937. switch {
  938. case ac.state == Shutdown:
  939. if failfast || !hasBalancer {
  940. // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
  941. err := ac.tearDownErr
  942. ac.mu.Unlock()
  943. return nil, err
  944. }
  945. ac.mu.Unlock()
  946. return nil, errConnClosing
  947. case ac.state == Ready:
  948. ct := ac.transport
  949. ac.mu.Unlock()
  950. return ct, nil
  951. case ac.state == TransientFailure:
  952. if failfast || hasBalancer {
  953. ac.mu.Unlock()
  954. return nil, errConnUnavailable
  955. }
  956. }
  957. ready := ac.ready
  958. if ready == nil {
  959. ready = make(chan struct{})
  960. ac.ready = ready
  961. }
  962. ac.mu.Unlock()
  963. select {
  964. case <-ctx.Done():
  965. return nil, toRPCErr(ctx.Err())
  966. // Wait until the new transport is ready or failed.
  967. case <-ready:
  968. }
  969. }
  970. }
  971. // tearDown starts to tear down the addrConn.
  972. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  973. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  974. // tight loop.
  975. // tearDown doesn't remove ac from ac.cc.conns.
  976. func (ac *addrConn) tearDown(err error) {
  977. ac.cancel()
  978. ac.mu.Lock()
  979. defer ac.mu.Unlock()
  980. if ac.down != nil {
  981. ac.down(downErrorf(false, false, "%v", err))
  982. ac.down = nil
  983. }
  984. if err == errConnDrain && ac.transport != nil {
  985. // GracefulClose(...) may be executed multiple times when
  986. // i) receiving multiple GoAway frames from the server; or
  987. // ii) there are concurrent name resolver/Balancer triggered
  988. // address removal and GoAway.
  989. ac.transport.GracefulClose()
  990. }
  991. if ac.state == Shutdown {
  992. return
  993. }
  994. ac.state = Shutdown
  995. ac.tearDownErr = err
  996. ac.stateCV.Broadcast()
  997. if ac.events != nil {
  998. ac.events.Finish()
  999. ac.events = nil
  1000. }
  1001. if ac.ready != nil {
  1002. close(ac.ready)
  1003. ac.ready = nil
  1004. }
  1005. if ac.transport != nil && err != errConnDrain {
  1006. ac.transport.Close()
  1007. }
  1008. return
  1009. }