clientconn.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "errors"
  21. "fmt"
  22. "math"
  23. "net"
  24. "reflect"
  25. "strings"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. "golang.org/x/net/context"
  30. "golang.org/x/net/trace"
  31. "google.golang.org/grpc/balancer"
  32. _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
  33. "google.golang.org/grpc/codes"
  34. "google.golang.org/grpc/connectivity"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/grpclog"
  37. "google.golang.org/grpc/internal/backoff"
  38. "google.golang.org/grpc/internal/channelz"
  39. "google.golang.org/grpc/internal/transport"
  40. "google.golang.org/grpc/keepalive"
  41. "google.golang.org/grpc/resolver"
  42. _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
  43. _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
  44. "google.golang.org/grpc/status"
  45. )
  // Connection-level constants shared by the client connection code.
  const (
  	// minConnectTimeout is the minimum time to give a connection to complete.
  	minConnectTimeout = time.Second * 20
  	// grpclbName must match grpclbName in grpclb/grpclb.go.
  	grpclbName = "grpclb"
  )
  52. var (
  53. // ErrClientConnClosing indicates that the operation is illegal because
  54. // the ClientConn is closing.
  55. //
  56. // Deprecated: this error should not be relied upon by users; use the status
  57. // code of Canceled instead.
  58. ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
  59. // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
  60. errConnDrain = errors.New("grpc: the connection is drained")
  61. // errConnClosing indicates that the connection is closing.
  62. errConnClosing = errors.New("grpc: the connection is closing")
  63. // errConnUnavailable indicates that the connection is unavailable.
  64. errConnUnavailable = errors.New("grpc: the connection is unavailable")
  65. // errBalancerClosed indicates that the balancer is closed.
  66. errBalancerClosed = errors.New("grpc: balancer is closed")
  67. // We use an accessor so that minConnectTimeout can be
  68. // atomically read and updated while testing.
  69. getMinConnectTimeout = func() time.Duration {
  70. return minConnectTimeout
  71. }
  72. )
  // The errors below are returned from Dial and DialContext.
  var (
  	// errNoTransportSecurity is returned when no transport security is
  	// configured for the ClientConn. Users should either set one or
  	// explicitly use the WithInsecure DialOption to disable security.
  	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

  	// errTransportCredentialsMissing is returned when users want to transmit
  	// security information (e.g., an oauth2 token) that requires a secure
  	// connection, but the connection is insecure.
  	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

  	// errCredentialsConflict is returned when grpc.WithTransportCredentials()
  	// and grpc.WithInsecure() are both used for the same connection.
  	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")

  	// errNetworkIO indicates that the connection is down due to some network
  	// I/O error.
  	errNetworkIO = errors.New("grpc: failed with network I/O error")
  )
  // Client-side defaults for message size limits and transport buffer sizes.
  const (
  	// defaultClientMaxReceiveMessageSize is 4 MiB.
  	defaultClientMaxReceiveMessageSize = 4 * 1024 * 1024
  	// defaultClientMaxSendMessageSize is the largest int32 value.
  	defaultClientMaxSendMessageSize = math.MaxInt32
  	// defaultWriteBufSize is 32 KiB.
  	defaultWriteBufSize = 32 << 10
  	// defaultReadBufSize is 32 KiB.
  	defaultReadBufSize = 32 << 10
  )
  96. // RegisterChannelz turns on channelz service.
  97. // This is an EXPERIMENTAL API.
  98. func RegisterChannelz() {
  99. channelz.TurnOn()
  100. }
  101. // Dial creates a client connection to the given target.
  102. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  103. return DialContext(context.Background(), target, opts...)
  104. }
  105. // DialContext creates a client connection to the given target. By default, it's
  106. // a non-blocking dial (the function won't wait for connections to be
  107. // established, and connecting happens in the background). To make it a blocking
  108. // dial, use WithBlock() dial option.
  109. //
  110. // In the non-blocking case, the ctx does not act against the connection. It
  111. // only controls the setup steps.
  112. //
  113. // In the blocking case, ctx can be used to cancel or expire the pending
  114. // connection. Once this function returns, the cancellation and expiration of
  115. // ctx will be noop. Users should call ClientConn.Close to terminate all the
  116. // pending operations after this function returns.
  117. //
  118. // The target name syntax is defined in
  119. // https://github.com/grpc/grpc/blob/master/doc/naming.md.
  120. // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
  121. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  122. cc := &ClientConn{
  123. target: target,
  124. csMgr: &connectivityStateManager{},
  125. conns: make(map[*addrConn]struct{}),
  126. dopts: defaultDialOptions(),
  127. blockingpicker: newPickerWrapper(),
  128. }
  129. cc.retryThrottler.Store((*retryThrottler)(nil))
  130. cc.ctx, cc.cancel = context.WithCancel(context.Background())
  131. for _, opt := range opts {
  132. opt.apply(&cc.dopts)
  133. }
  134. if channelz.IsOn() {
  135. if cc.dopts.channelzParentID != 0 {
  136. cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target)
  137. } else {
  138. cc.channelzID = channelz.RegisterChannel(cc, 0, target)
  139. }
  140. }
  141. if !cc.dopts.insecure {
  142. if cc.dopts.copts.TransportCredentials == nil {
  143. return nil, errNoTransportSecurity
  144. }
  145. } else {
  146. if cc.dopts.copts.TransportCredentials != nil {
  147. return nil, errCredentialsConflict
  148. }
  149. for _, cd := range cc.dopts.copts.PerRPCCredentials {
  150. if cd.RequireTransportSecurity() {
  151. return nil, errTransportCredentialsMissing
  152. }
  153. }
  154. }
  155. cc.mkp = cc.dopts.copts.KeepaliveParams
  156. if cc.dopts.copts.Dialer == nil {
  157. cc.dopts.copts.Dialer = newProxyDialer(
  158. func(ctx context.Context, addr string) (net.Conn, error) {
  159. network, addr := parseDialTarget(addr)
  160. return dialContext(ctx, network, addr)
  161. },
  162. )
  163. }
  164. if cc.dopts.copts.UserAgent != "" {
  165. cc.dopts.copts.UserAgent += " " + grpcUA
  166. } else {
  167. cc.dopts.copts.UserAgent = grpcUA
  168. }
  169. if cc.dopts.timeout > 0 {
  170. var cancel context.CancelFunc
  171. ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
  172. defer cancel()
  173. }
  174. defer func() {
  175. select {
  176. case <-ctx.Done():
  177. conn, err = nil, ctx.Err()
  178. default:
  179. }
  180. if err != nil {
  181. cc.Close()
  182. }
  183. }()
  184. scSet := false
  185. if cc.dopts.scChan != nil {
  186. // Try to get an initial service config.
  187. select {
  188. case sc, ok := <-cc.dopts.scChan:
  189. if ok {
  190. cc.sc = sc
  191. scSet = true
  192. }
  193. default:
  194. }
  195. }
  196. if cc.dopts.bs == nil {
  197. cc.dopts.bs = backoff.Exponential{
  198. MaxDelay: DefaultBackoffConfig.MaxDelay,
  199. }
  200. }
  201. if cc.dopts.resolverBuilder == nil {
  202. // Only try to parse target when resolver builder is not already set.
  203. cc.parsedTarget = parseTarget(cc.target)
  204. grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
  205. cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
  206. if cc.dopts.resolverBuilder == nil {
  207. // If resolver builder is still nil, the parse target's scheme is
  208. // not registered. Fallback to default resolver and set Endpoint to
  209. // the original unparsed target.
  210. grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
  211. cc.parsedTarget = resolver.Target{
  212. Scheme: resolver.GetDefaultScheme(),
  213. Endpoint: target,
  214. }
  215. cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
  216. }
  217. } else {
  218. cc.parsedTarget = resolver.Target{Endpoint: target}
  219. }
  220. creds := cc.dopts.copts.TransportCredentials
  221. if creds != nil && creds.Info().ServerName != "" {
  222. cc.authority = creds.Info().ServerName
  223. } else if cc.dopts.insecure && cc.dopts.authority != "" {
  224. cc.authority = cc.dopts.authority
  225. } else {
  226. // Use endpoint from "scheme://authority/endpoint" as the default
  227. // authority for ClientConn.
  228. cc.authority = cc.parsedTarget.Endpoint
  229. }
  230. if cc.dopts.scChan != nil && !scSet {
  231. // Blocking wait for the initial service config.
  232. select {
  233. case sc, ok := <-cc.dopts.scChan:
  234. if ok {
  235. cc.sc = sc
  236. }
  237. case <-ctx.Done():
  238. return nil, ctx.Err()
  239. }
  240. }
  241. if cc.dopts.scChan != nil {
  242. go cc.scWatcher()
  243. }
  244. var credsClone credentials.TransportCredentials
  245. if creds := cc.dopts.copts.TransportCredentials; creds != nil {
  246. credsClone = creds.Clone()
  247. }
  248. cc.balancerBuildOpts = balancer.BuildOptions{
  249. DialCreds: credsClone,
  250. Dialer: cc.dopts.copts.Dialer,
  251. ChannelzParentID: cc.channelzID,
  252. }
  253. // Build the resolver.
  254. cc.resolverWrapper, err = newCCResolverWrapper(cc)
  255. if err != nil {
  256. return nil, fmt.Errorf("failed to build resolver: %v", err)
  257. }
  258. // Start the resolver wrapper goroutine after resolverWrapper is created.
  259. //
  260. // If the goroutine is started before resolverWrapper is ready, the
  261. // following may happen: The goroutine sends updates to cc. cc forwards
  262. // those to balancer. Balancer creates new addrConn. addrConn fails to
  263. // connect, and calls resolveNow(). resolveNow() tries to use the non-ready
  264. // resolverWrapper.
  265. cc.resolverWrapper.start()
  266. // A blocking dial blocks until the clientConn is ready.
  267. if cc.dopts.block {
  268. for {
  269. s := cc.GetState()
  270. if s == connectivity.Ready {
  271. break
  272. }
  273. if !cc.WaitForStateChange(ctx, s) {
  274. // ctx got timeout or canceled.
  275. return nil, ctx.Err()
  276. }
  277. }
  278. }
  279. return cc, nil
  280. }
  281. // connectivityStateManager keeps the connectivity.State of ClientConn.
  282. // This struct will eventually be exported so the balancers can access it.
  283. type connectivityStateManager struct {
  284. mu sync.Mutex
  285. state connectivity.State
  286. notifyChan chan struct{}
  287. }
  288. // updateState updates the connectivity.State of ClientConn.
  289. // If there's a change it notifies goroutines waiting on state change to
  290. // happen.
  291. func (csm *connectivityStateManager) updateState(state connectivity.State) {
  292. csm.mu.Lock()
  293. defer csm.mu.Unlock()
  294. if csm.state == connectivity.Shutdown {
  295. return
  296. }
  297. if csm.state == state {
  298. return
  299. }
  300. csm.state = state
  301. if csm.notifyChan != nil {
  302. // There are other goroutines waiting on this channel.
  303. close(csm.notifyChan)
  304. csm.notifyChan = nil
  305. }
  306. }
  307. func (csm *connectivityStateManager) getState() connectivity.State {
  308. csm.mu.Lock()
  309. defer csm.mu.Unlock()
  310. return csm.state
  311. }
  312. func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
  313. csm.mu.Lock()
  314. defer csm.mu.Unlock()
  315. if csm.notifyChan == nil {
  316. csm.notifyChan = make(chan struct{})
  317. }
  318. return csm.notifyChan
  319. }
  320. // ClientConn represents a client connection to an RPC server.
  321. type ClientConn struct {
  322. ctx context.Context
  323. cancel context.CancelFunc
  324. target string
  325. parsedTarget resolver.Target
  326. authority string
  327. dopts dialOptions
  328. csMgr *connectivityStateManager
  329. balancerBuildOpts balancer.BuildOptions
  330. resolverWrapper *ccResolverWrapper
  331. blockingpicker *pickerWrapper
  332. mu sync.RWMutex
  333. sc ServiceConfig
  334. scRaw string
  335. conns map[*addrConn]struct{}
  336. // Keepalive parameter can be updated if a GoAway is received.
  337. mkp keepalive.ClientParameters
  338. curBalancerName string
  339. preBalancerName string // previous balancer name.
  340. curAddresses []resolver.Address
  341. balancerWrapper *ccBalancerWrapper
  342. retryThrottler atomic.Value
  343. channelzID int64 // channelz unique identification number
  344. czmu sync.RWMutex
  345. callsStarted int64
  346. callsSucceeded int64
  347. callsFailed int64
  348. lastCallStartedTime time.Time
  349. }
  350. // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
  351. // ctx expires. A true value is returned in former case and false in latter.
  352. // This is an EXPERIMENTAL API.
  353. func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
  354. ch := cc.csMgr.getNotifyChan()
  355. if cc.csMgr.getState() != sourceState {
  356. return true
  357. }
  358. select {
  359. case <-ctx.Done():
  360. return false
  361. case <-ch:
  362. return true
  363. }
  364. }
  365. // GetState returns the connectivity.State of ClientConn.
  366. // This is an EXPERIMENTAL API.
  367. func (cc *ClientConn) GetState() connectivity.State {
  368. return cc.csMgr.getState()
  369. }
  370. func (cc *ClientConn) scWatcher() {
  371. for {
  372. select {
  373. case sc, ok := <-cc.dopts.scChan:
  374. if !ok {
  375. return
  376. }
  377. cc.mu.Lock()
  378. // TODO: load balance policy runtime change is ignored.
  379. // We may revist this decision in the future.
  380. cc.sc = sc
  381. cc.scRaw = ""
  382. cc.mu.Unlock()
  383. case <-cc.ctx.Done():
  384. return
  385. }
  386. }
  387. }
  388. func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
  389. cc.mu.Lock()
  390. defer cc.mu.Unlock()
  391. if cc.conns == nil {
  392. // cc was closed.
  393. return
  394. }
  395. if reflect.DeepEqual(cc.curAddresses, addrs) {
  396. return
  397. }
  398. cc.curAddresses = addrs
  399. if cc.dopts.balancerBuilder == nil {
  400. // Only look at balancer types and switch balancer if balancer dial
  401. // option is not set.
  402. var isGRPCLB bool
  403. for _, a := range addrs {
  404. if a.Type == resolver.GRPCLB {
  405. isGRPCLB = true
  406. break
  407. }
  408. }
  409. var newBalancerName string
  410. if isGRPCLB {
  411. newBalancerName = grpclbName
  412. } else {
  413. // Address list doesn't contain grpclb address. Try to pick a
  414. // non-grpclb balancer.
  415. newBalancerName = cc.curBalancerName
  416. // If current balancer is grpclb, switch to the previous one.
  417. if newBalancerName == grpclbName {
  418. newBalancerName = cc.preBalancerName
  419. }
  420. // The following could be true in two cases:
  421. // - the first time handling resolved addresses
  422. // (curBalancerName="")
  423. // - the first time handling non-grpclb addresses
  424. // (curBalancerName="grpclb", preBalancerName="")
  425. if newBalancerName == "" {
  426. newBalancerName = PickFirstBalancerName
  427. }
  428. }
  429. cc.switchBalancer(newBalancerName)
  430. } else if cc.balancerWrapper == nil {
  431. // Balancer dial option was set, and this is the first time handling
  432. // resolved addresses. Build a balancer with dopts.balancerBuilder.
  433. cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
  434. }
  435. cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
  436. }
  437. // switchBalancer starts the switching from current balancer to the balancer
  438. // with the given name.
  439. //
  440. // It will NOT send the current address list to the new balancer. If needed,
  441. // caller of this function should send address list to the new balancer after
  442. // this function returns.
  443. //
  444. // Caller must hold cc.mu.
  445. func (cc *ClientConn) switchBalancer(name string) {
  446. if cc.conns == nil {
  447. return
  448. }
  449. if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
  450. return
  451. }
  452. grpclog.Infof("ClientConn switching balancer to %q", name)
  453. if cc.dopts.balancerBuilder != nil {
  454. grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
  455. return
  456. }
  457. // TODO(bar switching) change this to two steps: drain and close.
  458. // Keep track of sc in wrapper.
  459. if cc.balancerWrapper != nil {
  460. cc.balancerWrapper.close()
  461. }
  462. builder := balancer.Get(name)
  463. if builder == nil {
  464. grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
  465. builder = newPickfirstBuilder()
  466. }
  467. cc.preBalancerName = cc.curBalancerName
  468. cc.curBalancerName = builder.Name()
  469. cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
  470. }
  471. func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  472. cc.mu.Lock()
  473. if cc.conns == nil {
  474. cc.mu.Unlock()
  475. return
  476. }
  477. // TODO(bar switching) send updates to all balancer wrappers when balancer
  478. // gracefully switching is supported.
  479. cc.balancerWrapper.handleSubConnStateChange(sc, s)
  480. cc.mu.Unlock()
  481. }
  482. // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
  483. //
  484. // Caller needs to make sure len(addrs) > 0.
  485. func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
  486. ac := &addrConn{
  487. cc: cc,
  488. addrs: addrs,
  489. dopts: cc.dopts,
  490. }
  491. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  492. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  493. cc.mu.Lock()
  494. if cc.conns == nil {
  495. cc.mu.Unlock()
  496. return nil, ErrClientConnClosing
  497. }
  498. if channelz.IsOn() {
  499. ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
  500. }
  501. cc.conns[ac] = struct{}{}
  502. cc.mu.Unlock()
  503. return ac, nil
  504. }
  505. // removeAddrConn removes the addrConn in the subConn from clientConn.
  506. // It also tears down the ac with the given error.
  507. func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
  508. cc.mu.Lock()
  509. if cc.conns == nil {
  510. cc.mu.Unlock()
  511. return
  512. }
  513. delete(cc.conns, ac)
  514. cc.mu.Unlock()
  515. ac.tearDown(err)
  516. }
  517. // ChannelzMetric returns ChannelInternalMetric of current ClientConn.
  518. // This is an EXPERIMENTAL API.
  519. func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric {
  520. state := cc.GetState()
  521. cc.czmu.RLock()
  522. defer cc.czmu.RUnlock()
  523. return &channelz.ChannelInternalMetric{
  524. State: state,
  525. Target: cc.target,
  526. CallsStarted: cc.callsStarted,
  527. CallsSucceeded: cc.callsSucceeded,
  528. CallsFailed: cc.callsFailed,
  529. LastCallStartedTimestamp: cc.lastCallStartedTime,
  530. }
  531. }
  532. // Target returns the target string of the ClientConn.
  533. // This is an EXPERIMENTAL API.
  534. func (cc *ClientConn) Target() string {
  535. return cc.target
  536. }
  537. func (cc *ClientConn) incrCallsStarted() {
  538. cc.czmu.Lock()
  539. cc.callsStarted++
  540. // TODO(yuxuanli): will make this a time.Time pointer improve performance?
  541. cc.lastCallStartedTime = time.Now()
  542. cc.czmu.Unlock()
  543. }
  544. func (cc *ClientConn) incrCallsSucceeded() {
  545. cc.czmu.Lock()
  546. cc.callsSucceeded++
  547. cc.czmu.Unlock()
  548. }
  549. func (cc *ClientConn) incrCallsFailed() {
  550. cc.czmu.Lock()
  551. cc.callsFailed++
  552. cc.czmu.Unlock()
  553. }
  554. // connect starts to creating transport and also starts the transport monitor
  555. // goroutine for this ac.
  556. // It does nothing if the ac is not IDLE.
  557. // TODO(bar) Move this to the addrConn section.
  558. // This was part of resetAddrConn, keep it here to make the diff look clean.
  559. func (ac *addrConn) connect() error {
  560. ac.mu.Lock()
  561. if ac.state == connectivity.Shutdown {
  562. ac.mu.Unlock()
  563. return errConnClosing
  564. }
  565. if ac.state != connectivity.Idle {
  566. ac.mu.Unlock()
  567. return nil
  568. }
  569. ac.state = connectivity.Connecting
  570. ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
  571. ac.mu.Unlock()
  572. // Start a goroutine connecting to the server asynchronously.
  573. go func() {
  574. if err := ac.resetTransport(); err != nil {
  575. grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
  576. if err != errConnClosing {
  577. // Keep this ac in cc.conns, to get the reason it's torn down.
  578. ac.tearDown(err)
  579. }
  580. return
  581. }
  582. ac.transportMonitor()
  583. }()
  584. return nil
  585. }
  586. // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
  587. //
  588. // It checks whether current connected address of ac is in the new addrs list.
  589. // - If true, it updates ac.addrs and returns true. The ac will keep using
  590. // the existing connection.
  591. // - If false, it does nothing and returns false.
  592. func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
  593. ac.mu.Lock()
  594. defer ac.mu.Unlock()
  595. grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
  596. if ac.state == connectivity.Shutdown {
  597. ac.addrs = addrs
  598. return true
  599. }
  600. var curAddrFound bool
  601. for _, a := range addrs {
  602. if reflect.DeepEqual(ac.curAddr, a) {
  603. curAddrFound = true
  604. break
  605. }
  606. }
  607. grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
  608. if curAddrFound {
  609. ac.addrs = addrs
  610. ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
  611. }
  612. return curAddrFound
  613. }
  614. // GetMethodConfig gets the method config of the input method.
  615. // If there's an exact match for input method (i.e. /service/method), we return
  616. // the corresponding MethodConfig.
  617. // If there isn't an exact match for the input method, we look for the default config
  618. // under the service (i.e /service/). If there is a default MethodConfig for
  619. // the service, we return it.
  620. // Otherwise, we return an empty MethodConfig.
  621. func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
  622. // TODO: Avoid the locking here.
  623. cc.mu.RLock()
  624. defer cc.mu.RUnlock()
  625. m, ok := cc.sc.Methods[method]
  626. if !ok {
  627. i := strings.LastIndex(method, "/")
  628. m = cc.sc.Methods[method[:i+1]]
  629. }
  630. return m
  631. }
  632. func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  633. t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
  634. FullMethodName: method,
  635. })
  636. if err != nil {
  637. return nil, nil, toRPCErr(err)
  638. }
  639. return t, done, nil
  640. }
  641. // handleServiceConfig parses the service config string in JSON format to Go native
  642. // struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
  643. func (cc *ClientConn) handleServiceConfig(js string) error {
  644. if cc.dopts.disableServiceConfig {
  645. return nil
  646. }
  647. sc, err := parseServiceConfig(js)
  648. if err != nil {
  649. return err
  650. }
  651. cc.mu.Lock()
  652. cc.scRaw = js
  653. cc.sc = sc
  654. if sc.retryThrottling != nil {
  655. newThrottler := &retryThrottler{
  656. tokens: sc.retryThrottling.MaxTokens,
  657. max: sc.retryThrottling.MaxTokens,
  658. thresh: sc.retryThrottling.MaxTokens / 2,
  659. ratio: sc.retryThrottling.TokenRatio,
  660. }
  661. cc.retryThrottler.Store(newThrottler)
  662. } else {
  663. cc.retryThrottler.Store((*retryThrottler)(nil))
  664. }
  665. if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
  666. if cc.curBalancerName == grpclbName {
  667. // If current balancer is grpclb, there's at least one grpclb
  668. // balancer address in the resolved list. Don't switch the balancer,
  669. // but change the previous balancer name, so if a new resolved
  670. // address list doesn't contain grpclb address, balancer will be
  671. // switched to *sc.LB.
  672. cc.preBalancerName = *sc.LB
  673. } else {
  674. cc.switchBalancer(*sc.LB)
  675. cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
  676. }
  677. }
  678. cc.mu.Unlock()
  679. return nil
  680. }
  681. func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
  682. cc.mu.RLock()
  683. r := cc.resolverWrapper
  684. cc.mu.RUnlock()
  685. if r == nil {
  686. return
  687. }
  688. go r.resolveNow(o)
  689. }
  690. // Close tears down the ClientConn and all underlying connections.
  691. func (cc *ClientConn) Close() error {
  692. defer cc.cancel()
  693. cc.mu.Lock()
  694. if cc.conns == nil {
  695. cc.mu.Unlock()
  696. return ErrClientConnClosing
  697. }
  698. conns := cc.conns
  699. cc.conns = nil
  700. cc.csMgr.updateState(connectivity.Shutdown)
  701. rWrapper := cc.resolverWrapper
  702. cc.resolverWrapper = nil
  703. bWrapper := cc.balancerWrapper
  704. cc.balancerWrapper = nil
  705. cc.mu.Unlock()
  706. cc.blockingpicker.close()
  707. if rWrapper != nil {
  708. rWrapper.close()
  709. }
  710. if bWrapper != nil {
  711. bWrapper.close()
  712. }
  713. for ac := range conns {
  714. ac.tearDown(ErrClientConnClosing)
  715. }
  716. if channelz.IsOn() {
  717. channelz.RemoveEntry(cc.channelzID)
  718. }
  719. return nil
  720. }
// addrConn is a network connection to a given address.
//
// Locking as practiced by the methods in this file: mu is held while reading
// or writing events, curAddr, reconnectIdx, state, ready, transport,
// tearDownErr, connectRetryNum, backoffDeadline and connectDeadline; czmu is
// held for the call counters and lastCallStartedTime.
type addrConn struct {
	// ctx is the parent of every dial context created by createTransport;
	// cancel is invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	// cc is the ClientConn this addrConn reports state changes to and reads
	// shared settings (keepalive parameters, authority) from.
	cc *ClientConn
	// addrs is the list of backend addresses createTransport walks through;
	// resetTransport hands it a copy taken under mu.
	addrs []resolver.Address
	// dopts holds the dial options; its connect options, backoff strategy and
	// waitForHandshake flag are consulted when (re)connecting.
	dopts dialOptions
	// events is the event log written by printf and errorf. tearDown finishes
	// it and sets it to nil.
	events trace.EventLog
	// acbw is passed to cc.handleSubConnStateChange together with every new
	// state.
	acbw balancer.SubConn

	mu sync.Mutex
	// curAddr is the address of the transport most recently installed by
	// createTransport; it is reset to the zero Address on failure and on
	// tearDown.
	curAddr      resolver.Address
	reconnectIdx int // The index in addrs list to start reconnecting from.
	// state is the current connectivity state of this addrConn.
	state connectivity.State
	// ready is closed and becomes nil when a new transport is up or failed
	// due to timeout.
	ready chan struct{}
	// transport is the transport installed by createTransport; resetTransport
	// sets it to nil before reconnecting.
	transport transport.ClientTransport

	// The reason this addrConn is torn down.
	tearDownErr error

	// connectRetryNum is the retry count saved by createTransport so that a
	// later resetTransport can continue from it.
	connectRetryNum int
	// backoffDeadline is the time until which resetTransport needs to
	// wait before increasing connectRetryNum count.
	backoffDeadline time.Time
	// connectDeadline is the time by which all connection
	// negotiations must complete.
	connectDeadline time.Time

	channelzID int64 // channelz unique identification number

	// czmu guards the call statistics below.
	czmu                sync.RWMutex
	callsStarted        int64
	callsSucceeded      int64
	callsFailed         int64
	lastCallStartedTime time.Time
}
  754. // adjustParams updates parameters used to create transports upon
  755. // receiving a GoAway.
  756. func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
  757. switch r {
  758. case transport.GoAwayTooManyPings:
  759. v := 2 * ac.dopts.copts.KeepaliveParams.Time
  760. ac.cc.mu.Lock()
  761. if v > ac.cc.mkp.Time {
  762. ac.cc.mkp.Time = v
  763. }
  764. ac.cc.mu.Unlock()
  765. }
  766. }
  767. // printf records an event in ac's event log, unless ac has been closed.
  768. // REQUIRES ac.mu is held.
  769. func (ac *addrConn) printf(format string, a ...interface{}) {
  770. if ac.events != nil {
  771. ac.events.Printf(format, a...)
  772. }
  773. }
  774. // errorf records an error in ac's event log, unless ac has been closed.
  775. // REQUIRES ac.mu is held.
  776. func (ac *addrConn) errorf(format string, a ...interface{}) {
  777. if ac.events != nil {
  778. ac.events.Errorf(format, a...)
  779. }
  780. }
// resetTransport recreates a transport to the address for ac. The old
// transport will close itself on error or when the clientconn is closed.
// The created transport must receive initial settings frame from the server.
// In case that doesn't happen, transportMonitor will kill the newly created
// transport after connectDeadline has expired.
// In case there was an error on the transport before the settings frame was
// received, resetTransport resumes connecting to backends after the one that
// was previously connected to. In case end of the list is reached, resetTransport
// backs off until the original deadline.
// If the DialOption WithWaitForHandshake was set, resetTransport returns
// successfully only after server settings are received.
//
// It returns nil once createTransport reports a connection, errConnClosing
// when ac is observed in the Shutdown state, and otherwise whatever error
// createTransport returned.
//
// TODO(bar) make sure all state transitions are valid.
func (ac *addrConn) resetTransport() error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	// Close the previous ready channel (if any) and forget the old
	// transport before a new one is created.
	if ac.ready != nil {
		close(ac.ready)
		ac.ready = nil
	}
	ac.transport = nil
	ridx := ac.reconnectIdx
	ac.mu.Unlock()
	// Refresh the keepalive parameters from the ClientConn; adjustParams may
	// have raised them since the last connection attempt.
	ac.cc.mu.RLock()
	ac.dopts.copts.KeepaliveParams = ac.cc.mkp
	ac.cc.mu.RUnlock()
	var backoffDeadline, connectDeadline time.Time
	// Each iteration is one pass of createTransport over the address list.
	// The loop only ends by returning.
	for connectRetryNum := 0; ; connectRetryNum++ {
		ac.mu.Lock()
		if ac.backoffDeadline.IsZero() {
			// This means either a successful HTTP2 connection was established
			// or this is the first time this addrConn is trying to establish a
			// connection.
			backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
			// This will be the duration that dial gets to finish.
			dialDuration := getMinConnectTimeout()
			if backoffFor > dialDuration {
				// Give dial more time as we keep failing to connect.
				dialDuration = backoffFor
			}
			start := time.Now()
			backoffDeadline = start.Add(backoffFor)
			connectDeadline = start.Add(dialDuration)
			ridx = 0 // Start connecting from the beginning.
		} else {
			// Continue trying to connect with the same deadlines.
			// The saved values are consumed here: the loop counter is
			// overwritten with the stored retry number and the fields are
			// cleared.
			connectRetryNum = ac.connectRetryNum
			backoffDeadline = ac.backoffDeadline
			connectDeadline = ac.connectDeadline
			ac.backoffDeadline = time.Time{}
			ac.connectDeadline = time.Time{}
			ac.connectRetryNum = 0
		}
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return errConnClosing
		}
		ac.printf("connecting")
		// Report the transition to Connecting only once per run of attempts.
		if ac.state != connectivity.Connecting {
			ac.state = connectivity.Connecting
			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
		}
		// copy ac.addrs in case of race
		addrsIter := make([]resolver.Address, len(ac.addrs))
		copy(addrsIter, ac.addrs)
		copts := ac.dopts.copts
		ac.mu.Unlock()
		// createTransport is called without ac.mu held; it takes the lock
		// itself where needed.
		connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
		if err != nil {
			return err
		}
		if connected {
			return nil
		}
	}
}
// createTransport creates a connection to one of the backends in addrs.
// It returns true if a connection was established.
//
// addrs is tried in order starting at index ridx, and every dial is bounded
// by connectDeadline. connectRetryNum, backoffDeadline and connectDeadline
// are stored back into ac when a transport is installed before the server
// preface has arrived, so that a later resetTransport can continue with them.
//
// Return values:
//   - (true, nil): a transport was installed and ac moved to Ready.
//   - (false, errConnClosing): ac was observed in the Shutdown state.
//   - (false, ac.ctx.Err()): ac.ctx was done while waiting for
//     backoffDeadline.
//   - (false, nil): no address could be connected and backoffDeadline has
//     passed.
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
	for i := ridx; i < len(addrs); i++ {
		addr := addrs[i]
		target := transport.TargetInfo{
			Addr:      addr.Addr,
			Metadata:  addr.Metadata,
			Authority: ac.cc.authority,
		}
		// done is closed by onPrefaceReceipt, which is handed to
		// NewClientTransport below.
		done := make(chan struct{})
		onPrefaceReceipt := func() {
			ac.mu.Lock()
			close(done)
			if !ac.backoffDeadline.IsZero() {
				// If we haven't already started reconnecting to
				// other backends.
				// Note, this can happen when writer notices an error
				// and triggers resetTransport while at the same time
				// reader receives the preface and invokes this closure.
				ac.backoffDeadline = time.Time{}
				ac.connectDeadline = time.Time{}
				ac.connectRetryNum = 0
			}
			ac.mu.Unlock()
		}
		// Do not cancel in the success path because of
		// this issue in Go1.6: https://github.com/golang/go/issues/15078.
		connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
		if channelz.IsOn() {
			copts.ChannelzParentID = ac.channelzID
		}
		newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
		if err != nil {
			cancel()
			ac.cc.blockingpicker.updateConnectionError(err)
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				// ac.tearDown(...) has been invoked.
				ac.mu.Unlock()
				return false, errConnClosing
			}
			ac.mu.Unlock()
			grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
			// Try the next address in the list.
			continue
		}
		if ac.dopts.waitForHandshake {
			// Block until the preface arrives, the dial deadline passes, or
			// ac itself is cancelled.
			select {
			case <-done:
			case <-connectCtx.Done():
				// Didn't receive server preface, must kill this new transport now.
				grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
				newTr.Close()
				continue
			case <-ac.ctx.Done():
				// Fall through to the state check below.
			}
		}
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			// ac.tearDown(...) has been invoked.
			newTr.Close()
			return false, errConnClosing
		}
		ac.printf("ready")
		ac.state = connectivity.Ready
		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
		ac.transport = newTr
		ac.curAddr = addr
		if ac.ready != nil {
			close(ac.ready)
			ac.ready = nil
		}
		select {
		case <-done:
			// If the server has responded back with preface already,
			// don't set the reconnect parameters.
		default:
			// Preface still pending: remember where we are so a later
			// resetTransport can resume with the same deadlines.
			ac.connectRetryNum = connectRetryNum
			ac.backoffDeadline = backoffDeadline
			ac.connectDeadline = connectDeadline
			ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
		}
		ac.mu.Unlock()
		return true, nil
	}
	// Every address from ridx onward failed.
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return false, errConnClosing
	}
	ac.state = connectivity.TransientFailure
	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
	ac.cc.resolveNow(resolver.ResolveNowOption{})
	if ac.ready != nil {
		close(ac.ready)
		ac.ready = nil
	}
	ac.mu.Unlock()
	// Wait out the remainder of the backoff period, unless ac is cancelled
	// first.
	timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
	select {
	case <-timer.C:
	case <-ac.ctx.Done():
		timer.Stop()
		return false, ac.ctx.Err()
	}
	return false, nil
}
// transportMonitor runs in a goroutine to track the error in transport and
// create the new transport if an error happens. It returns when the channel
// is closing.
//
// Each loop iteration waits for the current transport to signal a GoAway or
// an error, or for connectDeadline to pass. Unless ac is in the Shutdown
// state, it then moves ac to TransientFailure and calls resetTransport. A
// resetTransport error ends the goroutine; errors other than errConnClosing
// also tear ac down.
func (ac *addrConn) transportMonitor() {
	for {
		var timer *time.Timer
		var cdeadline <-chan time.Time
		ac.mu.Lock()
		t := ac.transport
		// Arm the deadline timer only while a connect deadline is pending;
		// otherwise cdeadline stays nil and its select case never fires.
		if !ac.connectDeadline.IsZero() {
			timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
			cdeadline = timer.C
		}
		ac.mu.Unlock()
		// Block until we receive a goaway or an error occurs.
		select {
		case <-t.GoAway():
			done := t.Error()
			cleanup := t.Close
			// Since this transport will be orphaned (won't have a transportMonitor)
			// we need to launch a goroutine to keep track of clientConn.Close()
			// happening since it might not be noticed by any other goroutine for a while.
			go func() {
				<-done
				cleanup()
			}()
		case <-t.Error():
			// In case this is triggered because clientConn.Close()
			// was called, we want to immediately close the transport
			// since no other goroutine might notice it for a while.
			t.Close()
		case <-cdeadline:
			ac.mu.Lock()
			// This implies that client received server preface.
			if ac.backoffDeadline.IsZero() {
				ac.mu.Unlock()
				continue
			}
			ac.mu.Unlock()
			// The timer has already fired, so it does not need stopping below.
			timer = nil
			// No server preface received until deadline.
			// Kill the connection.
			grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
			t.Close()
		}
		if timer != nil {
			timer.Stop()
		}
		// If a GoAway happened, regardless of error, adjust our keepalive
		// parameters as appropriate.
		select {
		case <-t.GoAway():
			ac.adjustParams(t.GetGoAwayReason())
		default:
		}
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		// Set connectivity state to TransientFailure before calling
		// resetTransport. Transition READY->CONNECTING is not valid.
		ac.state = connectivity.TransientFailure
		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
		ac.cc.resolveNow(resolver.ResolveNowOption{})
		ac.curAddr = resolver.Address{}
		ac.mu.Unlock()
		if err := ac.resetTransport(); err != nil {
			ac.mu.Lock()
			ac.printf("transport exiting: %v", err)
			ac.mu.Unlock()
			grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
			if err != errConnClosing {
				// Keep this ac in cc.conns, to get the reason it's torn down.
				ac.tearDown(err)
			}
			return
		}
	}
}
  1047. // getReadyTransport returns the transport if ac's state is READY.
  1048. // Otherwise it returns nil, false.
  1049. // If ac's state is IDLE, it will trigger ac to connect.
  1050. func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
  1051. ac.mu.Lock()
  1052. if ac.state == connectivity.Ready {
  1053. t := ac.transport
  1054. ac.mu.Unlock()
  1055. return t, true
  1056. }
  1057. var idle bool
  1058. if ac.state == connectivity.Idle {
  1059. idle = true
  1060. }
  1061. ac.mu.Unlock()
  1062. // Trigger idle ac to connect.
  1063. if idle {
  1064. ac.connect()
  1065. }
  1066. return nil, false
  1067. }
  1068. // tearDown starts to tear down the addrConn.
  1069. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  1070. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  1071. // tight loop.
  1072. // tearDown doesn't remove ac from ac.cc.conns.
  1073. func (ac *addrConn) tearDown(err error) {
  1074. ac.cancel()
  1075. ac.mu.Lock()
  1076. defer ac.mu.Unlock()
  1077. if ac.state == connectivity.Shutdown {
  1078. return
  1079. }
  1080. ac.curAddr = resolver.Address{}
  1081. if err == errConnDrain && ac.transport != nil {
  1082. // GracefulClose(...) may be executed multiple times when
  1083. // i) receiving multiple GoAway frames from the server; or
  1084. // ii) there are concurrent name resolver/Balancer triggered
  1085. // address removal and GoAway.
  1086. ac.transport.GracefulClose()
  1087. }
  1088. ac.state = connectivity.Shutdown
  1089. ac.tearDownErr = err
  1090. ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
  1091. if ac.events != nil {
  1092. ac.events.Finish()
  1093. ac.events = nil
  1094. }
  1095. if ac.ready != nil {
  1096. close(ac.ready)
  1097. ac.ready = nil
  1098. }
  1099. if channelz.IsOn() {
  1100. channelz.RemoveEntry(ac.channelzID)
  1101. }
  1102. }
  1103. func (ac *addrConn) getState() connectivity.State {
  1104. ac.mu.Lock()
  1105. defer ac.mu.Unlock()
  1106. return ac.state
  1107. }
  1108. func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
  1109. ac.mu.Lock()
  1110. addr := ac.curAddr.Addr
  1111. ac.mu.Unlock()
  1112. state := ac.getState()
  1113. ac.czmu.RLock()
  1114. defer ac.czmu.RUnlock()
  1115. return &channelz.ChannelInternalMetric{
  1116. State: state,
  1117. Target: addr,
  1118. CallsStarted: ac.callsStarted,
  1119. CallsSucceeded: ac.callsSucceeded,
  1120. CallsFailed: ac.callsFailed,
  1121. LastCallStartedTimestamp: ac.lastCallStartedTime,
  1122. }
  1123. }
  1124. func (ac *addrConn) incrCallsStarted() {
  1125. ac.czmu.Lock()
  1126. ac.callsStarted++
  1127. ac.lastCallStartedTime = time.Now()
  1128. ac.czmu.Unlock()
  1129. }
  1130. func (ac *addrConn) incrCallsSucceeded() {
  1131. ac.czmu.Lock()
  1132. ac.callsSucceeded++
  1133. ac.czmu.Unlock()
  1134. }
  1135. func (ac *addrConn) incrCallsFailed() {
  1136. ac.czmu.Lock()
  1137. ac.callsFailed++
  1138. ac.czmu.Unlock()
  1139. }
  1140. type retryThrottler struct {
  1141. max float64
  1142. thresh float64
  1143. ratio float64
  1144. mu sync.Mutex
  1145. tokens float64 // TODO(dfawley): replace with atomic and remove lock.
  1146. }
  1147. // throttle subtracts a retry token from the pool and returns whether a retry
  1148. // should be throttled (disallowed) based upon the retry throttling policy in
  1149. // the service config.
  1150. func (rt *retryThrottler) throttle() bool {
  1151. if rt == nil {
  1152. return false
  1153. }
  1154. rt.mu.Lock()
  1155. defer rt.mu.Unlock()
  1156. rt.tokens--
  1157. if rt.tokens < 0 {
  1158. rt.tokens = 0
  1159. }
  1160. return rt.tokens <= rt.thresh
  1161. }
  1162. func (rt *retryThrottler) successfulRPC() {
  1163. if rt == nil {
  1164. return
  1165. }
  1166. rt.mu.Lock()
  1167. defer rt.mu.Unlock()
  1168. rt.tokens += rt.ratio
  1169. if rt.tokens > rt.max {
  1170. rt.tokens = rt.max
  1171. }
  1172. }
  1173. // ErrClientConnTimeout indicates that the ClientConn cannot establish the
  1174. // underlying connections within the specified timeout.
  1175. //
  1176. // Deprecated: This error is never returned by grpc and should not be
  1177. // referenced by users.
  1178. var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")