clientconn.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "errors"
  21. "math"
  22. "net"
  23. "strings"
  24. "sync"
  25. "time"
  26. "golang.org/x/net/context"
  27. "golang.org/x/net/trace"
  28. "google.golang.org/grpc/connectivity"
  29. "google.golang.org/grpc/credentials"
  30. "google.golang.org/grpc/grpclog"
  31. "google.golang.org/grpc/keepalive"
  32. "google.golang.org/grpc/stats"
  33. "google.golang.org/grpc/transport"
  34. )
  var (
  	// ErrClientConnClosing is returned when an operation cannot proceed
  	// because the ClientConn is closing.
  	ErrClientConnClosing = errors.New("grpc: the client connection is closing")

  	// ErrClientConnTimeout reports that the ClientConn could not establish
  	// its underlying connections within the specified timeout.
  	//
  	// Deprecated: Please use context.DeadlineExceeded instead.
  	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")

  	// errNoTransportSecurity reports that no transport security was
  	// configured for the ClientConn. Users must either set credentials or
  	// opt out explicitly with the WithInsecure DialOption.
  	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

  	// errTransportCredentialsMissing reports that security information
  	// requiring a secure connection (e.g., an oauth2 token) was configured
  	// on an insecure connection.
  	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

  	// errCredentialsConflict reports that both grpc.WithTransportCredentials()
  	// and grpc.WithInsecure() were supplied for the same connection.
  	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")

  	// errNetworkIO reports that the connection went down because of a
  	// network I/O error.
  	errNetworkIO = errors.New("grpc: failed with network I/O error")

  	// errConnDrain reports that the connection has started draining and no
  	// longer accepts new RPCs.
  	errConnDrain = errors.New("grpc: the connection is drained")

  	// errConnClosing reports that the connection is closing.
  	errConnClosing = errors.New("grpc: the connection is closing")

  	// errConnUnavailable reports that the connection is unavailable.
  	errConnUnavailable = errors.New("grpc: the connection is unavailable")

  	// errBalancerClosed reports that the balancer is closed.
  	errBalancerClosed = errors.New("grpc: balancer is closed")

  	// minConnectTimeout is the minimum amount of time a connection attempt
  	// is given to complete.
  	minConnectTimeout = 20 * time.Second
  )
  67. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  68. // values passed to Dial.
  69. type dialOptions struct {
  70. unaryInt UnaryClientInterceptor
  71. streamInt StreamClientInterceptor
  72. codec Codec
  73. cp Compressor
  74. dc Decompressor
  75. bs backoffStrategy
  76. balancer Balancer
  77. block bool
  78. insecure bool
  79. timeout time.Duration
  80. scChan <-chan ServiceConfig
  81. copts transport.ConnectOptions
  82. callOptions []CallOption
  83. }
  // Default client-side message size limits, in bytes.
  const (
  	// defaultClientMaxReceiveMessageSize is 4 MiB.
  	defaultClientMaxReceiveMessageSize = 4 * 1024 * 1024
  	// defaultClientMaxSendMessageSize is the largest 32-bit signed integer.
  	defaultClientMaxSendMessageSize = math.MaxInt32
  )
  88. // DialOption configures how we set up the connection.
  89. type DialOption func(*dialOptions)
  90. // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
  91. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  92. func WithInitialWindowSize(s int32) DialOption {
  93. return func(o *dialOptions) {
  94. o.copts.InitialWindowSize = s
  95. }
  96. }
  97. // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
  98. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  99. func WithInitialConnWindowSize(s int32) DialOption {
  100. return func(o *dialOptions) {
  101. o.copts.InitialConnWindowSize = s
  102. }
  103. }
  104. // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
  105. func WithMaxMsgSize(s int) DialOption {
  106. return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
  107. }
  108. // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
  109. func WithDefaultCallOptions(cos ...CallOption) DialOption {
  110. return func(o *dialOptions) {
  111. o.callOptions = append(o.callOptions, cos...)
  112. }
  113. }
  114. // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
  115. func WithCodec(c Codec) DialOption {
  116. return func(o *dialOptions) {
  117. o.codec = c
  118. }
  119. }
  120. // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
  121. // compressor.
  122. func WithCompressor(cp Compressor) DialOption {
  123. return func(o *dialOptions) {
  124. o.cp = cp
  125. }
  126. }
  127. // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
  128. // message decompressor.
  129. func WithDecompressor(dc Decompressor) DialOption {
  130. return func(o *dialOptions) {
  131. o.dc = dc
  132. }
  133. }
  134. // WithBalancer returns a DialOption which sets a load balancer.
  135. func WithBalancer(b Balancer) DialOption {
  136. return func(o *dialOptions) {
  137. o.balancer = b
  138. }
  139. }
  140. // WithServiceConfig returns a DialOption which has a channel to read the service configuration.
  141. func WithServiceConfig(c <-chan ServiceConfig) DialOption {
  142. return func(o *dialOptions) {
  143. o.scChan = c
  144. }
  145. }
  146. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  147. // when backing off after failed connection attempts.
  148. func WithBackoffMaxDelay(md time.Duration) DialOption {
  149. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  150. }
  151. // WithBackoffConfig configures the dialer to use the provided backoff
  152. // parameters after connection failures.
  153. //
  154. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  155. // for use.
  156. func WithBackoffConfig(b BackoffConfig) DialOption {
  157. // Set defaults to ensure that provided BackoffConfig is valid and
  158. // unexported fields get default values.
  159. setDefaults(&b)
  160. return withBackoff(b)
  161. }
  162. // withBackoff sets the backoff strategy used for retries after a
  163. // failed connection attempt.
  164. //
  165. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  166. func withBackoff(bs backoffStrategy) DialOption {
  167. return func(o *dialOptions) {
  168. o.bs = bs
  169. }
  170. }
  171. // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
  172. // connection is up. Without this, Dial returns immediately and connecting the server
  173. // happens in background.
  174. func WithBlock() DialOption {
  175. return func(o *dialOptions) {
  176. o.block = true
  177. }
  178. }
  179. // WithInsecure returns a DialOption which disables transport security for this ClientConn.
  180. // Note that transport security is required unless WithInsecure is set.
  181. func WithInsecure() DialOption {
  182. return func(o *dialOptions) {
  183. o.insecure = true
  184. }
  185. }
  186. // WithTransportCredentials returns a DialOption which configures a
  187. // connection level security credentials (e.g., TLS/SSL).
  188. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  189. return func(o *dialOptions) {
  190. o.copts.TransportCredentials = creds
  191. }
  192. }
  193. // WithPerRPCCredentials returns a DialOption which sets
  194. // credentials and places auth state on each outbound RPC.
  195. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  196. return func(o *dialOptions) {
  197. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  198. }
  199. }
  200. // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
  201. // initially. This is valid if and only if WithBlock() is present.
  202. // Deprecated: use DialContext and context.WithTimeout instead.
  203. func WithTimeout(d time.Duration) DialOption {
  204. return func(o *dialOptions) {
  205. o.timeout = d
  206. }
  207. }
  208. // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
  209. // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
  210. // Temporary() method to decide if it should try to reconnect to the network address.
  211. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  212. return func(o *dialOptions) {
  213. o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
  214. if deadline, ok := ctx.Deadline(); ok {
  215. return f(addr, deadline.Sub(time.Now()))
  216. }
  217. return f(addr, 0)
  218. }
  219. }
  220. }
  221. // WithStatsHandler returns a DialOption that specifies the stats handler
  222. // for all the RPCs and underlying network connections in this ClientConn.
  223. func WithStatsHandler(h stats.Handler) DialOption {
  224. return func(o *dialOptions) {
  225. o.copts.StatsHandler = h
  226. }
  227. }
  228. // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
  229. // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
  230. // address and won't try to reconnect.
  231. // The default value of FailOnNonTempDialError is false.
  232. // This is an EXPERIMENTAL API.
  233. func FailOnNonTempDialError(f bool) DialOption {
  234. return func(o *dialOptions) {
  235. o.copts.FailOnNonTempDialError = f
  236. }
  237. }
  238. // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
  239. func WithUserAgent(s string) DialOption {
  240. return func(o *dialOptions) {
  241. o.copts.UserAgent = s
  242. }
  243. }
  244. // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
  245. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
  246. return func(o *dialOptions) {
  247. o.copts.KeepaliveParams = kp
  248. }
  249. }
  250. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
  251. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  252. return func(o *dialOptions) {
  253. o.unaryInt = f
  254. }
  255. }
  256. // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
  257. func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
  258. return func(o *dialOptions) {
  259. o.streamInt = f
  260. }
  261. }
  262. // WithAuthority returns a DialOption that specifies the value to be used as
  263. // the :authority pseudo-header. This value only works with WithInsecure and
  264. // has no effect if TransportCredentials are present.
  265. func WithAuthority(a string) DialOption {
  266. return func(o *dialOptions) {
  267. o.copts.Authority = a
  268. }
  269. }
  270. // Dial creates a client connection to the given target.
  271. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  272. return DialContext(context.Background(), target, opts...)
  273. }
  274. // DialContext creates a client connection to the given target. ctx can be used to
  275. // cancel or expire the pending connection. Once this function returns, the
  276. // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
  277. // to terminate all the pending operations after this function returns.
  278. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  279. cc := &ClientConn{
  280. target: target,
  281. csMgr: &connectivityStateManager{},
  282. conns: make(map[Address]*addrConn),
  283. }
  284. cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr}
  285. cc.ctx, cc.cancel = context.WithCancel(context.Background())
  286. for _, opt := range opts {
  287. opt(&cc.dopts)
  288. }
  289. cc.mkp = cc.dopts.copts.KeepaliveParams
  290. if cc.dopts.copts.Dialer == nil {
  291. cc.dopts.copts.Dialer = newProxyDialer(
  292. func(ctx context.Context, addr string) (net.Conn, error) {
  293. return dialContext(ctx, "tcp", addr)
  294. },
  295. )
  296. }
  297. if cc.dopts.copts.UserAgent != "" {
  298. cc.dopts.copts.UserAgent += " " + grpcUA
  299. } else {
  300. cc.dopts.copts.UserAgent = grpcUA
  301. }
  302. if cc.dopts.timeout > 0 {
  303. var cancel context.CancelFunc
  304. ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
  305. defer cancel()
  306. }
  307. defer func() {
  308. select {
  309. case <-ctx.Done():
  310. conn, err = nil, ctx.Err()
  311. default:
  312. }
  313. if err != nil {
  314. cc.Close()
  315. }
  316. }()
  317. scSet := false
  318. if cc.dopts.scChan != nil {
  319. // Try to get an initial service config.
  320. select {
  321. case sc, ok := <-cc.dopts.scChan:
  322. if ok {
  323. cc.sc = sc
  324. scSet = true
  325. }
  326. default:
  327. }
  328. }
  329. // Set defaults.
  330. if cc.dopts.codec == nil {
  331. cc.dopts.codec = protoCodec{}
  332. }
  333. if cc.dopts.bs == nil {
  334. cc.dopts.bs = DefaultBackoffConfig
  335. }
  336. creds := cc.dopts.copts.TransportCredentials
  337. if creds != nil && creds.Info().ServerName != "" {
  338. cc.authority = creds.Info().ServerName
  339. } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
  340. cc.authority = cc.dopts.copts.Authority
  341. } else {
  342. cc.authority = target
  343. }
  344. waitC := make(chan error, 1)
  345. go func() {
  346. defer close(waitC)
  347. if cc.dopts.balancer == nil && cc.sc.LB != nil {
  348. cc.dopts.balancer = cc.sc.LB
  349. }
  350. if cc.dopts.balancer != nil {
  351. var credsClone credentials.TransportCredentials
  352. if creds != nil {
  353. credsClone = creds.Clone()
  354. }
  355. config := BalancerConfig{
  356. DialCreds: credsClone,
  357. Dialer: cc.dopts.copts.Dialer,
  358. }
  359. if err := cc.dopts.balancer.Start(target, config); err != nil {
  360. waitC <- err
  361. return
  362. }
  363. ch := cc.dopts.balancer.Notify()
  364. if ch != nil {
  365. if cc.dopts.block {
  366. doneChan := make(chan struct{})
  367. go cc.lbWatcher(doneChan)
  368. <-doneChan
  369. } else {
  370. go cc.lbWatcher(nil)
  371. }
  372. return
  373. }
  374. }
  375. // No balancer, or no resolver within the balancer. Connect directly.
  376. if err := cc.resetAddrConn([]Address{{Addr: target}}, cc.dopts.block, nil); err != nil {
  377. waitC <- err
  378. return
  379. }
  380. }()
  381. select {
  382. case <-ctx.Done():
  383. return nil, ctx.Err()
  384. case err := <-waitC:
  385. if err != nil {
  386. return nil, err
  387. }
  388. }
  389. if cc.dopts.scChan != nil && !scSet {
  390. // Blocking wait for the initial service config.
  391. select {
  392. case sc, ok := <-cc.dopts.scChan:
  393. if ok {
  394. cc.sc = sc
  395. }
  396. case <-ctx.Done():
  397. return nil, ctx.Err()
  398. }
  399. }
  400. if cc.dopts.scChan != nil {
  401. go cc.scWatcher()
  402. }
  403. return cc, nil
  404. }
  405. // connectivityStateEvaluator gets updated by addrConns when their
  406. // states transition, based on which it evaluates the state of
  407. // ClientConn.
  408. // Note: This code will eventually sit in the balancer in the new design.
  409. type connectivityStateEvaluator struct {
  410. csMgr *connectivityStateManager
  411. mu sync.Mutex
  412. numReady uint64 // Number of addrConns in ready state.
  413. numConnecting uint64 // Number of addrConns in connecting state.
  414. numTransientFailure uint64 // Number of addrConns in transientFailure.
  415. }
  416. // recordTransition records state change happening in every addrConn and based on
  417. // that it evaluates what state the ClientConn is in.
  418. // It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
  419. // Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
  420. // before any addrConn is created ClientConn is in idle state. In the end when ClientConn
  421. // closes it is in connectivity.Shutdown state.
  422. // TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
  423. func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
  424. cse.mu.Lock()
  425. defer cse.mu.Unlock()
  426. // Update counters.
  427. for idx, state := range []connectivity.State{oldState, newState} {
  428. updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
  429. switch state {
  430. case connectivity.Ready:
  431. cse.numReady += updateVal
  432. case connectivity.Connecting:
  433. cse.numConnecting += updateVal
  434. case connectivity.TransientFailure:
  435. cse.numTransientFailure += updateVal
  436. }
  437. }
  438. // Evaluate.
  439. if cse.numReady > 0 {
  440. cse.csMgr.updateState(connectivity.Ready)
  441. return
  442. }
  443. if cse.numConnecting > 0 {
  444. cse.csMgr.updateState(connectivity.Connecting)
  445. return
  446. }
  447. cse.csMgr.updateState(connectivity.TransientFailure)
  448. }
  449. // connectivityStateManager keeps the connectivity.State of ClientConn.
  450. // This struct will eventually be exported so the balancers can access it.
  451. type connectivityStateManager struct {
  452. mu sync.Mutex
  453. state connectivity.State
  454. notifyChan chan struct{}
  455. }
  456. // updateState updates the connectivity.State of ClientConn.
  457. // If there's a change it notifies goroutines waiting on state change to
  458. // happen.
  459. func (csm *connectivityStateManager) updateState(state connectivity.State) {
  460. csm.mu.Lock()
  461. defer csm.mu.Unlock()
  462. if csm.state == connectivity.Shutdown {
  463. return
  464. }
  465. if csm.state == state {
  466. return
  467. }
  468. csm.state = state
  469. if csm.notifyChan != nil {
  470. // There are other goroutines waiting on this channel.
  471. close(csm.notifyChan)
  472. csm.notifyChan = nil
  473. }
  474. }
  475. func (csm *connectivityStateManager) getState() connectivity.State {
  476. csm.mu.Lock()
  477. defer csm.mu.Unlock()
  478. return csm.state
  479. }
  480. func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
  481. csm.mu.Lock()
  482. defer csm.mu.Unlock()
  483. if csm.notifyChan == nil {
  484. csm.notifyChan = make(chan struct{})
  485. }
  486. return csm.notifyChan
  487. }
  488. // ClientConn represents a client connection to an RPC server.
  489. type ClientConn struct {
  490. ctx context.Context
  491. cancel context.CancelFunc
  492. target string
  493. authority string
  494. dopts dialOptions
  495. csMgr *connectivityStateManager
  496. csEvltr *connectivityStateEvaluator // This will eventually be part of balancer.
  497. mu sync.RWMutex
  498. sc ServiceConfig
  499. conns map[Address]*addrConn
  500. // Keepalive parameter can be updated if a GoAway is received.
  501. mkp keepalive.ClientParameters
  502. }
  503. // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
  504. // ctx expires. A true value is returned in former case and false in latter.
  505. // This is an EXPERIMENTAL API.
  506. func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
  507. ch := cc.csMgr.getNotifyChan()
  508. if cc.csMgr.getState() != sourceState {
  509. return true
  510. }
  511. select {
  512. case <-ctx.Done():
  513. return false
  514. case <-ch:
  515. return true
  516. }
  517. }
  518. // GetState returns the connectivity.State of ClientConn.
  519. // This is an EXPERIMENTAL API.
  520. func (cc *ClientConn) GetState() connectivity.State {
  521. return cc.csMgr.getState()
  522. }
  523. // lbWatcher watches the Notify channel of the balancer in cc and manages
  524. // connections accordingly. If doneChan is not nil, it is closed after the
  525. // first successfull connection is made.
  526. func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
  527. defer func() {
  528. // In case channel from cc.dopts.balancer.Notify() gets closed before a
  529. // successful connection gets established, don't forget to notify the
  530. // caller.
  531. if doneChan != nil {
  532. close(doneChan)
  533. }
  534. }()
  535. _, isPickFirst := cc.dopts.balancer.(*pickFirst)
  536. for addrs := range cc.dopts.balancer.Notify() {
  537. if isPickFirst {
  538. if len(addrs) == 0 {
  539. // No address can be connected, should teardown current addrconn if exists
  540. cc.mu.Lock()
  541. if len(cc.conns) != 0 {
  542. cc.pickFirstAddrConnTearDown()
  543. }
  544. cc.mu.Unlock()
  545. } else {
  546. cc.resetAddrConn(addrs, true, nil)
  547. if doneChan != nil {
  548. close(doneChan)
  549. doneChan = nil
  550. }
  551. }
  552. } else {
  553. // Not pickFirst, create a new addrConn for each address.
  554. var (
  555. add []Address // Addresses need to setup connections.
  556. del []*addrConn // Connections need to tear down.
  557. )
  558. cc.mu.Lock()
  559. for _, a := range addrs {
  560. if _, ok := cc.conns[a]; !ok {
  561. add = append(add, a)
  562. }
  563. }
  564. for k, c := range cc.conns {
  565. var keep bool
  566. for _, a := range addrs {
  567. if k == a {
  568. keep = true
  569. break
  570. }
  571. }
  572. if !keep {
  573. del = append(del, c)
  574. delete(cc.conns, k)
  575. }
  576. }
  577. cc.mu.Unlock()
  578. for _, a := range add {
  579. var err error
  580. if doneChan != nil {
  581. err = cc.resetAddrConn([]Address{a}, true, nil)
  582. if err == nil {
  583. close(doneChan)
  584. doneChan = nil
  585. }
  586. } else {
  587. err = cc.resetAddrConn([]Address{a}, false, nil)
  588. }
  589. if err != nil {
  590. grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
  591. }
  592. }
  593. for _, c := range del {
  594. c.tearDown(errConnDrain)
  595. }
  596. }
  597. }
  598. }
  599. func (cc *ClientConn) scWatcher() {
  600. for {
  601. select {
  602. case sc, ok := <-cc.dopts.scChan:
  603. if !ok {
  604. return
  605. }
  606. cc.mu.Lock()
  607. // TODO: load balance policy runtime change is ignored.
  608. // We may revist this decision in the future.
  609. cc.sc = sc
  610. cc.mu.Unlock()
  611. case <-cc.ctx.Done():
  612. return
  613. }
  614. }
  615. }
  616. // pickFirstUpdateAddresses checks whether current address in the updating list, Update the list if true.
  617. // It is only used when the balancer is pick first.
  618. func (cc *ClientConn) pickFirstUpdateAddresses(addrs []Address) bool {
  619. if len(cc.conns) == 0 {
  620. // No addrconn. Should go resetting addrconn.
  621. return false
  622. }
  623. var currentAc *addrConn
  624. for _, currentAc = range cc.conns {
  625. break
  626. }
  627. var addrInNewSlice bool
  628. for _, addr := range addrs {
  629. if strings.Compare(addr.Addr, currentAc.curAddr.Addr) == 0 {
  630. addrInNewSlice = true
  631. break
  632. }
  633. }
  634. if addrInNewSlice {
  635. cc.conns = make(map[Address]*addrConn)
  636. for _, addr := range addrs {
  637. cc.conns[addr] = currentAc
  638. }
  639. currentAc.addrs = addrs
  640. return true
  641. }
  642. return false
  643. }
  644. // pickFirstAddrConnTearDown() should be called after lock.
  645. func (cc *ClientConn) pickFirstAddrConnTearDown() {
  646. if len(cc.conns) == 0 {
  647. return
  648. }
  649. var currentAc *addrConn
  650. for _, currentAc = range cc.conns {
  651. break
  652. }
  653. for k := range cc.conns {
  654. delete(cc.conns, k)
  655. }
  656. currentAc.tearDown(errConnDrain)
  657. }
  658. // resetAddrConn creates an addrConn for addr and adds it to cc.conns.
  659. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
  660. // If tearDownErr is nil, errConnDrain will be used instead.
  661. //
  662. // We should never need to replace an addrConn with a new one. This function is only used
  663. // as newAddrConn to create new addrConn.
  664. // TODO rename this function and clean up the code.
  665. func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr error) error {
  666. // if current transport in addrs, just change lists to update order and new addresses
  667. // not work for roundrobin
  668. cc.mu.Lock()
  669. if _, isPickFirst := cc.dopts.balancer.(*pickFirst); isPickFirst {
  670. // If Current address in use in the updating list, just update the list.
  671. // Otherwise, teardown current addrconn and create a new one.
  672. if cc.pickFirstUpdateAddresses(addrs) {
  673. cc.mu.Unlock()
  674. return nil
  675. }
  676. cc.pickFirstAddrConnTearDown()
  677. }
  678. cc.mu.Unlock()
  679. ac := &addrConn{
  680. cc: cc,
  681. addrs: addrs,
  682. dopts: cc.dopts,
  683. }
  684. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  685. ac.csEvltr = cc.csEvltr
  686. if EnableTracing {
  687. ac.events = trace.NewEventLog("grpc.ClientConn", ac.addrs[0].Addr)
  688. }
  689. if !ac.dopts.insecure {
  690. if ac.dopts.copts.TransportCredentials == nil {
  691. return errNoTransportSecurity
  692. }
  693. } else {
  694. if ac.dopts.copts.TransportCredentials != nil {
  695. return errCredentialsConflict
  696. }
  697. for _, cd := range ac.dopts.copts.PerRPCCredentials {
  698. if cd.RequireTransportSecurity() {
  699. return errTransportCredentialsMissing
  700. }
  701. }
  702. }
  703. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  704. cc.mu.Lock()
  705. if cc.conns == nil {
  706. cc.mu.Unlock()
  707. return ErrClientConnClosing
  708. }
  709. stale := cc.conns[ac.addrs[0]]
  710. for _, a := range ac.addrs {
  711. cc.conns[a] = ac
  712. }
  713. cc.mu.Unlock()
  714. if stale != nil {
  715. // There is an addrConn alive on ac.addr already. This could be due to
  716. // a buggy Balancer that reports duplicated Addresses.
  717. if tearDownErr == nil {
  718. // tearDownErr is nil if resetAddrConn is called by
  719. // 1) Dial
  720. // 2) lbWatcher
  721. // In both cases, the stale ac should drain, not close.
  722. stale.tearDown(errConnDrain)
  723. } else {
  724. stale.tearDown(tearDownErr)
  725. }
  726. }
  727. if block {
  728. if err := ac.resetTransport(false); err != nil {
  729. if err != errConnClosing {
  730. // Tear down ac and delete it from cc.conns.
  731. cc.mu.Lock()
  732. delete(cc.conns, ac.addrs[0])
  733. cc.mu.Unlock()
  734. ac.tearDown(err)
  735. }
  736. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  737. return e.Origin()
  738. }
  739. return err
  740. }
  741. // Start to monitor the error status of transport.
  742. go ac.transportMonitor()
  743. } else {
  744. // Start a goroutine connecting to the server asynchronously.
  745. go func() {
  746. if err := ac.resetTransport(false); err != nil {
  747. grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
  748. if err != errConnClosing {
  749. // Keep this ac in cc.conns, to get the reason it's torn down.
  750. ac.tearDown(err)
  751. }
  752. return
  753. }
  754. ac.transportMonitor()
  755. }()
  756. }
  757. return nil
  758. }
  759. // GetMethodConfig gets the method config of the input method.
  760. // If there's an exact match for input method (i.e. /service/method), we return
  761. // the corresponding MethodConfig.
  762. // If there isn't an exact match for the input method, we look for the default config
  763. // under the service (i.e /service/). If there is a default MethodConfig for
  764. // the serivce, we return it.
  765. // Otherwise, we return an empty MethodConfig.
  766. func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
  767. // TODO: Avoid the locking here.
  768. cc.mu.RLock()
  769. defer cc.mu.RUnlock()
  770. m, ok := cc.sc.Methods[method]
  771. if !ok {
  772. i := strings.LastIndex(method, "/")
  773. m, _ = cc.sc.Methods[method[:i+1]]
  774. }
  775. return m
  776. }
  777. func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
  778. var (
  779. ac *addrConn
  780. ok bool
  781. put func()
  782. )
  783. if cc.dopts.balancer == nil {
  784. // If balancer is nil, there should be only one addrConn available.
  785. cc.mu.RLock()
  786. if cc.conns == nil {
  787. cc.mu.RUnlock()
  788. return nil, nil, toRPCErr(ErrClientConnClosing)
  789. }
  790. for _, ac = range cc.conns {
  791. // Break after the first iteration to get the first addrConn.
  792. ok = true
  793. break
  794. }
  795. cc.mu.RUnlock()
  796. } else {
  797. var (
  798. addr Address
  799. err error
  800. )
  801. addr, put, err = cc.dopts.balancer.Get(ctx, opts)
  802. if err != nil {
  803. return nil, nil, toRPCErr(err)
  804. }
  805. cc.mu.RLock()
  806. if cc.conns == nil {
  807. cc.mu.RUnlock()
  808. return nil, nil, toRPCErr(ErrClientConnClosing)
  809. }
  810. ac, ok = cc.conns[addr]
  811. cc.mu.RUnlock()
  812. }
  813. if !ok {
  814. if put != nil {
  815. updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
  816. put()
  817. }
  818. return nil, nil, errConnClosing
  819. }
  820. t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
  821. if err != nil {
  822. if put != nil {
  823. updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
  824. put()
  825. }
  826. return nil, nil, err
  827. }
  828. return t, put, nil
  829. }
  830. // Close tears down the ClientConn and all underlying connections.
  831. func (cc *ClientConn) Close() error {
  832. cc.cancel()
  833. cc.mu.Lock()
  834. if cc.conns == nil {
  835. cc.mu.Unlock()
  836. return ErrClientConnClosing
  837. }
  838. conns := cc.conns
  839. cc.conns = nil
  840. cc.csMgr.updateState(connectivity.Shutdown)
  841. cc.mu.Unlock()
  842. if cc.dopts.balancer != nil {
  843. cc.dopts.balancer.Close()
  844. }
  845. for _, ac := range conns {
  846. ac.tearDown(ErrClientConnClosing)
  847. }
  848. return nil
  849. }
  850. // addrConn is a network connection to a given address.
  // It holds a list of candidate addresses (addrs) and at most one live
  // transport, established to whichever candidate connected first (curAddr).
  851. type addrConn struct {
  // ctx is derived from the owning ClientConn's context; cancel is invoked by
  // tearDown.
  852. ctx context.Context
  853. cancel context.CancelFunc
  // cc is the ClientConn this addrConn belongs to.
  854. cc *ClientConn
  // curAddr is the address the current transport was dialed to. resetTransport
  // sets it on success and tearDown resets it to the zero Address.
  855. curAddr Address
  // addrs are the addresses resetTransport tries, in order, on every round.
  856. addrs []Address
  // dopts is this addrConn's own copy of the dial options; resetTransport
  // refreshes its keepalive parameters from cc.mkp before dialing.
  857. dopts dialOptions
  // events is the trace event log; printf and errorf skip logging when it is
  // nil, and tearDown finishes it and sets it to nil.
  858. events trace.EventLog
  // csEvltr is told about every connectivity state transition of this addrConn.
  859. csEvltr *connectivityStateEvaluator
  // mu is held in this file whenever state, down, ready, transport, tearDownErr
  // or curAddr is read or written.
  860. mu sync.Mutex
  // state is the current connectivity state of this addrConn.
  861. state connectivity.State
  862. down func(error) // the handler called when a connection is down.
  863. // ready is closed and becomes nil when a new transport is up or failed
  864. // due to timeout.
  // wait creates it lazily; tearDown also closes it.
  865. ready chan struct{}
  // transport is the live transport, or nil while resetTransport is connecting.
  866. transport transport.ClientTransport
  867. // The reason this addrConn is torn down.
  // Set by the first tearDown call that moves state to Shutdown.
  868. tearDownErr error
  869. }
  870. // adjustParams updates parameters used to create transports upon
  871. // receiving a GoAway.
  872. func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
  873. switch r {
  874. case transport.TooManyPings:
  875. v := 2 * ac.dopts.copts.KeepaliveParams.Time
  876. ac.cc.mu.Lock()
  877. if v > ac.cc.mkp.Time {
  878. ac.cc.mkp.Time = v
  879. }
  880. ac.cc.mu.Unlock()
  881. }
  882. }
  883. // printf records an event in ac's event log, unless ac has been closed.
  884. // REQUIRES ac.mu is held.
  885. func (ac *addrConn) printf(format string, a ...interface{}) {
  886. if ac.events != nil {
  887. ac.events.Printf(format, a...)
  888. }
  889. }
  890. // errorf records an error in ac's event log, unless ac has been closed.
  891. // REQUIRES ac.mu is held.
  892. func (ac *addrConn) errorf(format string, a ...interface{}) {
  893. if ac.events != nil {
  894. ac.events.Errorf(format, a...)
  895. }
  896. }
  897. // resetTransport recreates a transport to the address for ac.
  898. // For the old transport:
  899. // - if drain is true, it is not closed here (see the GoAway handling in transportMonitor).
  900. // - otherwise, it will be closed.
  //
  // The function invokes the registered down handler (if any), moves ac to
  // connectivity.Connecting, and then dials the addresses in ac.addrs in order,
  // repeating the whole list with backoff until one attempt succeeds.
  //
  // It returns nil once a new transport is installed and ac is Ready;
  // errConnClosing if ac is observed in connectivity.Shutdown; the dial error
  // if it is a non-temporary transport.ConnectionError; or ac.ctx.Err() if
  // ac.ctx is done while waiting between rounds.
  901. func (ac *addrConn) resetTransport(drain bool) error {
  902. ac.mu.Lock()
  903. if ac.state == connectivity.Shutdown {
  904. ac.mu.Unlock()
  905. return errConnClosing
  906. }
  907. ac.printf("connecting")
  // Report the old connection as down, then drop the handler so that it is
  // invoked at most once per connection.
  908. if ac.down != nil {
  909. ac.down(downErrorf(false, true, "%v", errNetworkIO))
  910. ac.down = nil
  911. }
  912. oldState := ac.state
  913. ac.state = connectivity.Connecting
  914. ac.csEvltr.recordTransition(oldState, ac.state)
  // Detach the old transport while holding the lock; it is closed (or left
  // alone when draining) after the lock is released.
  915. t := ac.transport
  916. ac.transport = nil
  917. ac.mu.Unlock()
  918. if t != nil && !drain {
  919. t.Close()
  920. }
  // Pick up the ClientConn-wide keepalive parameters (see adjustParams) for
  // the transports created below.
  921. ac.cc.mu.RLock()
  922. ac.dopts.copts.KeepaliveParams = ac.cc.mkp
  923. ac.cc.mu.RUnlock()
  // Each iteration is one round over all addresses.
  924. for retries := 0; ; retries++ {
  925. ac.mu.Lock()
  926. sleepTime := ac.dopts.bs.backoff(retries)
  // Per-address dial timeout: the round's backoff split evenly across the
  // addresses, but never below minConnectTimeout.
  // NOTE(review): this divides by len(ac.addrs); assumes callers never
  // create an addrConn with an empty address list — confirm.
  927. timeout := minConnectTimeout
  928. if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) {
  929. timeout = time.Duration(int(sleepTime) / len(ac.addrs))
  930. }
  931. connectTime := time.Now()
  932. // copy ac.addrs in case of race
  933. addrsIter := make([]Address, len(ac.addrs))
  934. copy(addrsIter, ac.addrs)
  935. ac.mu.Unlock()
  936. for _, addr := range addrsIter {
  937. ac.mu.Lock()
  938. if ac.state == connectivity.Shutdown {
  939. // ac.tearDown(...) has been invoked.
  940. ac.mu.Unlock()
  941. return errConnClosing
  942. }
  943. ac.mu.Unlock()
  944. ctx, cancel := context.WithTimeout(ac.ctx, timeout)
  945. sinfo := transport.TargetInfo{
  946. Addr: addr.Addr,
  947. Metadata: addr.Metadata,
  948. }
  949. newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
  950. // Don't call cancel in success path due to a race in Go 1.6:
  951. // https://github.com/golang/go/issues/15078.
  952. if err != nil {
  953. cancel()
  // A non-temporary connection error ends the whole reset immediately.
  954. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  955. return err
  956. }
  957. grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)
  958. ac.mu.Lock()
  959. if ac.state == connectivity.Shutdown {
  960. // ac.tearDown(...) has been invoked.
  961. ac.mu.Unlock()
  962. return errConnClosing
  963. }
  964. ac.errorf("transient failure: %v", err)
  965. oldState = ac.state
  966. ac.state = connectivity.TransientFailure
  967. ac.csEvltr.recordTransition(oldState, ac.state)
  // Wake goroutines blocked in wait so they can observe the failure.
  968. if ac.ready != nil {
  969. close(ac.ready)
  970. ac.ready = nil
  971. }
  972. ac.mu.Unlock()
  // Try the next address in this round.
  973. continue
  974. }
  975. ac.mu.Lock()
  976. ac.printf("ready")
  977. if ac.state == connectivity.Shutdown {
  978. // ac.tearDown(...) has been invoked.
  979. ac.mu.Unlock()
  980. newTransport.Close()
  981. return errConnClosing
  982. }
  983. oldState = ac.state
  984. ac.state = connectivity.Ready
  985. ac.csEvltr.recordTransition(oldState, ac.state)
  986. ac.transport = newTransport
  // Wake goroutines blocked in wait; they will now find ac Ready.
  987. if ac.ready != nil {
  988. close(ac.ready)
  989. ac.ready = nil
  990. }
  // Tell the balancer the address is up and keep the handler it returns
  // for reporting the connection as down later.
  991. if ac.cc.dopts.balancer != nil {
  992. ac.down = ac.cc.dopts.balancer.Up(addr)
  993. }
  994. ac.curAddr = addr
  995. ac.mu.Unlock()
  996. return nil
  997. }
  // Every address failed in this round: wait out whatever remains of the
  // backoff interval (measured from connectTime), unless ac.ctx ends first.
  998. timer := time.NewTimer(sleepTime - time.Since(connectTime))
  999. select {
  1000. case <-timer.C:
  1001. case <-ac.ctx.Done():
  1002. timer.Stop()
  1003. return ac.ctx.Err()
  1004. }
  1005. timer.Stop()
  1006. }
  1007. }
  1008. // transportMonitor runs in a goroutine to track the error in transport and create the
  1009. // new transport if an error happens. It returns when the channel is closing.
  //
  // Each loop iteration snapshots ac.transport and waits for one of: ac.ctx
  // being done, a GoAway on the transport, or a transport error. Whenever
  // resetTransport fails, the monitor tears ac down (unless the failure is
  // errConnClosing) and returns.
  1010. func (ac *addrConn) transportMonitor() {
  1011. for {
  1012. ac.mu.Lock()
  1013. t := ac.transport
  1014. ac.mu.Unlock()
  1015. select {
  1016. // This is needed to detect the teardown when
  1017. // the addrConn is idle (i.e., no RPC in flight).
  1018. case <-ac.ctx.Done():
  // Close the transport only if it has also reported an error; the
  // inner select never blocks.
  1019. select {
  1020. case <-t.Error():
  1021. t.Close()
  1022. default:
  1023. }
  1024. return
  1025. case <-t.GoAway():
  1026. ac.adjustParams(t.GetGoAwayReason())
  1027. // If GoAway happens without any network I/O error, the underlying transport
  1028. // will be gracefully closed, and a new transport will be created.
  1029. // (The transport will be closed when all the pending RPCs finished or failed.)
  1030. // If GoAway and some network I/O error happen concurrently, the underlying transport
  1031. // will be closed, and a new transport will be created.
  1032. var drain bool
  1033. select {
  1034. case <-t.Error():
  1035. default:
  1036. drain = true
  1037. }
  1038. if err := ac.resetTransport(drain); err != nil {
  1039. grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
  1040. if err != errConnClosing {
  1041. // Keep this ac in cc.conns, to get the reason it's torn down.
  1042. ac.tearDown(err)
  1043. }
  1044. return
  1045. }
  1046. case <-t.Error():
  // Non-blocking check for a teardown or GoAway that raced with the error.
  1047. select {
  1048. case <-ac.ctx.Done():
  1049. t.Close()
  1050. return
  1051. case <-t.GoAway():
  1052. ac.adjustParams(t.GetGoAwayReason())
  1053. if err := ac.resetTransport(false); err != nil {
  1054. grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
  1055. if err != errConnClosing {
  1056. // Keep this ac in cc.conns, to get the reason it's torn down.
  1057. ac.tearDown(err)
  1058. }
  1059. return
  1060. }
  // NOTE(review): after a successful reset here, control still falls
  // through to the TransientFailure transition and the second
  // resetTransport call below — confirm this is intended.
  1061. default:
  1062. }
  1063. ac.mu.Lock()
  1064. if ac.state == connectivity.Shutdown {
  1065. // ac has been shutdown.
  1066. ac.mu.Unlock()
  1067. return
  1068. }
  1069. oldState := ac.state
  1070. ac.state = connectivity.TransientFailure
  1071. ac.csEvltr.recordTransition(oldState, ac.state)
  1072. ac.mu.Unlock()
  1073. if err := ac.resetTransport(false); err != nil {
  1074. grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
  1075. ac.mu.Lock()
  1076. ac.printf("transport exiting: %v", err)
  1077. ac.mu.Unlock()
  1078. grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
  1079. if err != errConnClosing {
  1080. // Keep this ac in cc.conns, to get the reason it's torn down.
  1081. ac.tearDown(err)
  1082. }
  1083. return
  1084. }
  1085. }
  1086. }
  1087. }
  1088. // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
  1089. // iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
  1090. func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
  1091. for {
  1092. ac.mu.Lock()
  1093. switch {
  1094. case ac.state == connectivity.Shutdown:
  1095. if failfast || !hasBalancer {
  1096. // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
  1097. err := ac.tearDownErr
  1098. ac.mu.Unlock()
  1099. return nil, err
  1100. }
  1101. ac.mu.Unlock()
  1102. return nil, errConnClosing
  1103. case ac.state == connectivity.Ready:
  1104. ct := ac.transport
  1105. ac.mu.Unlock()
  1106. return ct, nil
  1107. case ac.state == connectivity.TransientFailure:
  1108. if failfast || hasBalancer {
  1109. ac.mu.Unlock()
  1110. return nil, errConnUnavailable
  1111. }
  1112. }
  1113. ready := ac.ready
  1114. if ready == nil {
  1115. ready = make(chan struct{})
  1116. ac.ready = ready
  1117. }
  1118. ac.mu.Unlock()
  1119. select {
  1120. case <-ctx.Done():
  1121. return nil, toRPCErr(ctx.Err())
  1122. // Wait until the new transport is ready or failed.
  1123. case <-ready:
  1124. }
  1125. }
  1126. }
  1127. // tearDown starts to tear down the addrConn.
  1128. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  1129. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  1130. // tight loop.
  1131. // tearDown doesn't remove ac from ac.cc.conns.
  1132. func (ac *addrConn) tearDown(err error) {
  1133. ac.cancel()
  1134. ac.mu.Lock()
  1135. ac.curAddr = Address{}
  1136. defer ac.mu.Unlock()
  1137. if ac.down != nil {
  1138. ac.down(downErrorf(false, false, "%v", err))
  1139. ac.down = nil
  1140. }
  1141. if err == errConnDrain && ac.transport != nil {
  1142. // GracefulClose(...) may be executed multiple times when
  1143. // i) receiving multiple GoAway frames from the server; or
  1144. // ii) there are concurrent name resolver/Balancer triggered
  1145. // address removal and GoAway.
  1146. ac.transport.GracefulClose()
  1147. }
  1148. if ac.state == connectivity.Shutdown {
  1149. return
  1150. }
  1151. oldState := ac.state
  1152. ac.state = connectivity.Shutdown
  1153. ac.tearDownErr = err
  1154. ac.csEvltr.recordTransition(oldState, ac.state)
  1155. if ac.events != nil {
  1156. ac.events.Finish()
  1157. ac.events = nil
  1158. }
  1159. if ac.ready != nil {
  1160. close(ac.ready)
  1161. ac.ready = nil
  1162. }
  1163. if ac.transport != nil && err != errConnDrain {
  1164. ac.transport.Close()
  1165. }
  1166. return
  1167. }