clientconn.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "errors"
  21. "fmt"
  22. "net"
  23. "strings"
  24. "sync"
  25. "time"
  26. "golang.org/x/net/context"
  27. "golang.org/x/net/trace"
  28. "google.golang.org/grpc/credentials"
  29. "google.golang.org/grpc/grpclog"
  30. "google.golang.org/grpc/keepalive"
  31. "google.golang.org/grpc/stats"
  32. "google.golang.org/grpc/transport"
  33. )
  var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	ErrClientConnClosing = errors.New("grpc: the client connection is closing")
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	//
	// Deprecated: Please use context.DeadlineExceeded instead.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., oauth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
	// errNetworkIO indicates that the connection is down due to some network I/O error.
	errNetworkIO = errors.New("grpc: failed with network I/O error")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errConnUnavailable indicates that the connection is unavailable.
	errConnUnavailable = errors.New("grpc: the connection is unavailable")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// minConnectTimeout is the lower bound on the time a single connection
	// attempt is given to complete; a longer backoff interval raises it.
	minConnectTimeout = 20 * time.Second
)
  66. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  67. // values passed to Dial.
  68. type dialOptions struct {
  69. unaryInt UnaryClientInterceptor
  70. streamInt StreamClientInterceptor
  71. codec Codec
  72. cp Compressor
  73. dc Decompressor
  74. bs backoffStrategy
  75. balancer Balancer
  76. block bool
  77. insecure bool
  78. timeout time.Duration
  79. scChan <-chan ServiceConfig
  80. copts transport.ConnectOptions
  81. callOptions []CallOption
  82. }
  // Client-side default message size limits. 4 << 20 is 4 MiB, assuming the
// limits are expressed in bytes.
const (
	defaultClientMaxReceiveMessageSize = 4 << 20
	defaultClientMaxSendMessageSize    = 4 << 20
)
  87. // DialOption configures how we set up the connection.
  88. type DialOption func(*dialOptions)
  89. // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
  90. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  91. func WithInitialWindowSize(s int32) DialOption {
  92. return func(o *dialOptions) {
  93. o.copts.InitialWindowSize = s
  94. }
  95. }
  96. // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
  97. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  98. func WithInitialConnWindowSize(s int32) DialOption {
  99. return func(o *dialOptions) {
  100. o.copts.InitialConnWindowSize = s
  101. }
  102. }
  103. // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
  104. func WithMaxMsgSize(s int) DialOption {
  105. return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
  106. }
  107. // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
  108. func WithDefaultCallOptions(cos ...CallOption) DialOption {
  109. return func(o *dialOptions) {
  110. o.callOptions = append(o.callOptions, cos...)
  111. }
  112. }
  113. // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
  114. func WithCodec(c Codec) DialOption {
  115. return func(o *dialOptions) {
  116. o.codec = c
  117. }
  118. }
  119. // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
  120. // compressor.
  121. func WithCompressor(cp Compressor) DialOption {
  122. return func(o *dialOptions) {
  123. o.cp = cp
  124. }
  125. }
  126. // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
  127. // message decompressor.
  128. func WithDecompressor(dc Decompressor) DialOption {
  129. return func(o *dialOptions) {
  130. o.dc = dc
  131. }
  132. }
  133. // WithBalancer returns a DialOption which sets a load balancer.
  134. func WithBalancer(b Balancer) DialOption {
  135. return func(o *dialOptions) {
  136. o.balancer = b
  137. }
  138. }
  139. // WithServiceConfig returns a DialOption which has a channel to read the service configuration.
  140. func WithServiceConfig(c <-chan ServiceConfig) DialOption {
  141. return func(o *dialOptions) {
  142. o.scChan = c
  143. }
  144. }
  145. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  146. // when backing off after failed connection attempts.
  147. func WithBackoffMaxDelay(md time.Duration) DialOption {
  148. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  149. }
  150. // WithBackoffConfig configures the dialer to use the provided backoff
  151. // parameters after connection failures.
  152. //
  153. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  154. // for use.
  155. func WithBackoffConfig(b BackoffConfig) DialOption {
  156. // Set defaults to ensure that provided BackoffConfig is valid and
  157. // unexported fields get default values.
  158. setDefaults(&b)
  159. return withBackoff(b)
  160. }
  161. // withBackoff sets the backoff strategy used for retries after a
  162. // failed connection attempt.
  163. //
  164. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  165. func withBackoff(bs backoffStrategy) DialOption {
  166. return func(o *dialOptions) {
  167. o.bs = bs
  168. }
  169. }
  170. // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
  171. // connection is up. Without this, Dial returns immediately and connecting the server
  172. // happens in background.
  173. func WithBlock() DialOption {
  174. return func(o *dialOptions) {
  175. o.block = true
  176. }
  177. }
  178. // WithInsecure returns a DialOption which disables transport security for this ClientConn.
  179. // Note that transport security is required unless WithInsecure is set.
  180. func WithInsecure() DialOption {
  181. return func(o *dialOptions) {
  182. o.insecure = true
  183. }
  184. }
  185. // WithTransportCredentials returns a DialOption which configures a
  186. // connection level security credentials (e.g., TLS/SSL).
  187. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  188. return func(o *dialOptions) {
  189. o.copts.TransportCredentials = creds
  190. }
  191. }
  192. // WithPerRPCCredentials returns a DialOption which sets
  193. // credentials and places auth state on each outbound RPC.
  194. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  195. return func(o *dialOptions) {
  196. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  197. }
  198. }
  199. // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
  200. // initially. This is valid if and only if WithBlock() is present.
  201. // Deprecated: use DialContext and context.WithTimeout instead.
  202. func WithTimeout(d time.Duration) DialOption {
  203. return func(o *dialOptions) {
  204. o.timeout = d
  205. }
  206. }
  207. // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
  208. // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
  209. // Temporary() method to decide if it should try to reconnect to the network address.
  210. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  211. return func(o *dialOptions) {
  212. o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
  213. if deadline, ok := ctx.Deadline(); ok {
  214. return f(addr, deadline.Sub(time.Now()))
  215. }
  216. return f(addr, 0)
  217. }
  218. }
  219. }
  220. // WithStatsHandler returns a DialOption that specifies the stats handler
  221. // for all the RPCs and underlying network connections in this ClientConn.
  222. func WithStatsHandler(h stats.Handler) DialOption {
  223. return func(o *dialOptions) {
  224. o.copts.StatsHandler = h
  225. }
  226. }
  227. // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
  228. // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
  229. // address and won't try to reconnect.
  230. // The default value of FailOnNonTempDialError is false.
  231. // This is an EXPERIMENTAL API.
  232. func FailOnNonTempDialError(f bool) DialOption {
  233. return func(o *dialOptions) {
  234. o.copts.FailOnNonTempDialError = f
  235. }
  236. }
  237. // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
  238. func WithUserAgent(s string) DialOption {
  239. return func(o *dialOptions) {
  240. o.copts.UserAgent = s
  241. }
  242. }
  243. // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
  244. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
  245. return func(o *dialOptions) {
  246. o.copts.KeepaliveParams = kp
  247. }
  248. }
  249. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
  250. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  251. return func(o *dialOptions) {
  252. o.unaryInt = f
  253. }
  254. }
  255. // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
  256. func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
  257. return func(o *dialOptions) {
  258. o.streamInt = f
  259. }
  260. }
  261. // WithAuthority returns a DialOption that specifies the value to be used as
  262. // the :authority pseudo-header. This value only works with WithInsecure and
  263. // has no effect if TransportCredentials are present.
  264. func WithAuthority(a string) DialOption {
  265. return func(o *dialOptions) {
  266. o.copts.Authority = a
  267. }
  268. }
  269. // Dial creates a client connection to the given target.
  270. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  271. return DialContext(context.Background(), target, opts...)
  272. }
  273. // DialContext creates a client connection to the given target. ctx can be used to
  274. // cancel or expire the pending connection. Once this function returns, the
  275. // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
  276. // to terminate all the pending operations after this function returns.
  277. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  278. cc := &ClientConn{
  279. target: target,
  280. conns: make(map[Address]*addrConn),
  281. }
  282. cc.ctx, cc.cancel = context.WithCancel(context.Background())
  283. for _, opt := range opts {
  284. opt(&cc.dopts)
  285. }
  286. cc.mkp = cc.dopts.copts.KeepaliveParams
  287. if cc.dopts.copts.Dialer == nil {
  288. cc.dopts.copts.Dialer = newProxyDialer(
  289. func(ctx context.Context, addr string) (net.Conn, error) {
  290. return dialContext(ctx, "tcp", addr)
  291. },
  292. )
  293. }
  294. if cc.dopts.copts.UserAgent != "" {
  295. cc.dopts.copts.UserAgent += " " + grpcUA
  296. } else {
  297. cc.dopts.copts.UserAgent = grpcUA
  298. }
  299. if cc.dopts.timeout > 0 {
  300. var cancel context.CancelFunc
  301. ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
  302. defer cancel()
  303. }
  304. defer func() {
  305. select {
  306. case <-ctx.Done():
  307. conn, err = nil, ctx.Err()
  308. default:
  309. }
  310. if err != nil {
  311. cc.Close()
  312. }
  313. }()
  314. scSet := false
  315. if cc.dopts.scChan != nil {
  316. // Try to get an initial service config.
  317. select {
  318. case sc, ok := <-cc.dopts.scChan:
  319. if ok {
  320. cc.sc = sc
  321. scSet = true
  322. }
  323. default:
  324. }
  325. }
  326. // Set defaults.
  327. if cc.dopts.codec == nil {
  328. cc.dopts.codec = protoCodec{}
  329. }
  330. if cc.dopts.bs == nil {
  331. cc.dopts.bs = DefaultBackoffConfig
  332. }
  333. creds := cc.dopts.copts.TransportCredentials
  334. if creds != nil && creds.Info().ServerName != "" {
  335. cc.authority = creds.Info().ServerName
  336. } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
  337. cc.authority = cc.dopts.copts.Authority
  338. } else {
  339. cc.authority = target
  340. }
  341. waitC := make(chan error, 1)
  342. go func() {
  343. defer close(waitC)
  344. if cc.dopts.balancer == nil && cc.sc.LB != nil {
  345. cc.dopts.balancer = cc.sc.LB
  346. }
  347. if cc.dopts.balancer != nil {
  348. var credsClone credentials.TransportCredentials
  349. if creds != nil {
  350. credsClone = creds.Clone()
  351. }
  352. config := BalancerConfig{
  353. DialCreds: credsClone,
  354. Dialer: cc.dopts.copts.Dialer,
  355. }
  356. if err := cc.dopts.balancer.Start(target, config); err != nil {
  357. waitC <- err
  358. return
  359. }
  360. ch := cc.dopts.balancer.Notify()
  361. if ch != nil {
  362. if cc.dopts.block {
  363. doneChan := make(chan struct{})
  364. go cc.lbWatcher(doneChan)
  365. <-doneChan
  366. } else {
  367. go cc.lbWatcher(nil)
  368. }
  369. return
  370. }
  371. }
  372. // No balancer, or no resolver within the balancer. Connect directly.
  373. if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
  374. waitC <- err
  375. return
  376. }
  377. }()
  378. select {
  379. case <-ctx.Done():
  380. return nil, ctx.Err()
  381. case err := <-waitC:
  382. if err != nil {
  383. return nil, err
  384. }
  385. }
  386. if cc.dopts.scChan != nil && !scSet {
  387. // Blocking wait for the initial service config.
  388. select {
  389. case sc, ok := <-cc.dopts.scChan:
  390. if ok {
  391. cc.sc = sc
  392. }
  393. case <-ctx.Done():
  394. return nil, ctx.Err()
  395. }
  396. }
  397. if cc.dopts.scChan != nil {
  398. go cc.scWatcher()
  399. }
  400. return cc, nil
  401. }
  // ConnectivityState indicates the state of a client connection.
type ConnectivityState int

const (
	// Idle indicates the ClientConn is idle.
	Idle ConnectivityState = iota
	// Connecting indicates the ClientConn is connecting.
	Connecting
	// Ready indicates the ClientConn is ready for work.
	Ready
	// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
	TransientFailure
	// Shutdown indicates the ClientConn has started shutting down.
	Shutdown
)

// String returns the upper-case name of s (for example "READY"). A value
// outside the defined set is rendered as "INVALID_STATE(<n>)" rather than
// panicking, so logging an unexpected state cannot crash the caller.
func (s ConnectivityState) String() string {
	switch s {
	case Idle:
		return "IDLE"
	case Connecting:
		return "CONNECTING"
	case Ready:
		return "READY"
	case TransientFailure:
		return "TRANSIENT_FAILURE"
	case Shutdown:
		return "SHUTDOWN"
	default:
		// int(s) keeps fmt from calling String again on the %d operand.
		return fmt.Sprintf("INVALID_STATE(%d)", int(s))
	}
}
  432. // ClientConn represents a client connection to an RPC server.
  433. type ClientConn struct {
  434. ctx context.Context
  435. cancel context.CancelFunc
  436. target string
  437. authority string
  438. dopts dialOptions
  439. mu sync.RWMutex
  440. sc ServiceConfig
  441. conns map[Address]*addrConn
  442. // Keepalive parameter can be updated if a GoAway is received.
  443. mkp keepalive.ClientParameters
  444. }
  445. // lbWatcher watches the Notify channel of the balancer in cc and manages
  446. // connections accordingly. If doneChan is not nil, it is closed after the
  447. // first successfull connection is made.
  448. func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
  449. for addrs := range cc.dopts.balancer.Notify() {
  450. var (
  451. add []Address // Addresses need to setup connections.
  452. del []*addrConn // Connections need to tear down.
  453. )
  454. cc.mu.Lock()
  455. for _, a := range addrs {
  456. if _, ok := cc.conns[a]; !ok {
  457. add = append(add, a)
  458. }
  459. }
  460. for k, c := range cc.conns {
  461. var keep bool
  462. for _, a := range addrs {
  463. if k == a {
  464. keep = true
  465. break
  466. }
  467. }
  468. if !keep {
  469. del = append(del, c)
  470. delete(cc.conns, c.addr)
  471. }
  472. }
  473. cc.mu.Unlock()
  474. for _, a := range add {
  475. if doneChan != nil {
  476. err := cc.resetAddrConn(a, true, nil)
  477. if err == nil {
  478. close(doneChan)
  479. doneChan = nil
  480. }
  481. } else {
  482. cc.resetAddrConn(a, false, nil)
  483. }
  484. }
  485. for _, c := range del {
  486. c.tearDown(errConnDrain)
  487. }
  488. }
  489. }
  490. func (cc *ClientConn) scWatcher() {
  491. for {
  492. select {
  493. case sc, ok := <-cc.dopts.scChan:
  494. if !ok {
  495. return
  496. }
  497. cc.mu.Lock()
  498. // TODO: load balance policy runtime change is ignored.
  499. // We may revist this decision in the future.
  500. cc.sc = sc
  501. cc.mu.Unlock()
  502. case <-cc.ctx.Done():
  503. return
  504. }
  505. }
  506. }
  507. // resetAddrConn creates an addrConn for addr and adds it to cc.conns.
  508. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
  509. // If tearDownErr is nil, errConnDrain will be used instead.
  510. //
  511. // We should never need to replace an addrConn with a new one. This function is only used
  512. // as newAddrConn to create new addrConn.
  513. // TODO rename this function and clean up the code.
  514. func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
  515. ac := &addrConn{
  516. cc: cc,
  517. addr: addr,
  518. dopts: cc.dopts,
  519. }
  520. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  521. ac.stateCV = sync.NewCond(&ac.mu)
  522. if EnableTracing {
  523. ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
  524. }
  525. if !ac.dopts.insecure {
  526. if ac.dopts.copts.TransportCredentials == nil {
  527. return errNoTransportSecurity
  528. }
  529. } else {
  530. if ac.dopts.copts.TransportCredentials != nil {
  531. return errCredentialsConflict
  532. }
  533. for _, cd := range ac.dopts.copts.PerRPCCredentials {
  534. if cd.RequireTransportSecurity() {
  535. return errTransportCredentialsMissing
  536. }
  537. }
  538. }
  539. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  540. cc.mu.Lock()
  541. if cc.conns == nil {
  542. cc.mu.Unlock()
  543. return ErrClientConnClosing
  544. }
  545. stale := cc.conns[ac.addr]
  546. cc.conns[ac.addr] = ac
  547. cc.mu.Unlock()
  548. if stale != nil {
  549. // There is an addrConn alive on ac.addr already. This could be due to
  550. // a buggy Balancer that reports duplicated Addresses.
  551. if tearDownErr == nil {
  552. // tearDownErr is nil if resetAddrConn is called by
  553. // 1) Dial
  554. // 2) lbWatcher
  555. // In both cases, the stale ac should drain, not close.
  556. stale.tearDown(errConnDrain)
  557. } else {
  558. stale.tearDown(tearDownErr)
  559. }
  560. }
  561. if block {
  562. if err := ac.resetTransport(false); err != nil {
  563. if err != errConnClosing {
  564. // Tear down ac and delete it from cc.conns.
  565. cc.mu.Lock()
  566. delete(cc.conns, ac.addr)
  567. cc.mu.Unlock()
  568. ac.tearDown(err)
  569. }
  570. if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
  571. return e.Origin()
  572. }
  573. return err
  574. }
  575. // Start to monitor the error status of transport.
  576. go ac.transportMonitor()
  577. } else {
  578. // Start a goroutine connecting to the server asynchronously.
  579. go func() {
  580. if err := ac.resetTransport(false); err != nil {
  581. grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
  582. if err != errConnClosing {
  583. // Keep this ac in cc.conns, to get the reason it's torn down.
  584. ac.tearDown(err)
  585. }
  586. return
  587. }
  588. ac.transportMonitor()
  589. }()
  590. }
  591. return nil
  592. }
  593. // GetMethodConfig gets the method config of the input method.
  594. // If there's an exact match for input method (i.e. /service/method), we return
  595. // the corresponding MethodConfig.
  596. // If there isn't an exact match for the input method, we look for the default config
  597. // under the service (i.e /service/). If there is a default MethodConfig for
  598. // the serivce, we return it.
  599. // Otherwise, we return an empty MethodConfig.
  600. func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
  601. // TODO: Avoid the locking here.
  602. cc.mu.RLock()
  603. defer cc.mu.RUnlock()
  604. m, ok := cc.sc.Methods[method]
  605. if !ok {
  606. i := strings.LastIndex(method, "/")
  607. m, _ = cc.sc.Methods[method[:i+1]]
  608. }
  609. return m
  610. }
  611. func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
  612. var (
  613. ac *addrConn
  614. ok bool
  615. put func()
  616. )
  617. if cc.dopts.balancer == nil {
  618. // If balancer is nil, there should be only one addrConn available.
  619. cc.mu.RLock()
  620. if cc.conns == nil {
  621. cc.mu.RUnlock()
  622. return nil, nil, toRPCErr(ErrClientConnClosing)
  623. }
  624. for _, ac = range cc.conns {
  625. // Break after the first iteration to get the first addrConn.
  626. ok = true
  627. break
  628. }
  629. cc.mu.RUnlock()
  630. } else {
  631. var (
  632. addr Address
  633. err error
  634. )
  635. addr, put, err = cc.dopts.balancer.Get(ctx, opts)
  636. if err != nil {
  637. return nil, nil, toRPCErr(err)
  638. }
  639. cc.mu.RLock()
  640. if cc.conns == nil {
  641. cc.mu.RUnlock()
  642. return nil, nil, toRPCErr(ErrClientConnClosing)
  643. }
  644. ac, ok = cc.conns[addr]
  645. cc.mu.RUnlock()
  646. }
  647. if !ok {
  648. if put != nil {
  649. updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
  650. put()
  651. }
  652. return nil, nil, errConnClosing
  653. }
  654. t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
  655. if err != nil {
  656. if put != nil {
  657. updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
  658. put()
  659. }
  660. return nil, nil, err
  661. }
  662. return t, put, nil
  663. }
  664. // Close tears down the ClientConn and all underlying connections.
  665. func (cc *ClientConn) Close() error {
  666. cc.cancel()
  667. cc.mu.Lock()
  668. if cc.conns == nil {
  669. cc.mu.Unlock()
  670. return ErrClientConnClosing
  671. }
  672. conns := cc.conns
  673. cc.conns = nil
  674. cc.mu.Unlock()
  675. if cc.dopts.balancer != nil {
  676. cc.dopts.balancer.Close()
  677. }
  678. for _, ac := range conns {
  679. ac.tearDown(ErrClientConnClosing)
  680. }
  681. return nil
  682. }
  // addrConn is a network connection to a given address.
type addrConn struct {
	// ctx is derived from cc.ctx in resetAddrConn; cancel cancels it.
	ctx    context.Context
	cancel context.CancelFunc
	cc     *ClientConn
	addr   Address
	// dopts is a copy of cc.dopts taken when the addrConn is created.
	dopts dialOptions
	// events is the trace event log; nil unless EnableTracing is set.
	// printf/errorf document that it is accessed with mu held.
	events trace.EventLog
	// mu is held by the visible code whenever state, down, ready or
	// transport are read or written.
	mu    sync.Mutex
	state ConnectivityState
	// stateCV is a condition variable on mu; it is broadcast whenever state
	// changes.
	stateCV *sync.Cond
	down    func(error) // the handler called when a connection is down.
	// ready is closed and becomes nil when a new transport is up or failed
	// due to timeout.
	ready     chan struct{}
	transport transport.ClientTransport
	// The reason this addrConn is torn down.
	tearDownErr error
}
  702. // adjustParams updates parameters used to create transports upon
  703. // receiving a GoAway.
  704. func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
  705. switch r {
  706. case transport.TooManyPings:
  707. v := 2 * ac.dopts.copts.KeepaliveParams.Time
  708. ac.cc.mu.Lock()
  709. if v > ac.cc.mkp.Time {
  710. ac.cc.mkp.Time = v
  711. }
  712. ac.cc.mu.Unlock()
  713. }
  714. }
  715. // printf records an event in ac's event log, unless ac has been closed.
  716. // REQUIRES ac.mu is held.
  717. func (ac *addrConn) printf(format string, a ...interface{}) {
  718. if ac.events != nil {
  719. ac.events.Printf(format, a...)
  720. }
  721. }
  722. // errorf records an error in ac's event log, unless ac has been closed.
  723. // REQUIRES ac.mu is held.
  724. func (ac *addrConn) errorf(format string, a ...interface{}) {
  725. if ac.events != nil {
  726. ac.events.Errorf(format, a...)
  727. }
  728. }
  729. // getState returns the connectivity state of the Conn
  730. func (ac *addrConn) getState() ConnectivityState {
  731. ac.mu.Lock()
  732. defer ac.mu.Unlock()
  733. return ac.state
  734. }
  // waitForStateChange blocks until the state changes to something other than the sourceState.
// It returns the new state and nil. If the state already differs on entry,
// it returns immediately. If ctx ends first, it returns the current state and
// ctx.Err().
func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	if sourceState != ac.state {
		return ac.state, nil
	}
	// done stops the helper goroutine once this function returns.
	done := make(chan struct{})
	// err is written by the helper goroutine under ac.mu and read below
	// while holding ac.mu (Wait reacquires it before returning).
	var err error
	// stateCV.Wait cannot observe ctx directly, so this goroutine turns ctx
	// cancellation into a Broadcast that wakes the waiter.
	go func() {
		select {
		case <-ctx.Done():
			ac.mu.Lock()
			err = ctx.Err()
			ac.stateCV.Broadcast()
			ac.mu.Unlock()
		case <-done:
		}
	}()
	// Deferred after the Unlock above, so it runs first: done is closed
	// while ac.mu is still held.
	defer close(done)
	for sourceState == ac.state {
		ac.stateCV.Wait()
		if err != nil {
			return ac.state, err
		}
	}
	return ac.state, nil
}
// resetTransport recreates a transport to the address for ac.
// For the old transport:
// - if drain is true, it will be gracefully closed.
// - otherwise, it will be closed.
//
// It first moves ac to Connecting and, if the balancer handed out a down
// callback (ac.down), invokes it. It then repeatedly calls
// transport.NewClientTransport, using the backoff from ac.dopts.bs, until one
// of the following happens:
//   - a transport is created: ac becomes Ready, ac.ready is closed, the
//     balancer (if any) is told the address is up via Up, and nil is returned;
//   - ac is observed in Shutdown: errConnClosing is returned;
//   - NewClientTransport fails with a transport.ConnectionError whose
//     Temporary() is false: that error is returned unchanged;
//   - ac.ctx is done while backing off: ac.ctx.Err() is returned.
// After every retryable failure, ac is moved to TransientFailure and ac.ready
// is closed, so goroutines blocked in wait re-inspect the state.
func (ac *addrConn) resetTransport(drain bool) error {
	ac.mu.Lock()
	if ac.state == Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	ac.printf("connecting")
	if ac.down != nil {
		// ac.down is the callback returned by balancer.Up (set below on success).
		ac.down(downErrorf(false, true, "%v", errNetworkIO))
		ac.down = nil
	}
	ac.state = Connecting
	ac.stateCV.Broadcast()
	// Detach the old transport under the lock; it is closed after unlocking.
	t := ac.transport
	ac.transport = nil
	ac.mu.Unlock()
	if t != nil {
		if drain {
			t.GracefulClose()
		} else {
			t.Close()
		}
	}
	// Refresh keepalive parameters from the ClientConn before dialing.
	// NOTE(review): presumably cc.mkp may have been changed by adjustParams
	// after a GoAway — confirm.
	ac.cc.mu.RLock()
	ac.dopts.copts.KeepaliveParams = ac.cc.mkp
	ac.cc.mu.RUnlock()
	for retries := 0; ; retries++ {
		ac.mu.Lock()
		if ac.state == Shutdown {
			// ac.tearDown(...) has been invoked.
			ac.mu.Unlock()
			return errConnClosing
		}
		ac.mu.Unlock()
		sleepTime := ac.dopts.bs.backoff(retries)
		// Each dial attempt gets at least minConnectTimeout, or the full backoff
		// interval when that is longer.
		timeout := minConnectTimeout
		if timeout < sleepTime {
			timeout = sleepTime
		}
		ctx, cancel := context.WithTimeout(ac.ctx, timeout)
		connectTime := time.Now()
		sinfo := transport.TargetInfo{
			Addr:     ac.addr.Addr,
			Metadata: ac.addr.Metadata,
		}
		newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
		// Don't call cancel in success path due to a race in Go 1.6:
		// https://github.com/golang/go/issues/15078.
		if err != nil {
			cancel()
			// Non-temporary connection errors are fatal: give up immediately.
			if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
				return err
			}
			grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
			ac.mu.Lock()
			if ac.state == Shutdown {
				// ac.tearDown(...) has been invoked.
				ac.mu.Unlock()
				return errConnClosing
			}
			ac.errorf("transient failure: %v", err)
			ac.state = TransientFailure
			ac.stateCV.Broadcast()
			// Wake goroutines blocked in wait so they can observe TransientFailure.
			if ac.ready != nil {
				close(ac.ready)
				ac.ready = nil
			}
			ac.mu.Unlock()
			// Sleep for whatever remains of the backoff interval, measured from
			// the start of this attempt. A non-positive duration fires at once.
			timer := time.NewTimer(sleepTime - time.Since(connectTime))
			select {
			case <-timer.C:
			case <-ac.ctx.Done():
				timer.Stop()
				return ac.ctx.Err()
			}
			timer.Stop()
			continue
		}
		ac.mu.Lock()
		ac.printf("ready")
		if ac.state == Shutdown {
			// ac.tearDown(...) has been invoked.
			ac.mu.Unlock()
			newTransport.Close()
			return errConnClosing
		}
		ac.state = Ready
		ac.stateCV.Broadcast()
		ac.transport = newTransport
		// Wake goroutines blocked in wait so they can pick up the new transport.
		if ac.ready != nil {
			close(ac.ready)
			ac.ready = nil
		}
		if ac.cc.dopts.balancer != nil {
			ac.down = ac.cc.dopts.balancer.Up(ac.addr)
		}
		ac.mu.Unlock()
		return nil
	}
}
  867. // Run in a goroutine to track the error in transport and create the
  868. // new transport if an error happens. It returns when the channel is closing.
  869. func (ac *addrConn) transportMonitor() {
  870. for {
  871. ac.mu.Lock()
  872. t := ac.transport
  873. ac.mu.Unlock()
  874. select {
  875. // This is needed to detect the teardown when
  876. // the addrConn is idle (i.e., no RPC in flight).
  877. case <-ac.ctx.Done():
  878. select {
  879. case <-t.Error():
  880. t.Close()
  881. default:
  882. }
  883. return
  884. case <-t.GoAway():
  885. ac.adjustParams(t.GetGoAwayReason())
  886. // If GoAway happens without any network I/O error, the underlying transport
  887. // will be gracefully closed, and a new transport will be created.
  888. // (The transport will be closed when all the pending RPCs finished or failed.)
  889. // If GoAway and some network I/O error happen concurrently, the underlying transport
  890. // will be closed, and a new transport will be created.
  891. var drain bool
  892. select {
  893. case <-t.Error():
  894. default:
  895. drain = true
  896. }
  897. if err := ac.resetTransport(drain); err != nil {
  898. grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
  899. if err != errConnClosing {
  900. // Keep this ac in cc.conns, to get the reason it's torn down.
  901. ac.tearDown(err)
  902. }
  903. return
  904. }
  905. case <-t.Error():
  906. select {
  907. case <-ac.ctx.Done():
  908. t.Close()
  909. return
  910. case <-t.GoAway():
  911. ac.adjustParams(t.GetGoAwayReason())
  912. if err := ac.resetTransport(false); err != nil {
  913. grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
  914. if err != errConnClosing {
  915. // Keep this ac in cc.conns, to get the reason it's torn down.
  916. ac.tearDown(err)
  917. }
  918. return
  919. }
  920. default:
  921. }
  922. ac.mu.Lock()
  923. if ac.state == Shutdown {
  924. // ac has been shutdown.
  925. ac.mu.Unlock()
  926. return
  927. }
  928. ac.state = TransientFailure
  929. ac.stateCV.Broadcast()
  930. ac.mu.Unlock()
  931. if err := ac.resetTransport(false); err != nil {
  932. grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
  933. ac.mu.Lock()
  934. ac.printf("transport exiting: %v", err)
  935. ac.mu.Unlock()
  936. grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
  937. if err != errConnClosing {
  938. // Keep this ac in cc.conns, to get the reason it's torn down.
  939. ac.tearDown(err)
  940. }
  941. return
  942. }
  943. }
  944. }
  945. }
  946. // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
  947. // iv) transport is in TransientFailure and there is a balancer/failfast is true.
  948. func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
  949. for {
  950. ac.mu.Lock()
  951. switch {
  952. case ac.state == Shutdown:
  953. if failfast || !hasBalancer {
  954. // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
  955. err := ac.tearDownErr
  956. ac.mu.Unlock()
  957. return nil, err
  958. }
  959. ac.mu.Unlock()
  960. return nil, errConnClosing
  961. case ac.state == Ready:
  962. ct := ac.transport
  963. ac.mu.Unlock()
  964. return ct, nil
  965. case ac.state == TransientFailure:
  966. if failfast || hasBalancer {
  967. ac.mu.Unlock()
  968. return nil, errConnUnavailable
  969. }
  970. }
  971. ready := ac.ready
  972. if ready == nil {
  973. ready = make(chan struct{})
  974. ac.ready = ready
  975. }
  976. ac.mu.Unlock()
  977. select {
  978. case <-ctx.Done():
  979. return nil, toRPCErr(ctx.Err())
  980. // Wait until the new transport is ready or failed.
  981. case <-ready:
  982. }
  983. }
  984. }
  985. // tearDown starts to tear down the addrConn.
  986. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  987. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  988. // tight loop.
  989. // tearDown doesn't remove ac from ac.cc.conns.
  990. func (ac *addrConn) tearDown(err error) {
  991. ac.cancel()
  992. ac.mu.Lock()
  993. defer ac.mu.Unlock()
  994. if ac.down != nil {
  995. ac.down(downErrorf(false, false, "%v", err))
  996. ac.down = nil
  997. }
  998. if err == errConnDrain && ac.transport != nil {
  999. // GracefulClose(...) may be executed multiple times when
  1000. // i) receiving multiple GoAway frames from the server; or
  1001. // ii) there are concurrent name resolver/Balancer triggered
  1002. // address removal and GoAway.
  1003. ac.transport.GracefulClose()
  1004. }
  1005. if ac.state == Shutdown {
  1006. return
  1007. }
  1008. ac.state = Shutdown
  1009. ac.tearDownErr = err
  1010. ac.stateCV.Broadcast()
  1011. if ac.events != nil {
  1012. ac.events.Finish()
  1013. ac.events = nil
  1014. }
  1015. if ac.ready != nil {
  1016. close(ac.ready)
  1017. ac.ready = nil
  1018. }
  1019. if ac.transport != nil && err != errConnDrain {
  1020. ac.transport.Close()
  1021. }
  1022. return
  1023. }