grpclb.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "errors"
  21. "fmt"
  22. "math/rand"
  23. "net"
  24. "sync"
  25. "time"
  26. "golang.org/x/net/context"
  27. "google.golang.org/grpc/codes"
  28. lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
  29. "google.golang.org/grpc/grpclog"
  30. "google.golang.org/grpc/metadata"
  31. "google.golang.org/grpc/naming"
  32. )
  33. // Client API for LoadBalancer service.
  34. // Mostly copied from generated pb.go file.
  35. // To avoid circular dependency.
  36. type loadBalancerClient struct {
  37. cc *ClientConn
  38. }
  39. func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
  40. desc := &StreamDesc{
  41. StreamName: "BalanceLoad",
  42. ServerStreams: true,
  43. ClientStreams: true,
  44. }
  45. stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
  46. if err != nil {
  47. return nil, err
  48. }
  49. x := &balanceLoadClientStream{stream}
  50. return x, nil
  51. }
  52. type balanceLoadClientStream struct {
  53. ClientStream
  54. }
  55. func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
  56. return x.ClientStream.SendMsg(m)
  57. }
  58. func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
  59. m := new(lbpb.LoadBalanceResponse)
  60. if err := x.ClientStream.RecvMsg(m); err != nil {
  61. return nil, err
  62. }
  63. return m, nil
  64. }
  65. // AddressType indicates the address type returned by name resolution.
  66. type AddressType uint8
  67. const (
  68. // Backend indicates the server is a backend server.
  69. Backend AddressType = iota
  70. // GRPCLB indicates the server is a grpclb load balancer.
  71. GRPCLB
  72. )
  73. // AddrMetadataGRPCLB contains the information the name resolver for grpclb should provide. The
  74. // name resolver used by the grpclb balancer is required to provide this type of metadata in
  75. // its address updates.
  76. type AddrMetadataGRPCLB struct {
  77. // AddrType is the type of server (grpc load balancer or backend).
  78. AddrType AddressType
  79. // ServerName is the name of the grpc load balancer. Used for authentication.
  80. ServerName string
  81. }
  82. // NewGRPCLBBalancer creates a grpclb load balancer.
  83. func NewGRPCLBBalancer(r naming.Resolver) Balancer {
  84. return &balancer{
  85. r: r,
  86. }
  87. }
  88. type remoteBalancerInfo struct {
  89. addr string
  90. // the server name used for authentication with the remote LB server.
  91. name string
  92. }
  93. // grpclbAddrInfo consists of the information of a backend server.
  94. type grpclbAddrInfo struct {
  95. addr Address
  96. connected bool
  97. // dropForRateLimiting indicates whether this particular request should be
  98. // dropped by the client for rate limiting.
  99. dropForRateLimiting bool
  100. // dropForLoadBalancing indicates whether this particular request should be
  101. // dropped by the client for load balancing.
  102. dropForLoadBalancing bool
  103. }
  104. type balancer struct {
  105. r naming.Resolver
  106. target string
  107. mu sync.Mutex
  108. seq int // a sequence number to make sure addrCh does not get stale addresses.
  109. w naming.Watcher
  110. addrCh chan []Address
  111. rbs []remoteBalancerInfo
  112. addrs []*grpclbAddrInfo
  113. next int
  114. waitCh chan struct{}
  115. done bool
  116. expTimer *time.Timer
  117. rand *rand.Rand
  118. clientStats lbpb.ClientStats
  119. }
  120. func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
  121. updates, err := w.Next()
  122. if err != nil {
  123. grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
  124. return err
  125. }
  126. b.mu.Lock()
  127. defer b.mu.Unlock()
  128. if b.done {
  129. return ErrClientConnClosing
  130. }
  131. for _, update := range updates {
  132. switch update.Op {
  133. case naming.Add:
  134. var exist bool
  135. for _, v := range b.rbs {
  136. // TODO: Is the same addr with different server name a different balancer?
  137. if update.Addr == v.addr {
  138. exist = true
  139. break
  140. }
  141. }
  142. if exist {
  143. continue
  144. }
  145. md, ok := update.Metadata.(*AddrMetadataGRPCLB)
  146. if !ok {
  147. // TODO: Revisit the handling here and may introduce some fallback mechanism.
  148. grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
  149. continue
  150. }
  151. switch md.AddrType {
  152. case Backend:
  153. // TODO: Revisit the handling here and may introduce some fallback mechanism.
  154. grpclog.Errorf("The name resolution does not give grpclb addresses")
  155. continue
  156. case GRPCLB:
  157. b.rbs = append(b.rbs, remoteBalancerInfo{
  158. addr: update.Addr,
  159. name: md.ServerName,
  160. })
  161. default:
  162. grpclog.Errorf("Received unknow address type %d", md.AddrType)
  163. continue
  164. }
  165. case naming.Delete:
  166. for i, v := range b.rbs {
  167. if update.Addr == v.addr {
  168. copy(b.rbs[i:], b.rbs[i+1:])
  169. b.rbs = b.rbs[:len(b.rbs)-1]
  170. break
  171. }
  172. }
  173. default:
  174. grpclog.Errorf("Unknown update.Op %v", update.Op)
  175. }
  176. }
  177. // TODO: Fall back to the basic round-robin load balancing if the resulting address is
  178. // not a load balancer.
  179. select {
  180. case <-ch:
  181. default:
  182. }
  183. ch <- b.rbs
  184. return nil
  185. }
  186. func (b *balancer) serverListExpire(seq int) {
  187. b.mu.Lock()
  188. defer b.mu.Unlock()
  189. // TODO: gRPC interanls do not clear the connections when the server list is stale.
  190. // This means RPCs will keep using the existing server list until b receives new
  191. // server list even though the list is expired. Revisit this behavior later.
  192. if b.done || seq < b.seq {
  193. return
  194. }
  195. b.next = 0
  196. b.addrs = nil
  197. // Ask grpc internals to close all the corresponding connections.
  198. b.addrCh <- nil
  199. }
  200. func convertDuration(d *lbpb.Duration) time.Duration {
  201. if d == nil {
  202. return 0
  203. }
  204. return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
  205. }
  206. func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
  207. if l == nil {
  208. return
  209. }
  210. servers := l.GetServers()
  211. expiration := convertDuration(l.GetExpirationInterval())
  212. var (
  213. sl []*grpclbAddrInfo
  214. addrs []Address
  215. )
  216. for _, s := range servers {
  217. md := metadata.Pairs("lb-token", s.LoadBalanceToken)
  218. ip := net.IP(s.IpAddress)
  219. ipStr := ip.String()
  220. if ip.To4() == nil {
  221. // Add square brackets to ipv6 addresses, otherwise net.Dial() and
  222. // net.SplitHostPort() will return too many colons error.
  223. ipStr = fmt.Sprintf("[%s]", ipStr)
  224. }
  225. addr := Address{
  226. Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
  227. Metadata: &md,
  228. }
  229. sl = append(sl, &grpclbAddrInfo{
  230. addr: addr,
  231. dropForRateLimiting: s.DropForRateLimiting,
  232. dropForLoadBalancing: s.DropForLoadBalancing,
  233. })
  234. addrs = append(addrs, addr)
  235. }
  236. b.mu.Lock()
  237. defer b.mu.Unlock()
  238. if b.done || seq < b.seq {
  239. return
  240. }
  241. if len(sl) > 0 {
  242. // reset b.next to 0 when replacing the server list.
  243. b.next = 0
  244. b.addrs = sl
  245. b.addrCh <- addrs
  246. if b.expTimer != nil {
  247. b.expTimer.Stop()
  248. b.expTimer = nil
  249. }
  250. if expiration > 0 {
  251. b.expTimer = time.AfterFunc(expiration, func() {
  252. b.serverListExpire(seq)
  253. })
  254. }
  255. }
  256. return
  257. }
  258. func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
  259. ticker := time.NewTicker(interval)
  260. defer ticker.Stop()
  261. for {
  262. select {
  263. case <-ticker.C:
  264. case <-done:
  265. return
  266. }
  267. b.mu.Lock()
  268. stats := b.clientStats
  269. b.clientStats = lbpb.ClientStats{} // Clear the stats.
  270. b.mu.Unlock()
  271. t := time.Now()
  272. stats.Timestamp = &lbpb.Timestamp{
  273. Seconds: t.Unix(),
  274. Nanos: int32(t.Nanosecond()),
  275. }
  276. if err := s.Send(&lbpb.LoadBalanceRequest{
  277. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
  278. ClientStats: &stats,
  279. },
  280. }); err != nil {
  281. grpclog.Errorf("grpclb: failed to send load report: %v", err)
  282. return
  283. }
  284. }
  285. }
  286. func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
  287. ctx, cancel := context.WithCancel(context.Background())
  288. defer cancel()
  289. stream, err := lbc.BalanceLoad(ctx)
  290. if err != nil {
  291. grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
  292. return
  293. }
  294. b.mu.Lock()
  295. if b.done {
  296. b.mu.Unlock()
  297. return
  298. }
  299. b.mu.Unlock()
  300. initReq := &lbpb.LoadBalanceRequest{
  301. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
  302. InitialRequest: &lbpb.InitialLoadBalanceRequest{
  303. Name: b.target,
  304. },
  305. },
  306. }
  307. if err := stream.Send(initReq); err != nil {
  308. grpclog.Errorf("grpclb: failed to send init request: %v", err)
  309. // TODO: backoff on retry?
  310. return true
  311. }
  312. reply, err := stream.Recv()
  313. if err != nil {
  314. grpclog.Errorf("grpclb: failed to recv init response: %v", err)
  315. // TODO: backoff on retry?
  316. return true
  317. }
  318. initResp := reply.GetInitialResponse()
  319. if initResp == nil {
  320. grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.")
  321. return
  322. }
  323. // TODO: Support delegation.
  324. if initResp.LoadBalancerDelegate != "" {
  325. // delegation
  326. grpclog.Errorf("TODO: Delegation is not supported yet.")
  327. return
  328. }
  329. streamDone := make(chan struct{})
  330. defer close(streamDone)
  331. b.mu.Lock()
  332. b.clientStats = lbpb.ClientStats{} // Clear client stats.
  333. b.mu.Unlock()
  334. if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
  335. go b.sendLoadReport(stream, d, streamDone)
  336. }
  337. // Retrieve the server list.
  338. for {
  339. reply, err := stream.Recv()
  340. if err != nil {
  341. grpclog.Errorf("grpclb: failed to recv server list: %v", err)
  342. break
  343. }
  344. b.mu.Lock()
  345. if b.done || seq < b.seq {
  346. b.mu.Unlock()
  347. return
  348. }
  349. b.seq++ // tick when receiving a new list of servers.
  350. seq = b.seq
  351. b.mu.Unlock()
  352. if serverList := reply.GetServerList(); serverList != nil {
  353. b.processServerList(serverList, seq)
  354. }
  355. }
  356. return true
  357. }
  358. func (b *balancer) Start(target string, config BalancerConfig) error {
  359. b.rand = rand.New(rand.NewSource(time.Now().Unix()))
  360. // TODO: Fall back to the basic direct connection if there is no name resolver.
  361. if b.r == nil {
  362. return errors.New("there is no name resolver installed")
  363. }
  364. b.target = target
  365. b.mu.Lock()
  366. if b.done {
  367. b.mu.Unlock()
  368. return ErrClientConnClosing
  369. }
  370. b.addrCh = make(chan []Address)
  371. w, err := b.r.Resolve(target)
  372. if err != nil {
  373. b.mu.Unlock()
  374. grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err)
  375. return err
  376. }
  377. b.w = w
  378. b.mu.Unlock()
  379. balancerAddrsCh := make(chan []remoteBalancerInfo, 1)
  380. // Spawn a goroutine to monitor the name resolution of remote load balancer.
  381. go func() {
  382. for {
  383. if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
  384. grpclog.Warningf("grpclb: the naming watcher stops working due to %v.\n", err)
  385. close(balancerAddrsCh)
  386. return
  387. }
  388. }
  389. }()
  390. // Spawn a goroutine to talk to the remote load balancer.
  391. go func() {
  392. var (
  393. cc *ClientConn
  394. // ccError is closed when there is an error in the current cc.
  395. // A new rb should be picked from rbs and connected.
  396. ccError chan struct{}
  397. rb *remoteBalancerInfo
  398. rbs []remoteBalancerInfo
  399. rbIdx int
  400. )
  401. defer func() {
  402. if ccError != nil {
  403. select {
  404. case <-ccError:
  405. default:
  406. close(ccError)
  407. }
  408. }
  409. if cc != nil {
  410. cc.Close()
  411. }
  412. }()
  413. for {
  414. var ok bool
  415. select {
  416. case rbs, ok = <-balancerAddrsCh:
  417. if !ok {
  418. return
  419. }
  420. foundIdx := -1
  421. if rb != nil {
  422. for i, trb := range rbs {
  423. if trb == *rb {
  424. foundIdx = i
  425. break
  426. }
  427. }
  428. }
  429. if foundIdx >= 0 {
  430. if foundIdx >= 1 {
  431. // Move the address in use to the beginning of the list.
  432. b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0]
  433. rbIdx = 0
  434. }
  435. continue // If found, don't dial new cc.
  436. } else if len(rbs) > 0 {
  437. // Pick a random one from the list, instead of always using the first one.
  438. if l := len(rbs); l > 1 && rb != nil {
  439. tmpIdx := b.rand.Intn(l - 1)
  440. b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
  441. }
  442. rbIdx = 0
  443. rb = &rbs[0]
  444. } else {
  445. // foundIdx < 0 && len(rbs) <= 0.
  446. rb = nil
  447. }
  448. case <-ccError:
  449. ccError = nil
  450. if rbIdx < len(rbs)-1 {
  451. rbIdx++
  452. rb = &rbs[rbIdx]
  453. } else {
  454. rb = nil
  455. }
  456. }
  457. if rb == nil {
  458. continue
  459. }
  460. if cc != nil {
  461. cc.Close()
  462. }
  463. // Talk to the remote load balancer to get the server list.
  464. var (
  465. err error
  466. dopts []DialOption
  467. )
  468. if creds := config.DialCreds; creds != nil {
  469. if rb.name != "" {
  470. if err := creds.OverrideServerName(rb.name); err != nil {
  471. grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err)
  472. continue
  473. }
  474. }
  475. dopts = append(dopts, WithTransportCredentials(creds))
  476. } else {
  477. dopts = append(dopts, WithInsecure())
  478. }
  479. if dialer := config.Dialer; dialer != nil {
  480. // WithDialer takes a different type of function, so we instead use a special DialOption here.
  481. dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
  482. }
  483. ccError = make(chan struct{})
  484. cc, err = Dial(rb.addr, dopts...)
  485. if err != nil {
  486. grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
  487. close(ccError)
  488. continue
  489. }
  490. b.mu.Lock()
  491. b.seq++ // tick when getting a new balancer address
  492. seq := b.seq
  493. b.next = 0
  494. b.mu.Unlock()
  495. go func(cc *ClientConn, ccError chan struct{}) {
  496. lbc := &loadBalancerClient{cc}
  497. b.callRemoteBalancer(lbc, seq)
  498. cc.Close()
  499. select {
  500. case <-ccError:
  501. default:
  502. close(ccError)
  503. }
  504. }(cc, ccError)
  505. }
  506. }()
  507. return nil
  508. }
  509. func (b *balancer) down(addr Address, err error) {
  510. b.mu.Lock()
  511. defer b.mu.Unlock()
  512. for _, a := range b.addrs {
  513. if addr == a.addr {
  514. a.connected = false
  515. break
  516. }
  517. }
  518. }
  519. func (b *balancer) Up(addr Address) func(error) {
  520. b.mu.Lock()
  521. defer b.mu.Unlock()
  522. if b.done {
  523. return nil
  524. }
  525. var cnt int
  526. for _, a := range b.addrs {
  527. if a.addr == addr {
  528. if a.connected {
  529. return nil
  530. }
  531. a.connected = true
  532. }
  533. if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
  534. cnt++
  535. }
  536. }
  537. // addr is the only one which is connected. Notify the Get() callers who are blocking.
  538. if cnt == 1 && b.waitCh != nil {
  539. close(b.waitCh)
  540. b.waitCh = nil
  541. }
  542. return func(err error) {
  543. b.down(addr, err)
  544. }
  545. }
  546. func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
  547. var ch chan struct{}
  548. b.mu.Lock()
  549. if b.done {
  550. b.mu.Unlock()
  551. err = ErrClientConnClosing
  552. return
  553. }
  554. seq := b.seq
  555. defer func() {
  556. if err != nil {
  557. return
  558. }
  559. put = func() {
  560. s, ok := rpcInfoFromContext(ctx)
  561. if !ok {
  562. return
  563. }
  564. b.mu.Lock()
  565. defer b.mu.Unlock()
  566. if b.done || seq < b.seq {
  567. return
  568. }
  569. b.clientStats.NumCallsFinished++
  570. if !s.bytesSent {
  571. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  572. } else if s.bytesReceived {
  573. b.clientStats.NumCallsFinishedKnownReceived++
  574. }
  575. }
  576. }()
  577. b.clientStats.NumCallsStarted++
  578. if len(b.addrs) > 0 {
  579. if b.next >= len(b.addrs) {
  580. b.next = 0
  581. }
  582. next := b.next
  583. for {
  584. a := b.addrs[next]
  585. next = (next + 1) % len(b.addrs)
  586. if a.connected {
  587. if !a.dropForRateLimiting && !a.dropForLoadBalancing {
  588. addr = a.addr
  589. b.next = next
  590. b.mu.Unlock()
  591. return
  592. }
  593. if !opts.BlockingWait {
  594. b.next = next
  595. if a.dropForLoadBalancing {
  596. b.clientStats.NumCallsFinished++
  597. b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
  598. } else if a.dropForRateLimiting {
  599. b.clientStats.NumCallsFinished++
  600. b.clientStats.NumCallsFinishedWithDropForRateLimiting++
  601. }
  602. b.mu.Unlock()
  603. err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
  604. return
  605. }
  606. }
  607. if next == b.next {
  608. // Has iterated all the possible address but none is connected.
  609. break
  610. }
  611. }
  612. }
  613. if !opts.BlockingWait {
  614. if len(b.addrs) == 0 {
  615. b.clientStats.NumCallsFinished++
  616. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  617. b.mu.Unlock()
  618. err = Errorf(codes.Unavailable, "there is no address available")
  619. return
  620. }
  621. // Returns the next addr on b.addrs for a failfast RPC.
  622. addr = b.addrs[b.next].addr
  623. b.next++
  624. b.mu.Unlock()
  625. return
  626. }
  627. // Wait on b.waitCh for non-failfast RPCs.
  628. if b.waitCh == nil {
  629. ch = make(chan struct{})
  630. b.waitCh = ch
  631. } else {
  632. ch = b.waitCh
  633. }
  634. b.mu.Unlock()
  635. for {
  636. select {
  637. case <-ctx.Done():
  638. b.mu.Lock()
  639. b.clientStats.NumCallsFinished++
  640. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  641. b.mu.Unlock()
  642. err = ctx.Err()
  643. return
  644. case <-ch:
  645. b.mu.Lock()
  646. if b.done {
  647. b.clientStats.NumCallsFinished++
  648. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  649. b.mu.Unlock()
  650. err = ErrClientConnClosing
  651. return
  652. }
  653. if len(b.addrs) > 0 {
  654. if b.next >= len(b.addrs) {
  655. b.next = 0
  656. }
  657. next := b.next
  658. for {
  659. a := b.addrs[next]
  660. next = (next + 1) % len(b.addrs)
  661. if a.connected {
  662. if !a.dropForRateLimiting && !a.dropForLoadBalancing {
  663. addr = a.addr
  664. b.next = next
  665. b.mu.Unlock()
  666. return
  667. }
  668. if !opts.BlockingWait {
  669. b.next = next
  670. if a.dropForLoadBalancing {
  671. b.clientStats.NumCallsFinished++
  672. b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
  673. } else if a.dropForRateLimiting {
  674. b.clientStats.NumCallsFinished++
  675. b.clientStats.NumCallsFinishedWithDropForRateLimiting++
  676. }
  677. b.mu.Unlock()
  678. err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
  679. return
  680. }
  681. }
  682. if next == b.next {
  683. // Has iterated all the possible address but none is connected.
  684. break
  685. }
  686. }
  687. }
  688. // The newly added addr got removed by Down() again.
  689. if b.waitCh == nil {
  690. ch = make(chan struct{})
  691. b.waitCh = ch
  692. } else {
  693. ch = b.waitCh
  694. }
  695. b.mu.Unlock()
  696. }
  697. }
  698. }
  699. func (b *balancer) Notify() <-chan []Address {
  700. return b.addrCh
  701. }
  702. func (b *balancer) Close() error {
  703. b.mu.Lock()
  704. defer b.mu.Unlock()
  705. if b.done {
  706. return errBalancerClosed
  707. }
  708. b.done = true
  709. if b.expTimer != nil {
  710. b.expTimer.Stop()
  711. }
  712. if b.waitCh != nil {
  713. close(b.waitCh)
  714. }
  715. if b.addrCh != nil {
  716. close(b.addrCh)
  717. }
  718. if b.w != nil {
  719. b.w.Close()
  720. }
  721. return nil
  722. }