grpclb.go 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765
  1. /*
  2. *
  3. * Copyright 2016, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "errors"
  36. "fmt"
  37. "math/rand"
  38. "net"
  39. "sync"
  40. "time"
  41. "golang.org/x/net/context"
  42. "google.golang.org/grpc/codes"
  43. lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
  44. "google.golang.org/grpc/grpclog"
  45. "google.golang.org/grpc/metadata"
  46. "google.golang.org/grpc/naming"
  47. )
// loadBalancerClient is the client API for the LoadBalancer service.
// It is mostly copied from the generated pb.go file instead of being
// imported, to avoid a circular dependency.
type loadBalancerClient struct {
	// cc is the connection on which the BalanceLoad stream is opened.
	cc *ClientConn
}
  54. func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
  55. desc := &StreamDesc{
  56. StreamName: "BalanceLoad",
  57. ServerStreams: true,
  58. ClientStreams: true,
  59. }
  60. stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
  61. if err != nil {
  62. return nil, err
  63. }
  64. x := &balanceLoadClientStream{stream}
  65. return x, nil
  66. }
// balanceLoadClientStream wraps a ClientStream and adds Send and Recv methods
// typed for the load balancer request and response messages.
type balanceLoadClientStream struct {
	ClientStream
}
  70. func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
  71. return x.ClientStream.SendMsg(m)
  72. }
  73. func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
  74. m := new(lbpb.LoadBalanceResponse)
  75. if err := x.ClientStream.RecvMsg(m); err != nil {
  76. return nil, err
  77. }
  78. return m, nil
  79. }
// AddressType indicates the address type returned by name resolution. It is
// carried in AddrMetadataGRPCLB.AddrType.
type AddressType uint8
// Possible AddressType values.
const (
	// Backend indicates the server is a backend server. It is the zero value
	// of AddressType.
	Backend AddressType = iota
	// GRPCLB indicates the server is a grpclb load balancer.
	GRPCLB
)
// AddrMetadataGRPCLB contains the information the name resolver for grpclb
// should provide. The name resolver used by the grpclb balancer is required to
// provide a *AddrMetadataGRPCLB as the metadata of its address updates;
// updates carrying any other metadata type are logged and skipped.
type AddrMetadataGRPCLB struct {
	// AddrType is the type of server (grpc load balancer or backend).
	AddrType AddressType
	// ServerName is the name of the grpc load balancer. Used for authentication.
	ServerName string
}
  97. // NewGRPCLBBalancer creates a grpclb load balancer.
  98. func NewGRPCLBBalancer(r naming.Resolver) Balancer {
  99. return &balancer{
  100. r: r,
  101. }
  102. }
// remoteBalancerInfo describes one remote load balancer learned from name
// resolution.
type remoteBalancerInfo struct {
	// addr is the address that is dialed to reach the remote balancer.
	addr string
	// the server name used for authentication with the remote LB server.
	name string
}
// grpclbAddrInfo consists of the information of a backend server.
type grpclbAddrInfo struct {
	// addr is the backend address, including the lb-token metadata.
	addr Address
	// connected is set by Up and cleared by the function Up returns.
	connected bool
	// dropForRateLimiting indicates whether this particular request should be
	// dropped by the client for rate limiting.
	dropForRateLimiting bool
	// dropForLoadBalancing indicates whether this particular request should be
	// dropped by the client for load balancing.
	dropForLoadBalancing bool
}
// balancer is the grpclb Balancer implementation. It learns remote balancer
// addresses from a name resolver, obtains backend server lists from a remote
// balancer, and hands out backends round-robin.
type balancer struct {
	// r resolves the target to remote balancer addresses.
	r naming.Resolver
	// target is the name passed to Start; it is sent in the initial request.
	target string
	// mu is taken by most methods before touching the fields below.
	mu  sync.Mutex
	seq int // a sequence number to make sure addrCh does not get stale addresses.
	// w is the watcher returned by r.Resolve; Close closes it.
	w naming.Watcher
	// addrCh is returned by Notify. Backend address lists are sent on it, and
	// nil is sent when the server list expires.
	addrCh chan []Address
	// rbs is the list of remote balancers learned from name resolution.
	rbs []remoteBalancerInfo
	// addrs is the current backend server list.
	addrs []*grpclbAddrInfo
	// next is the round-robin index into addrs.
	next int
	// waitCh, when non-nil, is closed to wake Get callers that are blocked
	// waiting for a usable backend.
	waitCh chan struct{}
	// done is set by Close.
	done bool
	// expTimer fires serverListExpire when the current server list expires.
	expTimer *time.Timer
	// rand is used to pick a remote balancer at random.
	rand *rand.Rand
	// clientStats accumulates call statistics until the next load report.
	clientStats lbpb.ClientStats
}
  135. func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
  136. updates, err := w.Next()
  137. if err != nil {
  138. grpclog.Printf("grpclb: failed to get next addr update from watcher: %v", err)
  139. return err
  140. }
  141. b.mu.Lock()
  142. defer b.mu.Unlock()
  143. if b.done {
  144. return ErrClientConnClosing
  145. }
  146. for _, update := range updates {
  147. switch update.Op {
  148. case naming.Add:
  149. var exist bool
  150. for _, v := range b.rbs {
  151. // TODO: Is the same addr with different server name a different balancer?
  152. if update.Addr == v.addr {
  153. exist = true
  154. break
  155. }
  156. }
  157. if exist {
  158. continue
  159. }
  160. md, ok := update.Metadata.(*AddrMetadataGRPCLB)
  161. if !ok {
  162. // TODO: Revisit the handling here and may introduce some fallback mechanism.
  163. grpclog.Printf("The name resolution contains unexpected metadata %v", update.Metadata)
  164. continue
  165. }
  166. switch md.AddrType {
  167. case Backend:
  168. // TODO: Revisit the handling here and may introduce some fallback mechanism.
  169. grpclog.Printf("The name resolution does not give grpclb addresses")
  170. continue
  171. case GRPCLB:
  172. b.rbs = append(b.rbs, remoteBalancerInfo{
  173. addr: update.Addr,
  174. name: md.ServerName,
  175. })
  176. default:
  177. grpclog.Printf("Received unknow address type %d", md.AddrType)
  178. continue
  179. }
  180. case naming.Delete:
  181. for i, v := range b.rbs {
  182. if update.Addr == v.addr {
  183. copy(b.rbs[i:], b.rbs[i+1:])
  184. b.rbs = b.rbs[:len(b.rbs)-1]
  185. break
  186. }
  187. }
  188. default:
  189. grpclog.Println("Unknown update.Op ", update.Op)
  190. }
  191. }
  192. // TODO: Fall back to the basic round-robin load balancing if the resulting address is
  193. // not a load balancer.
  194. select {
  195. case <-ch:
  196. default:
  197. }
  198. ch <- b.rbs
  199. return nil
  200. }
  201. func (b *balancer) serverListExpire(seq int) {
  202. b.mu.Lock()
  203. defer b.mu.Unlock()
  204. // TODO: gRPC interanls do not clear the connections when the server list is stale.
  205. // This means RPCs will keep using the existing server list until b receives new
  206. // server list even though the list is expired. Revisit this behavior later.
  207. if b.done || seq < b.seq {
  208. return
  209. }
  210. b.next = 0
  211. b.addrs = nil
  212. // Ask grpc internals to close all the corresponding connections.
  213. b.addrCh <- nil
  214. }
  215. func convertDuration(d *lbpb.Duration) time.Duration {
  216. if d == nil {
  217. return 0
  218. }
  219. return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
  220. }
  221. func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
  222. if l == nil {
  223. return
  224. }
  225. servers := l.GetServers()
  226. expiration := convertDuration(l.GetExpirationInterval())
  227. var (
  228. sl []*grpclbAddrInfo
  229. addrs []Address
  230. )
  231. for _, s := range servers {
  232. md := metadata.Pairs("lb-token", s.LoadBalanceToken)
  233. addr := Address{
  234. Addr: fmt.Sprintf("%s:%d", net.IP(s.IpAddress), s.Port),
  235. Metadata: &md,
  236. }
  237. sl = append(sl, &grpclbAddrInfo{
  238. addr: addr,
  239. dropForRateLimiting: s.DropForRateLimiting,
  240. dropForLoadBalancing: s.DropForLoadBalancing,
  241. })
  242. addrs = append(addrs, addr)
  243. }
  244. b.mu.Lock()
  245. defer b.mu.Unlock()
  246. if b.done || seq < b.seq {
  247. return
  248. }
  249. if len(sl) > 0 {
  250. // reset b.next to 0 when replacing the server list.
  251. b.next = 0
  252. b.addrs = sl
  253. b.addrCh <- addrs
  254. if b.expTimer != nil {
  255. b.expTimer.Stop()
  256. b.expTimer = nil
  257. }
  258. if expiration > 0 {
  259. b.expTimer = time.AfterFunc(expiration, func() {
  260. b.serverListExpire(seq)
  261. })
  262. }
  263. }
  264. return
  265. }
  266. func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
  267. ticker := time.NewTicker(interval)
  268. defer ticker.Stop()
  269. for {
  270. select {
  271. case <-ticker.C:
  272. case <-done:
  273. return
  274. }
  275. b.mu.Lock()
  276. stats := b.clientStats
  277. b.clientStats = lbpb.ClientStats{} // Clear the stats.
  278. b.mu.Unlock()
  279. t := time.Now()
  280. stats.Timestamp = &lbpb.Timestamp{
  281. Seconds: t.Unix(),
  282. Nanos: int32(t.Nanosecond()),
  283. }
  284. if err := s.Send(&lbpb.LoadBalanceRequest{
  285. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
  286. ClientStats: &stats,
  287. },
  288. }); err != nil {
  289. grpclog.Printf("grpclb: failed to send load report: %v", err)
  290. return
  291. }
  292. }
  293. }
  294. func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
  295. ctx, cancel := context.WithCancel(context.Background())
  296. defer cancel()
  297. stream, err := lbc.BalanceLoad(ctx)
  298. if err != nil {
  299. grpclog.Printf("grpclb: failed to perform RPC to the remote balancer %v", err)
  300. return
  301. }
  302. b.mu.Lock()
  303. if b.done {
  304. b.mu.Unlock()
  305. return
  306. }
  307. b.mu.Unlock()
  308. initReq := &lbpb.LoadBalanceRequest{
  309. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
  310. InitialRequest: &lbpb.InitialLoadBalanceRequest{
  311. Name: b.target,
  312. },
  313. },
  314. }
  315. if err := stream.Send(initReq); err != nil {
  316. grpclog.Printf("grpclb: failed to send init request: %v", err)
  317. // TODO: backoff on retry?
  318. return true
  319. }
  320. reply, err := stream.Recv()
  321. if err != nil {
  322. grpclog.Printf("grpclb: failed to recv init response: %v", err)
  323. // TODO: backoff on retry?
  324. return true
  325. }
  326. initResp := reply.GetInitialResponse()
  327. if initResp == nil {
  328. grpclog.Println("grpclb: reply from remote balancer did not include initial response.")
  329. return
  330. }
  331. // TODO: Support delegation.
  332. if initResp.LoadBalancerDelegate != "" {
  333. // delegation
  334. grpclog.Println("TODO: Delegation is not supported yet.")
  335. return
  336. }
  337. streamDone := make(chan struct{})
  338. defer close(streamDone)
  339. b.mu.Lock()
  340. b.clientStats = lbpb.ClientStats{} // Clear client stats.
  341. b.mu.Unlock()
  342. if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
  343. go b.sendLoadReport(stream, d, streamDone)
  344. }
  345. // Retrieve the server list.
  346. for {
  347. reply, err := stream.Recv()
  348. if err != nil {
  349. grpclog.Printf("grpclb: failed to recv server list: %v", err)
  350. break
  351. }
  352. b.mu.Lock()
  353. if b.done || seq < b.seq {
  354. b.mu.Unlock()
  355. return
  356. }
  357. b.seq++ // tick when receiving a new list of servers.
  358. seq = b.seq
  359. b.mu.Unlock()
  360. if serverList := reply.GetServerList(); serverList != nil {
  361. b.processServerList(serverList, seq)
  362. }
  363. }
  364. return true
  365. }
  366. func (b *balancer) Start(target string, config BalancerConfig) error {
  367. b.rand = rand.New(rand.NewSource(time.Now().Unix()))
  368. // TODO: Fall back to the basic direct connection if there is no name resolver.
  369. if b.r == nil {
  370. return errors.New("there is no name resolver installed")
  371. }
  372. b.target = target
  373. b.mu.Lock()
  374. if b.done {
  375. b.mu.Unlock()
  376. return ErrClientConnClosing
  377. }
  378. b.addrCh = make(chan []Address)
  379. w, err := b.r.Resolve(target)
  380. if err != nil {
  381. b.mu.Unlock()
  382. grpclog.Printf("grpclb: failed to resolve address: %v, err: %v", target, err)
  383. return err
  384. }
  385. b.w = w
  386. b.mu.Unlock()
  387. balancerAddrsCh := make(chan []remoteBalancerInfo, 1)
  388. // Spawn a goroutine to monitor the name resolution of remote load balancer.
  389. go func() {
  390. for {
  391. if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
  392. grpclog.Printf("grpclb: the naming watcher stops working due to %v.\n", err)
  393. close(balancerAddrsCh)
  394. return
  395. }
  396. }
  397. }()
  398. // Spawn a goroutine to talk to the remote load balancer.
  399. go func() {
  400. var (
  401. cc *ClientConn
  402. // ccError is closed when there is an error in the current cc.
  403. // A new rb should be picked from rbs and connected.
  404. ccError chan struct{}
  405. rb *remoteBalancerInfo
  406. rbs []remoteBalancerInfo
  407. rbIdx int
  408. )
  409. defer func() {
  410. if ccError != nil {
  411. select {
  412. case <-ccError:
  413. default:
  414. close(ccError)
  415. }
  416. }
  417. if cc != nil {
  418. cc.Close()
  419. }
  420. }()
  421. for {
  422. var ok bool
  423. select {
  424. case rbs, ok = <-balancerAddrsCh:
  425. if !ok {
  426. return
  427. }
  428. foundIdx := -1
  429. if rb != nil {
  430. for i, trb := range rbs {
  431. if trb == *rb {
  432. foundIdx = i
  433. break
  434. }
  435. }
  436. }
  437. if foundIdx >= 0 {
  438. if foundIdx >= 1 {
  439. // Move the address in use to the beginning of the list.
  440. b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0]
  441. rbIdx = 0
  442. }
  443. continue // If found, don't dial new cc.
  444. } else if len(rbs) > 0 {
  445. // Pick a random one from the list, instead of always using the first one.
  446. if l := len(rbs); l > 1 && rb != nil {
  447. tmpIdx := b.rand.Intn(l - 1)
  448. b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
  449. }
  450. rbIdx = 0
  451. rb = &rbs[0]
  452. } else {
  453. // foundIdx < 0 && len(rbs) <= 0.
  454. rb = nil
  455. }
  456. case <-ccError:
  457. ccError = nil
  458. if rbIdx < len(rbs)-1 {
  459. rbIdx++
  460. rb = &rbs[rbIdx]
  461. } else {
  462. rb = nil
  463. }
  464. }
  465. if rb == nil {
  466. continue
  467. }
  468. if cc != nil {
  469. cc.Close()
  470. }
  471. // Talk to the remote load balancer to get the server list.
  472. var (
  473. err error
  474. dopts []DialOption
  475. )
  476. if creds := config.DialCreds; creds != nil {
  477. if rb.name != "" {
  478. if err := creds.OverrideServerName(rb.name); err != nil {
  479. grpclog.Printf("grpclb: failed to override the server name in the credentials: %v", err)
  480. continue
  481. }
  482. }
  483. dopts = append(dopts, WithTransportCredentials(creds))
  484. } else {
  485. dopts = append(dopts, WithInsecure())
  486. }
  487. if dialer := config.Dialer; dialer != nil {
  488. // WithDialer takes a different type of function, so we instead use a special DialOption here.
  489. dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
  490. }
  491. ccError = make(chan struct{})
  492. cc, err = Dial(rb.addr, dopts...)
  493. if err != nil {
  494. grpclog.Printf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
  495. close(ccError)
  496. continue
  497. }
  498. b.mu.Lock()
  499. b.seq++ // tick when getting a new balancer address
  500. seq := b.seq
  501. b.next = 0
  502. b.mu.Unlock()
  503. go func(cc *ClientConn, ccError chan struct{}) {
  504. lbc := &loadBalancerClient{cc}
  505. b.callRemoteBalancer(lbc, seq)
  506. cc.Close()
  507. select {
  508. case <-ccError:
  509. default:
  510. close(ccError)
  511. }
  512. }(cc, ccError)
  513. }
  514. }()
  515. return nil
  516. }
  517. func (b *balancer) down(addr Address, err error) {
  518. b.mu.Lock()
  519. defer b.mu.Unlock()
  520. for _, a := range b.addrs {
  521. if addr == a.addr {
  522. a.connected = false
  523. break
  524. }
  525. }
  526. }
  527. func (b *balancer) Up(addr Address) func(error) {
  528. b.mu.Lock()
  529. defer b.mu.Unlock()
  530. if b.done {
  531. return nil
  532. }
  533. var cnt int
  534. for _, a := range b.addrs {
  535. if a.addr == addr {
  536. if a.connected {
  537. return nil
  538. }
  539. a.connected = true
  540. }
  541. if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
  542. cnt++
  543. }
  544. }
  545. // addr is the only one which is connected. Notify the Get() callers who are blocking.
  546. if cnt == 1 && b.waitCh != nil {
  547. close(b.waitCh)
  548. b.waitCh = nil
  549. }
  550. return func(err error) {
  551. b.down(addr, err)
  552. }
  553. }
  554. func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
  555. var ch chan struct{}
  556. b.mu.Lock()
  557. if b.done {
  558. b.mu.Unlock()
  559. err = ErrClientConnClosing
  560. return
  561. }
  562. seq := b.seq
  563. defer func() {
  564. if err != nil {
  565. return
  566. }
  567. put = func() {
  568. s, ok := rpcInfoFromContext(ctx)
  569. if !ok {
  570. return
  571. }
  572. b.mu.Lock()
  573. defer b.mu.Unlock()
  574. if b.done || seq < b.seq {
  575. return
  576. }
  577. b.clientStats.NumCallsFinished++
  578. if !s.bytesSent {
  579. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  580. } else if s.bytesReceived {
  581. b.clientStats.NumCallsFinishedKnownReceived++
  582. }
  583. }
  584. }()
  585. b.clientStats.NumCallsStarted++
  586. if len(b.addrs) > 0 {
  587. if b.next >= len(b.addrs) {
  588. b.next = 0
  589. }
  590. next := b.next
  591. for {
  592. a := b.addrs[next]
  593. next = (next + 1) % len(b.addrs)
  594. if a.connected {
  595. if !a.dropForRateLimiting && !a.dropForLoadBalancing {
  596. addr = a.addr
  597. b.next = next
  598. b.mu.Unlock()
  599. return
  600. }
  601. if !opts.BlockingWait {
  602. b.next = next
  603. if a.dropForLoadBalancing {
  604. b.clientStats.NumCallsFinished++
  605. b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
  606. } else if a.dropForRateLimiting {
  607. b.clientStats.NumCallsFinished++
  608. b.clientStats.NumCallsFinishedWithDropForRateLimiting++
  609. }
  610. b.mu.Unlock()
  611. err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
  612. return
  613. }
  614. }
  615. if next == b.next {
  616. // Has iterated all the possible address but none is connected.
  617. break
  618. }
  619. }
  620. }
  621. if !opts.BlockingWait {
  622. if len(b.addrs) == 0 {
  623. b.clientStats.NumCallsFinished++
  624. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  625. b.mu.Unlock()
  626. err = Errorf(codes.Unavailable, "there is no address available")
  627. return
  628. }
  629. // Returns the next addr on b.addrs for a failfast RPC.
  630. addr = b.addrs[b.next].addr
  631. b.next++
  632. b.mu.Unlock()
  633. return
  634. }
  635. // Wait on b.waitCh for non-failfast RPCs.
  636. if b.waitCh == nil {
  637. ch = make(chan struct{})
  638. b.waitCh = ch
  639. } else {
  640. ch = b.waitCh
  641. }
  642. b.mu.Unlock()
  643. for {
  644. select {
  645. case <-ctx.Done():
  646. b.mu.Lock()
  647. b.clientStats.NumCallsFinished++
  648. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  649. b.mu.Unlock()
  650. err = ctx.Err()
  651. return
  652. case <-ch:
  653. b.mu.Lock()
  654. if b.done {
  655. b.clientStats.NumCallsFinished++
  656. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  657. b.mu.Unlock()
  658. err = ErrClientConnClosing
  659. return
  660. }
  661. if len(b.addrs) > 0 {
  662. if b.next >= len(b.addrs) {
  663. b.next = 0
  664. }
  665. next := b.next
  666. for {
  667. a := b.addrs[next]
  668. next = (next + 1) % len(b.addrs)
  669. if a.connected {
  670. if !a.dropForRateLimiting && !a.dropForLoadBalancing {
  671. addr = a.addr
  672. b.next = next
  673. b.mu.Unlock()
  674. return
  675. }
  676. if !opts.BlockingWait {
  677. b.next = next
  678. if a.dropForLoadBalancing {
  679. b.clientStats.NumCallsFinished++
  680. b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
  681. } else if a.dropForRateLimiting {
  682. b.clientStats.NumCallsFinished++
  683. b.clientStats.NumCallsFinishedWithDropForRateLimiting++
  684. }
  685. b.mu.Unlock()
  686. err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
  687. return
  688. }
  689. }
  690. if next == b.next {
  691. // Has iterated all the possible address but none is connected.
  692. break
  693. }
  694. }
  695. }
  696. // The newly added addr got removed by Down() again.
  697. if b.waitCh == nil {
  698. ch = make(chan struct{})
  699. b.waitCh = ch
  700. } else {
  701. ch = b.waitCh
  702. }
  703. b.mu.Unlock()
  704. }
  705. }
  706. }
  707. func (b *balancer) Notify() <-chan []Address {
  708. return b.addrCh
  709. }
  710. func (b *balancer) Close() error {
  711. b.mu.Lock()
  712. defer b.mu.Unlock()
  713. if b.done {
  714. return errBalancerClosed
  715. }
  716. b.done = true
  717. if b.expTimer != nil {
  718. b.expTimer.Stop()
  719. }
  720. if b.waitCh != nil {
  721. close(b.waitCh)
  722. }
  723. if b.addrCh != nil {
  724. close(b.addrCh)
  725. }
  726. if b.w != nil {
  727. b.w.Close()
  728. }
  729. return nil
  730. }