clientconn.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "errors"
  21. "fmt"
  22. "math"
  23. "net"
  24. "reflect"
  25. "strings"
  26. "sync"
  27. "time"
  28. "golang.org/x/net/context"
  29. "golang.org/x/net/trace"
  30. "google.golang.org/grpc/balancer"
  31. "google.golang.org/grpc/connectivity"
  32. "google.golang.org/grpc/credentials"
  33. "google.golang.org/grpc/grpclog"
  34. "google.golang.org/grpc/keepalive"
  35. "google.golang.org/grpc/resolver"
  36. "google.golang.org/grpc/stats"
  37. "google.golang.org/grpc/transport"
  38. )
  // Errors returned by ClientConn and addrConn, plus connection tunables.
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	ErrClientConnClosing = errors.New("grpc: the client connection is closing")
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	//
	// Deprecated: Please use context.DeadlineExceeded instead.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")

	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., oauth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
	// errNetworkIO indicates that the connection is down due to some network I/O error.
	errNetworkIO = errors.New("grpc: failed with network I/O error")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errConnUnavailable indicates that the connection is unavailable.
	errConnUnavailable = errors.New("grpc: the connection is unavailable")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")

	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
)
  71. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  72. // values passed to Dial.
  73. type dialOptions struct {
  74. unaryInt UnaryClientInterceptor
  75. streamInt StreamClientInterceptor
  76. codec Codec
  77. cp Compressor
  78. dc Decompressor
  79. bs backoffStrategy
  80. block bool
  81. insecure bool
  82. timeout time.Duration
  83. scChan <-chan ServiceConfig
  84. copts transport.ConnectOptions
  85. callOptions []CallOption
  86. // This is to support v1 balancer.
  87. balancerBuilder balancer.Builder
  88. }
  // Default client-side message size limits, in bytes.
const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 4 << 20
	// defaultClientMaxSendMessageSize is the largest int32 value.
	defaultClientMaxSendMessageSize = math.MaxInt32
)
  93. // DialOption configures how we set up the connection.
  94. type DialOption func(*dialOptions)
  95. // WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
  96. // before doing a write on the wire.
  97. func WithWriteBufferSize(s int) DialOption {
  98. return func(o *dialOptions) {
  99. o.copts.WriteBufferSize = s
  100. }
  101. }
  102. // WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
  103. // for each read syscall.
  104. func WithReadBufferSize(s int) DialOption {
  105. return func(o *dialOptions) {
  106. o.copts.ReadBufferSize = s
  107. }
  108. }
  109. // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
  110. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  111. func WithInitialWindowSize(s int32) DialOption {
  112. return func(o *dialOptions) {
  113. o.copts.InitialWindowSize = s
  114. }
  115. }
  116. // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
  117. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  118. func WithInitialConnWindowSize(s int32) DialOption {
  119. return func(o *dialOptions) {
  120. o.copts.InitialConnWindowSize = s
  121. }
  122. }
  123. // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
  124. func WithMaxMsgSize(s int) DialOption {
  125. return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
  126. }
  127. // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
  128. func WithDefaultCallOptions(cos ...CallOption) DialOption {
  129. return func(o *dialOptions) {
  130. o.callOptions = append(o.callOptions, cos...)
  131. }
  132. }
  133. // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
  134. func WithCodec(c Codec) DialOption {
  135. return func(o *dialOptions) {
  136. o.codec = c
  137. }
  138. }
  139. // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
  140. // compressor.
  141. func WithCompressor(cp Compressor) DialOption {
  142. return func(o *dialOptions) {
  143. o.cp = cp
  144. }
  145. }
  146. // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
  147. // message decompressor.
  148. func WithDecompressor(dc Decompressor) DialOption {
  149. return func(o *dialOptions) {
  150. o.dc = dc
  151. }
  152. }
  153. // WithBalancer returns a DialOption which sets a load balancer with the v1 API.
  154. // Name resolver will be ignored if this DialOption is specified.
  155. // Deprecated: use the new balancer APIs in balancer package instead.
  156. func WithBalancer(b Balancer) DialOption {
  157. return func(o *dialOptions) {
  158. o.balancerBuilder = &balancerWrapperBuilder{
  159. b: b,
  160. }
  161. }
  162. }
  163. // WithBalancerBuilder is for testing only. Users using custom balancers should
  164. // register their balancer and use service config to choose the balancer to use.
  165. func WithBalancerBuilder(b balancer.Builder) DialOption {
  166. // TODO(bar) remove this when switching balancer is done.
  167. return func(o *dialOptions) {
  168. o.balancerBuilder = b
  169. }
  170. }
  171. // WithServiceConfig returns a DialOption which has a channel to read the service configuration.
  172. func WithServiceConfig(c <-chan ServiceConfig) DialOption {
  173. return func(o *dialOptions) {
  174. o.scChan = c
  175. }
  176. }
  177. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  178. // when backing off after failed connection attempts.
  179. func WithBackoffMaxDelay(md time.Duration) DialOption {
  180. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  181. }
  182. // WithBackoffConfig configures the dialer to use the provided backoff
  183. // parameters after connection failures.
  184. //
  185. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  186. // for use.
  187. func WithBackoffConfig(b BackoffConfig) DialOption {
  188. // Set defaults to ensure that provided BackoffConfig is valid and
  189. // unexported fields get default values.
  190. setDefaults(&b)
  191. return withBackoff(b)
  192. }
  193. // withBackoff sets the backoff strategy used for retries after a
  194. // failed connection attempt.
  195. //
  196. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  197. func withBackoff(bs backoffStrategy) DialOption {
  198. return func(o *dialOptions) {
  199. o.bs = bs
  200. }
  201. }
  202. // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
  203. // connection is up. Without this, Dial returns immediately and connecting the server
  204. // happens in background.
  205. func WithBlock() DialOption {
  206. return func(o *dialOptions) {
  207. o.block = true
  208. }
  209. }
  210. // WithInsecure returns a DialOption which disables transport security for this ClientConn.
  211. // Note that transport security is required unless WithInsecure is set.
  212. func WithInsecure() DialOption {
  213. return func(o *dialOptions) {
  214. o.insecure = true
  215. }
  216. }
  217. // WithTransportCredentials returns a DialOption which configures a
  218. // connection level security credentials (e.g., TLS/SSL).
  219. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  220. return func(o *dialOptions) {
  221. o.copts.TransportCredentials = creds
  222. }
  223. }
  224. // WithPerRPCCredentials returns a DialOption which sets
  225. // credentials and places auth state on each outbound RPC.
  226. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  227. return func(o *dialOptions) {
  228. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  229. }
  230. }
  231. // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
  232. // initially. This is valid if and only if WithBlock() is present.
  233. // Deprecated: use DialContext and context.WithTimeout instead.
  234. func WithTimeout(d time.Duration) DialOption {
  235. return func(o *dialOptions) {
  236. o.timeout = d
  237. }
  238. }
  239. // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
  240. // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
  241. // Temporary() method to decide if it should try to reconnect to the network address.
  242. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  243. return func(o *dialOptions) {
  244. o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
  245. if deadline, ok := ctx.Deadline(); ok {
  246. return f(addr, deadline.Sub(time.Now()))
  247. }
  248. return f(addr, 0)
  249. }
  250. }
  251. }
  252. // WithStatsHandler returns a DialOption that specifies the stats handler
  253. // for all the RPCs and underlying network connections in this ClientConn.
  254. func WithStatsHandler(h stats.Handler) DialOption {
  255. return func(o *dialOptions) {
  256. o.copts.StatsHandler = h
  257. }
  258. }
  259. // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
  260. // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
  261. // address and won't try to reconnect.
  262. // The default value of FailOnNonTempDialError is false.
  263. // This is an EXPERIMENTAL API.
  264. func FailOnNonTempDialError(f bool) DialOption {
  265. return func(o *dialOptions) {
  266. o.copts.FailOnNonTempDialError = f
  267. }
  268. }
  269. // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
  270. func WithUserAgent(s string) DialOption {
  271. return func(o *dialOptions) {
  272. o.copts.UserAgent = s
  273. }
  274. }
  275. // WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
  276. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
  277. return func(o *dialOptions) {
  278. o.copts.KeepaliveParams = kp
  279. }
  280. }
  281. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
  282. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  283. return func(o *dialOptions) {
  284. o.unaryInt = f
  285. }
  286. }
  287. // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
  288. func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
  289. return func(o *dialOptions) {
  290. o.streamInt = f
  291. }
  292. }
  293. // WithAuthority returns a DialOption that specifies the value to be used as
  294. // the :authority pseudo-header. This value only works with WithInsecure and
  295. // has no effect if TransportCredentials are present.
  296. func WithAuthority(a string) DialOption {
  297. return func(o *dialOptions) {
  298. o.copts.Authority = a
  299. }
  300. }
  301. // Dial creates a client connection to the given target.
  302. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  303. return DialContext(context.Background(), target, opts...)
  304. }
  305. // DialContext creates a client connection to the given target. ctx can be used to
  306. // cancel or expire the pending connection. Once this function returns, the
  307. // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
  308. // to terminate all the pending operations after this function returns.
//
// Security options are validated first and their errors are returned directly.
// After that, a deferred check replaces the result with (nil, ctx.Err())
// whenever ctx is done by the time DialContext returns, even if dialing had
// otherwise succeeded. It then closes cc whenever the final err is non-nil.
  309. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  310. cc := &ClientConn{
  311. target: target,
  312. csMgr: &connectivityStateManager{},
  313. conns: make(map[*addrConn]struct{}),
  314. blockingpicker: newPickerWrapper(),
  315. }
  316. cc.ctx, cc.cancel = context.WithCancel(context.Background()) // cc's lifetime is independent of ctx; it ends at cc.Close.
  317. for _, opt := range opts {
  318. opt(&cc.dopts)
  319. }
// Transport security is required unless WithInsecure was given; with
// WithInsecure, neither transport credentials nor per-RPC credentials that
// require transport security are allowed.
  320. if !cc.dopts.insecure {
  321. if cc.dopts.copts.TransportCredentials == nil {
  322. return nil, errNoTransportSecurity
  323. }
  324. } else {
  325. if cc.dopts.copts.TransportCredentials != nil {
  326. return nil, errCredentialsConflict
  327. }
  328. for _, cd := range cc.dopts.copts.PerRPCCredentials {
  329. if cd.RequireTransportSecurity() {
  330. return nil, errTransportCredentialsMissing
  331. }
  332. }
  333. }
  334. cc.mkp = cc.dopts.copts.KeepaliveParams // may later be raised by addrConn.adjustParams on GoAway.
  335. if cc.dopts.copts.Dialer == nil { // default: plain TCP dialer wrapped by newProxyDialer.
  336. cc.dopts.copts.Dialer = newProxyDialer(
  337. func(ctx context.Context, addr string) (net.Conn, error) {
  338. return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
  339. },
  340. )
  341. }
  342. if cc.dopts.copts.UserAgent != "" { // grpcUA is appended to a user-supplied agent, or used alone.
  343. cc.dopts.copts.UserAgent += " " + grpcUA
  344. } else {
  345. cc.dopts.copts.UserAgent = grpcUA
  346. }
  347. if cc.dopts.timeout > 0 { // deprecated WithTimeout: bound ctx for the rest of this call.
  348. var cancel context.CancelFunc
  349. ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
  350. defer cancel() // defers run LIFO, so this fires after the cleanup defer below has checked ctx.
  351. }
  352. defer func() {
  353. select {
  354. case <-ctx.Done():
  355. conn, err = nil, ctx.Err()
  356. default:
  357. }
  358. if err != nil {
  359. cc.Close()
  360. }
  361. }()
  362. scSet := false
  363. if cc.dopts.scChan != nil {
  364. // Try to get an initial service config.
  365. select { // non-blocking: the default case skips if nothing is ready yet.
  366. case sc, ok := <-cc.dopts.scChan:
  367. if ok {
  368. cc.sc = sc
  369. scSet = true
  370. }
  371. default:
  372. }
  373. }
  374. // Set defaults.
  375. if cc.dopts.codec == nil {
  376. cc.dopts.codec = protoCodec{}
  377. }
  378. if cc.dopts.bs == nil {
  379. cc.dopts.bs = DefaultBackoffConfig
  380. }
// Authority precedence: the credentials' ServerName, then WithAuthority
// (honored only for insecure connections), then the raw target.
  381. creds := cc.dopts.copts.TransportCredentials
  382. if creds != nil && creds.Info().ServerName != "" {
  383. cc.authority = creds.Info().ServerName
  384. } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
  385. cc.authority = cc.dopts.copts.Authority
  386. } else {
  387. cc.authority = target
  388. }
  389. if cc.dopts.balancerBuilder != nil {
  390. var credsClone credentials.TransportCredentials
  391. if creds != nil {
  392. credsClone = creds.Clone() // the balancer gets its own copy of the credentials.
  393. }
  394. buildOpts := balancer.BuildOptions{
  395. DialCreds: credsClone,
  396. Dialer: cc.dopts.copts.Dialer,
  397. }
  398. // Build should not take long time. So it's ok to not have a goroutine for it.
  399. // TODO(bar) init balancer after first resolver result to support service config balancer.
  400. cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, buildOpts)
  401. } else {
// No balancer: connect to target in a goroutine so the wait below can be
// interrupted by ctx.
  402. waitC := make(chan error, 1) // buffered: the goroutine never blocks if ctx wins the select.
  403. go func() {
  404. defer close(waitC)
  405. // No balancer, or no resolver within the balancer. Connect directly.
  406. ac, err := cc.newAddrConn([]resolver.Address{{Addr: target}})
  407. if err != nil {
  408. waitC <- err
  409. return
  410. }
  411. if err := ac.connect(cc.dopts.block); err != nil {
  412. waitC <- err
  413. return
  414. }
  415. }()
  416. select {
  417. case <-ctx.Done():
  418. return nil, ctx.Err()
  419. case err := <-waitC: // nil when waitC was closed without an error.
  420. if err != nil {
  421. return nil, err
  422. }
  423. }
  424. }
  425. if cc.dopts.scChan != nil && !scSet {
  426. // Blocking wait for the initial service config.
  427. select {
  428. case sc, ok := <-cc.dopts.scChan:
  429. if ok {
  430. cc.sc = sc
  431. }
  432. case <-ctx.Done():
  433. return nil, ctx.Err()
  434. }
  435. }
  436. if cc.dopts.scChan != nil {
  437. go cc.scWatcher() // keeps cc.sc updated until scChan closes or cc is closed.
  438. }
  439. // Build the resolver.
  440. cc.resolverWrapper, err = newCCResolverWrapper(cc)
  441. if err != nil {
  442. return nil, fmt.Errorf("failed to build resolver: %v", err)
  443. }
  444. if cc.balancerWrapper != nil && cc.resolverWrapper == nil {
  445. // TODO(bar) there should always be a resolver (DNS as the default).
  446. // Unblock balancer initialization with a fake resolver update if there's no resolver.
  447. // The balancer wrapper will not read the addresses, so an empty list works.
  448. // TODO(bar) remove this after the real resolver is started.
  449. cc.balancerWrapper.handleResolvedAddrs([]resolver.Address{}, nil)
  450. }
  451. // A blocking dial blocks until the clientConn is ready.
  452. if cc.dopts.block {
  453. for {
  454. s := cc.GetState()
  455. if s == connectivity.Ready {
  456. break
  457. }
  458. if !cc.WaitForStateChange(ctx, s) {
  459. // ctx got timeout or canceled.
  460. return nil, ctx.Err()
  461. }
  462. }
  463. }
  464. return cc, nil
  465. }
  466. // connectivityStateManager keeps the connectivity.State of ClientConn.
  467. // This struct will eventually be exported so the balancers can access it.
  468. type connectivityStateManager struct {
  469. mu sync.Mutex
  470. state connectivity.State
  471. notifyChan chan struct{}
  472. }
  473. // updateState updates the connectivity.State of ClientConn.
  474. // If there's a change it notifies goroutines waiting on state change to
  475. // happen.
  476. func (csm *connectivityStateManager) updateState(state connectivity.State) {
  477. csm.mu.Lock()
  478. defer csm.mu.Unlock()
  479. if csm.state == connectivity.Shutdown {
  480. return
  481. }
  482. if csm.state == state {
  483. return
  484. }
  485. csm.state = state
  486. if csm.notifyChan != nil {
  487. // There are other goroutines waiting on this channel.
  488. close(csm.notifyChan)
  489. csm.notifyChan = nil
  490. }
  491. }
  492. func (csm *connectivityStateManager) getState() connectivity.State {
  493. csm.mu.Lock()
  494. defer csm.mu.Unlock()
  495. return csm.state
  496. }
  497. func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
  498. csm.mu.Lock()
  499. defer csm.mu.Unlock()
  500. if csm.notifyChan == nil {
  501. csm.notifyChan = make(chan struct{})
  502. }
  503. return csm.notifyChan
  504. }
  505. // ClientConn represents a client connection to an RPC server.
  506. type ClientConn struct {
  507. ctx context.Context
  508. cancel context.CancelFunc
  509. target string
  510. authority string
  511. dopts dialOptions
  512. csMgr *connectivityStateManager
  513. balancerWrapper *ccBalancerWrapper
  514. resolverWrapper *ccResolverWrapper
  515. blockingpicker *pickerWrapper
  516. mu sync.RWMutex
  517. sc ServiceConfig
  518. conns map[*addrConn]struct{}
  519. // Keepalive parameter can be updated if a GoAway is received.
  520. mkp keepalive.ClientParameters
  521. }
  522. // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
  523. // ctx expires. A true value is returned in former case and false in latter.
  524. // This is an EXPERIMENTAL API.
  525. func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
  526. ch := cc.csMgr.getNotifyChan()
  527. if cc.csMgr.getState() != sourceState {
  528. return true
  529. }
  530. select {
  531. case <-ctx.Done():
  532. return false
  533. case <-ch:
  534. return true
  535. }
  536. }
  537. // GetState returns the connectivity.State of ClientConn.
  538. // This is an EXPERIMENTAL API.
  539. func (cc *ClientConn) GetState() connectivity.State {
  540. return cc.csMgr.getState()
  541. }
  542. func (cc *ClientConn) scWatcher() {
  543. for {
  544. select {
  545. case sc, ok := <-cc.dopts.scChan:
  546. if !ok {
  547. return
  548. }
  549. cc.mu.Lock()
  550. // TODO: load balance policy runtime change is ignored.
  551. // We may revist this decision in the future.
  552. cc.sc = sc
  553. cc.mu.Unlock()
  554. case <-cc.ctx.Done():
  555. return
  556. }
  557. }
  558. }
  559. // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
  560. func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
  561. ac := &addrConn{
  562. cc: cc,
  563. addrs: addrs,
  564. dopts: cc.dopts,
  565. }
  566. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  567. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  568. cc.mu.Lock()
  569. if cc.conns == nil {
  570. cc.mu.Unlock()
  571. return nil, ErrClientConnClosing
  572. }
  573. cc.conns[ac] = struct{}{}
  574. cc.mu.Unlock()
  575. return ac, nil
  576. }
  577. // removeAddrConn removes the addrConn in the subConn from clientConn.
  578. // It also tears down the ac with the given error.
  579. func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
  580. cc.mu.Lock()
  581. if cc.conns == nil {
  582. cc.mu.Unlock()
  583. return
  584. }
  585. delete(cc.conns, ac)
  586. cc.mu.Unlock()
  587. ac.tearDown(err)
  588. }
  589. // connect starts to creating transport and also starts the transport monitor
  590. // goroutine for this ac.
  591. // It does nothing if the ac is not IDLE.
  592. // TODO(bar) Move this to the addrConn section.
  593. // This was part of resetAddrConn, keep it here to make the diff look clean.
  594. func (ac *addrConn) connect(block bool) error {
  595. ac.mu.Lock()
  596. if ac.state == connectivity.Shutdown {
  597. ac.mu.Unlock()
  598. return errConnClosing
  599. }
  600. if ac.state != connectivity.Idle {
  601. ac.mu.Unlock()
  602. return nil
  603. }
  604. ac.state = connectivity.Connecting
  605. if ac.cc.balancerWrapper != nil {
  606. ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
  607. } else {
  608. ac.cc.csMgr.updateState(ac.state)
  609. }
  610. ac.mu.Unlock()
  611. if block {
  612. if err := ac.resetTransport(); err != nil {
  613. if err != errConnClosing {
  614. ac.tearDown(err)
  615. }
  616. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  617. return e.Origin()
  618. }
  619. return err
  620. }
  621. // Start to monitor the error status of transport.
  622. go ac.transportMonitor()
  623. } else {
  624. // Start a goroutine connecting to the server asynchronously.
  625. go func() {
  626. if err := ac.resetTransport(); err != nil {
  627. grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
  628. if err != errConnClosing {
  629. // Keep this ac in cc.conns, to get the reason it's torn down.
  630. ac.tearDown(err)
  631. }
  632. return
  633. }
  634. ac.transportMonitor()
  635. }()
  636. }
  637. return nil
  638. }
  639. // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
  640. //
  641. // It checks whether current connected address of ac is in the new addrs list.
  642. // - If true, it updates ac.addrs and returns true. The ac will keep using
  643. // the existing connection.
  644. // - If false, it does nothing and returns false.
  645. func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
  646. ac.mu.Lock()
  647. defer ac.mu.Unlock()
  648. grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
  649. if ac.state == connectivity.Shutdown {
  650. ac.addrs = addrs
  651. return true
  652. }
  653. var curAddrFound bool
  654. for _, a := range addrs {
  655. if reflect.DeepEqual(ac.curAddr, a) {
  656. curAddrFound = true
  657. break
  658. }
  659. }
  660. grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
  661. if curAddrFound {
  662. ac.addrs = addrs
  663. }
  664. return curAddrFound
  665. }
  666. // GetMethodConfig gets the method config of the input method.
  667. // If there's an exact match for input method (i.e. /service/method), we return
  668. // the corresponding MethodConfig.
  669. // If there isn't an exact match for the input method, we look for the default config
  670. // under the service (i.e /service/). If there is a default MethodConfig for
  671. // the serivce, we return it.
  672. // Otherwise, we return an empty MethodConfig.
  673. func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
  674. // TODO: Avoid the locking here.
  675. cc.mu.RLock()
  676. defer cc.mu.RUnlock()
  677. m, ok := cc.sc.Methods[method]
  678. if !ok {
  679. i := strings.LastIndex(method, "/")
  680. m, _ = cc.sc.Methods[method[:i+1]]
  681. }
  682. return m
  683. }
  684. func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  685. if cc.balancerWrapper == nil {
  686. // If balancer is nil, there should be only one addrConn available.
  687. cc.mu.RLock()
  688. if cc.conns == nil {
  689. cc.mu.RUnlock()
  690. // TODO this function returns toRPCErr and non-toRPCErr. Clean up
  691. // the errors in ClientConn.
  692. return nil, nil, toRPCErr(ErrClientConnClosing)
  693. }
  694. var ac *addrConn
  695. for ac = range cc.conns {
  696. // Break after the first iteration to get the first addrConn.
  697. break
  698. }
  699. cc.mu.RUnlock()
  700. if ac == nil {
  701. return nil, nil, errConnClosing
  702. }
  703. t, err := ac.wait(ctx, false /*hasBalancer*/, failfast)
  704. if err != nil {
  705. return nil, nil, err
  706. }
  707. return t, nil, nil
  708. }
  709. t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{})
  710. if err != nil {
  711. return nil, nil, toRPCErr(err)
  712. }
  713. return t, done, nil
  714. }
  715. // Close tears down the ClientConn and all underlying connections.
  716. func (cc *ClientConn) Close() error {
  717. cc.cancel()
  718. cc.mu.Lock()
  719. if cc.conns == nil {
  720. cc.mu.Unlock()
  721. return ErrClientConnClosing
  722. }
  723. conns := cc.conns
  724. cc.conns = nil
  725. cc.csMgr.updateState(connectivity.Shutdown)
  726. cc.mu.Unlock()
  727. cc.blockingpicker.close()
  728. if cc.resolverWrapper != nil {
  729. cc.resolverWrapper.close()
  730. }
  731. if cc.balancerWrapper != nil {
  732. cc.balancerWrapper.close()
  733. }
  734. for ac := range conns {
  735. ac.tearDown(ErrClientConnClosing)
  736. }
  737. return nil
  738. }
// addrConn is a network connection to a given address.
//
// resetTransport cycles through addrs when (re)connecting, and
// transportMonitor watches the transport it establishes.
type addrConn struct {
	// ctx is cancelled by tearDown (through cancel). resetTransport returns
	// ctx.Err() if it is done while backing off between connection rounds.
	ctx    context.Context
	cancel context.CancelFunc

	// cc is the ClientConn this addrConn belongs to.
	cc *ClientConn
	// curAddr is the address of the current transport. It is set on a
	// successful connect and reset to the zero Address on reconnect/teardown.
	curAddr resolver.Address
	// addrs are the candidate addresses, tried in order by resetTransport.
	addrs []resolver.Address
	// dopts holds the dial options. resetTransport overwrites
	// dopts.copts.KeepaliveParams with cc.mkp before each reconnect cycle.
	dopts dialOptions
	// events is the trace event log. tearDown finishes it and sets it to nil.
	events trace.EventLog
	// acbw is the SubConn reported to the balancer wrapper on state changes.
	acbw balancer.SubConn

	// mu is held in this file for every access to the fields below, as well
	// as for curAddr, addrs and events.
	mu    sync.Mutex
	state connectivity.State
	// ready is closed and becomes nil when a new transport is up or failed
	// due to timeout. resetTransport also closes it when it starts, and
	// tearDown closes it; wait() callers block on it.
	ready     chan struct{}
	transport transport.ClientTransport
	// The reason this addrConn is torn down.
	tearDownErr error
}
  758. // adjustParams updates parameters used to create transports upon
  759. // receiving a GoAway.
  760. func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
  761. switch r {
  762. case transport.TooManyPings:
  763. v := 2 * ac.dopts.copts.KeepaliveParams.Time
  764. ac.cc.mu.Lock()
  765. if v > ac.cc.mkp.Time {
  766. ac.cc.mkp.Time = v
  767. }
  768. ac.cc.mu.Unlock()
  769. }
  770. }
  771. // printf records an event in ac's event log, unless ac has been closed.
  772. // REQUIRES ac.mu is held.
  773. func (ac *addrConn) printf(format string, a ...interface{}) {
  774. if ac.events != nil {
  775. ac.events.Printf(format, a...)
  776. }
  777. }
  778. // errorf records an error in ac's event log, unless ac has been closed.
  779. // REQUIRES ac.mu is held.
  780. func (ac *addrConn) errorf(format string, a ...interface{}) {
  781. if ac.events != nil {
  782. ac.events.Errorf(format, a...)
  783. }
  784. }
  785. // resetTransport recreates a transport to the address for ac. The old
  786. // transport will close itself on error or when the clientconn is closed.
  787. //
  788. // TODO(bar) make sure all state transitions are valid.
  789. func (ac *addrConn) resetTransport() error {
  790. ac.mu.Lock()
  791. if ac.state == connectivity.Shutdown {
  792. ac.mu.Unlock()
  793. return errConnClosing
  794. }
  795. if ac.ready != nil {
  796. close(ac.ready)
  797. ac.ready = nil
  798. }
  799. ac.transport = nil
  800. ac.curAddr = resolver.Address{}
  801. ac.mu.Unlock()
  802. ac.cc.mu.RLock()
  803. ac.dopts.copts.KeepaliveParams = ac.cc.mkp
  804. ac.cc.mu.RUnlock()
  805. for retries := 0; ; retries++ {
  806. sleepTime := ac.dopts.bs.backoff(retries)
  807. timeout := minConnectTimeout
  808. ac.mu.Lock()
  809. if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) {
  810. timeout = time.Duration(int(sleepTime) / len(ac.addrs))
  811. }
  812. connectTime := time.Now()
  813. if ac.state == connectivity.Shutdown {
  814. ac.mu.Unlock()
  815. return errConnClosing
  816. }
  817. ac.printf("connecting")
  818. if ac.state != connectivity.Connecting {
  819. ac.state = connectivity.Connecting
  820. // TODO(bar) remove condition once we always have a balancer.
  821. if ac.cc.balancerWrapper != nil {
  822. ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
  823. } else {
  824. ac.cc.csMgr.updateState(ac.state)
  825. }
  826. }
  827. // copy ac.addrs in case of race
  828. addrsIter := make([]resolver.Address, len(ac.addrs))
  829. copy(addrsIter, ac.addrs)
  830. copts := ac.dopts.copts
  831. ac.mu.Unlock()
  832. for _, addr := range addrsIter {
  833. ac.mu.Lock()
  834. if ac.state == connectivity.Shutdown {
  835. // ac.tearDown(...) has been invoked.
  836. ac.mu.Unlock()
  837. return errConnClosing
  838. }
  839. ac.mu.Unlock()
  840. sinfo := transport.TargetInfo{
  841. Addr: addr.Addr,
  842. Metadata: addr.Metadata,
  843. }
  844. newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout)
  845. if err != nil {
  846. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  847. return err
  848. }
  849. grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)
  850. ac.mu.Lock()
  851. if ac.state == connectivity.Shutdown {
  852. // ac.tearDown(...) has been invoked.
  853. ac.mu.Unlock()
  854. return errConnClosing
  855. }
  856. ac.mu.Unlock()
  857. continue
  858. }
  859. ac.mu.Lock()
  860. ac.printf("ready")
  861. if ac.state == connectivity.Shutdown {
  862. // ac.tearDown(...) has been invoked.
  863. ac.mu.Unlock()
  864. newTransport.Close()
  865. return errConnClosing
  866. }
  867. ac.state = connectivity.Ready
  868. if ac.cc.balancerWrapper != nil {
  869. ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
  870. } else {
  871. ac.cc.csMgr.updateState(ac.state)
  872. }
  873. t := ac.transport
  874. ac.transport = newTransport
  875. if t != nil {
  876. t.Close()
  877. }
  878. ac.curAddr = addr
  879. if ac.ready != nil {
  880. close(ac.ready)
  881. ac.ready = nil
  882. }
  883. ac.mu.Unlock()
  884. return nil
  885. }
  886. ac.mu.Lock()
  887. ac.state = connectivity.TransientFailure
  888. if ac.cc.balancerWrapper != nil {
  889. ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
  890. } else {
  891. ac.cc.csMgr.updateState(ac.state)
  892. }
  893. if ac.ready != nil {
  894. close(ac.ready)
  895. ac.ready = nil
  896. }
  897. ac.mu.Unlock()
  898. timer := time.NewTimer(sleepTime - time.Since(connectTime))
  899. select {
  900. case <-timer.C:
  901. case <-ac.ctx.Done():
  902. timer.Stop()
  903. return ac.ctx.Err()
  904. }
  905. timer.Stop()
  906. }
  907. }
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
//
// Each iteration does the following:
//   - block until the current transport reports a GoAway or an error;
//   - adjust keepalive parameters if a GoAway was received;
//   - move ac to TransientFailure and call resetTransport.
//
// The loop exits if ac is already Shutdown, or when resetTransport fails.
// For failures other than errConnClosing it also tears ac down with that
// error.
//
// NOTE(review): this reads ac.transport without a nil check, so it presumably
// must only be started after a successful resetTransport. Confirm at the
// call sites.
func (ac *addrConn) transportMonitor() {
	for {
		ac.mu.Lock()
		t := ac.transport
		ac.mu.Unlock()
		// Block until we receive a goaway or an error occurs.
		select {
		case <-t.GoAway():
		case <-t.Error():
		}
		// If a GoAway happened, regardless of error, adjust our keepalive
		// parameters as appropriate.
		select {
		case <-t.GoAway():
			ac.adjustParams(t.GetGoAwayReason())
		default:
		}
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		// Set connectivity state to TransientFailure before calling
		// resetTransport. Transition READY->CONNECTING is not valid.
		ac.state = connectivity.TransientFailure
		if ac.cc.balancerWrapper != nil {
			ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
		} else {
			ac.cc.csMgr.updateState(ac.state)
		}
		ac.curAddr = resolver.Address{}
		ac.mu.Unlock()
		if err := ac.resetTransport(); err != nil {
			ac.mu.Lock()
			ac.printf("transport exiting: %v", err)
			ac.mu.Unlock()
			grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
			// errConnClosing means ac is already Shutdown; any other error
			// (non-temporary dial failure, ctx done) still needs a teardown.
			if err != errConnClosing {
				// Keep this ac in cc.conns, to get the reason it's torn down.
				ac.tearDown(err)
			}
			return
		}
	}
}
  955. // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
  956. // iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
  957. func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
  958. for {
  959. ac.mu.Lock()
  960. switch {
  961. case ac.state == connectivity.Shutdown:
  962. if failfast || !hasBalancer {
  963. // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
  964. err := ac.tearDownErr
  965. ac.mu.Unlock()
  966. return nil, err
  967. }
  968. ac.mu.Unlock()
  969. return nil, errConnClosing
  970. case ac.state == connectivity.Ready:
  971. ct := ac.transport
  972. ac.mu.Unlock()
  973. return ct, nil
  974. case ac.state == connectivity.TransientFailure:
  975. if failfast || hasBalancer {
  976. ac.mu.Unlock()
  977. return nil, errConnUnavailable
  978. }
  979. }
  980. ready := ac.ready
  981. if ready == nil {
  982. ready = make(chan struct{})
  983. ac.ready = ready
  984. }
  985. ac.mu.Unlock()
  986. select {
  987. case <-ctx.Done():
  988. return nil, toRPCErr(ctx.Err())
  989. // Wait until the new transport is ready or failed.
  990. case <-ready:
  991. }
  992. }
  993. }
  994. // getReadyTransport returns the transport if ac's state is READY.
  995. // Otherwise it returns nil, false.
  996. // If ac's state is IDLE, it will trigger ac to connect.
  997. func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
  998. ac.mu.Lock()
  999. if ac.state == connectivity.Ready {
  1000. t := ac.transport
  1001. ac.mu.Unlock()
  1002. return t, true
  1003. }
  1004. var idle bool
  1005. if ac.state == connectivity.Idle {
  1006. idle = true
  1007. }
  1008. ac.mu.Unlock()
  1009. // Trigger idle ac to connect.
  1010. if idle {
  1011. ac.connect(false)
  1012. }
  1013. return nil, false
  1014. }
  1015. // tearDown starts to tear down the addrConn.
  1016. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  1017. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  1018. // tight loop.
  1019. // tearDown doesn't remove ac from ac.cc.conns.
  1020. func (ac *addrConn) tearDown(err error) {
  1021. ac.cancel()
  1022. ac.mu.Lock()
  1023. ac.curAddr = resolver.Address{}
  1024. defer ac.mu.Unlock()
  1025. if err == errConnDrain && ac.transport != nil {
  1026. // GracefulClose(...) may be executed multiple times when
  1027. // i) receiving multiple GoAway frames from the server; or
  1028. // ii) there are concurrent name resolver/Balancer triggered
  1029. // address removal and GoAway.
  1030. ac.transport.GracefulClose()
  1031. }
  1032. if ac.state == connectivity.Shutdown {
  1033. return
  1034. }
  1035. ac.state = connectivity.Shutdown
  1036. ac.tearDownErr = err
  1037. if ac.cc.balancerWrapper != nil {
  1038. ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
  1039. } else {
  1040. ac.cc.csMgr.updateState(ac.state)
  1041. }
  1042. if ac.events != nil {
  1043. ac.events.Finish()
  1044. ac.events = nil
  1045. }
  1046. if ac.ready != nil {
  1047. close(ac.ready)
  1048. ac.ready = nil
  1049. }
  1050. return
  1051. }
  1052. func (ac *addrConn) getState() connectivity.State {
  1053. ac.mu.Lock()
  1054. defer ac.mu.Unlock()
  1055. return ac.state
  1056. }