grpclb.go 17 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708
/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
  18. package grpc
  19. import (
  20. "errors"
  21. "fmt"
  22. "math/rand"
  23. "net"
  24. "sync"
  25. "time"
  26. "golang.org/x/net/context"
  27. "google.golang.org/grpc/codes"
  28. lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
  29. "google.golang.org/grpc/grpclog"
  30. "google.golang.org/grpc/metadata"
  31. "google.golang.org/grpc/naming"
  32. )
  33. // Client API for LoadBalancer service.
  34. // Mostly copied from generated pb.go file.
  35. // To avoid circular dependency.
  36. type loadBalancerClient struct {
  37. cc *ClientConn
  38. }
  39. func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
  40. desc := &StreamDesc{
  41. StreamName: "BalanceLoad",
  42. ServerStreams: true,
  43. ClientStreams: true,
  44. }
  45. stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
  46. if err != nil {
  47. return nil, err
  48. }
  49. x := &balanceLoadClientStream{stream}
  50. return x, nil
  51. }
  52. type balanceLoadClientStream struct {
  53. ClientStream
  54. }
  55. func (x *balanceLoadClientStream) Send(m *lbmpb.LoadBalanceRequest) error {
  56. return x.ClientStream.SendMsg(m)
  57. }
  58. func (x *balanceLoadClientStream) Recv() (*lbmpb.LoadBalanceResponse, error) {
  59. m := new(lbmpb.LoadBalanceResponse)
  60. if err := x.ClientStream.RecvMsg(m); err != nil {
  61. return nil, err
  62. }
  63. return m, nil
  64. }
  65. // NewGRPCLBBalancer creates a grpclb load balancer.
  66. func NewGRPCLBBalancer(r naming.Resolver) Balancer {
  67. return &balancer{
  68. r: r,
  69. }
  70. }
  71. type remoteBalancerInfo struct {
  72. addr string
  73. // the server name used for authentication with the remote LB server.
  74. name string
  75. }
  76. // grpclbAddrInfo consists of the information of a backend server.
  77. type grpclbAddrInfo struct {
  78. addr Address
  79. connected bool
  80. // dropForRateLimiting indicates whether this particular request should be
  81. // dropped by the client for rate limiting.
  82. dropForRateLimiting bool
  83. // dropForLoadBalancing indicates whether this particular request should be
  84. // dropped by the client for load balancing.
  85. dropForLoadBalancing bool
  86. }
  87. type balancer struct {
  88. r naming.Resolver
  89. target string
  90. mu sync.Mutex
  91. seq int // a sequence number to make sure addrCh does not get stale addresses.
  92. w naming.Watcher
  93. addrCh chan []Address
  94. rbs []remoteBalancerInfo
  95. addrs []*grpclbAddrInfo
  96. next int
  97. waitCh chan struct{}
  98. done bool
  99. rand *rand.Rand
  100. clientStats lbmpb.ClientStats
  101. }
  102. func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
  103. updates, err := w.Next()
  104. if err != nil {
  105. grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
  106. return err
  107. }
  108. b.mu.Lock()
  109. defer b.mu.Unlock()
  110. if b.done {
  111. return ErrClientConnClosing
  112. }
  113. for _, update := range updates {
  114. switch update.Op {
  115. case naming.Add:
  116. var exist bool
  117. for _, v := range b.rbs {
  118. // TODO: Is the same addr with different server name a different balancer?
  119. if update.Addr == v.addr {
  120. exist = true
  121. break
  122. }
  123. }
  124. if exist {
  125. continue
  126. }
  127. md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB)
  128. if !ok {
  129. // TODO: Revisit the handling here and may introduce some fallback mechanism.
  130. grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
  131. continue
  132. }
  133. switch md.AddrType {
  134. case naming.Backend:
  135. // TODO: Revisit the handling here and may introduce some fallback mechanism.
  136. grpclog.Errorf("The name resolution does not give grpclb addresses")
  137. continue
  138. case naming.GRPCLB:
  139. b.rbs = append(b.rbs, remoteBalancerInfo{
  140. addr: update.Addr,
  141. name: md.ServerName,
  142. })
  143. default:
  144. grpclog.Errorf("Received unknow address type %d", md.AddrType)
  145. continue
  146. }
  147. case naming.Delete:
  148. for i, v := range b.rbs {
  149. if update.Addr == v.addr {
  150. copy(b.rbs[i:], b.rbs[i+1:])
  151. b.rbs = b.rbs[:len(b.rbs)-1]
  152. break
  153. }
  154. }
  155. default:
  156. grpclog.Errorf("Unknown update.Op %v", update.Op)
  157. }
  158. }
  159. // TODO: Fall back to the basic round-robin load balancing if the resulting address is
  160. // not a load balancer.
  161. select {
  162. case <-ch:
  163. default:
  164. }
  165. ch <- b.rbs
  166. return nil
  167. }
  168. func convertDuration(d *lbmpb.Duration) time.Duration {
  169. if d == nil {
  170. return 0
  171. }
  172. return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
  173. }
  174. func (b *balancer) processServerList(l *lbmpb.ServerList, seq int) {
  175. if l == nil {
  176. return
  177. }
  178. servers := l.GetServers()
  179. var (
  180. sl []*grpclbAddrInfo
  181. addrs []Address
  182. )
  183. for _, s := range servers {
  184. md := metadata.Pairs("lb-token", s.LoadBalanceToken)
  185. ip := net.IP(s.IpAddress)
  186. ipStr := ip.String()
  187. if ip.To4() == nil {
  188. // Add square brackets to ipv6 addresses, otherwise net.Dial() and
  189. // net.SplitHostPort() will return too many colons error.
  190. ipStr = fmt.Sprintf("[%s]", ipStr)
  191. }
  192. addr := Address{
  193. Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
  194. Metadata: &md,
  195. }
  196. sl = append(sl, &grpclbAddrInfo{
  197. addr: addr,
  198. dropForRateLimiting: s.DropForRateLimiting,
  199. dropForLoadBalancing: s.DropForLoadBalancing,
  200. })
  201. addrs = append(addrs, addr)
  202. }
  203. b.mu.Lock()
  204. defer b.mu.Unlock()
  205. if b.done || seq < b.seq {
  206. return
  207. }
  208. if len(sl) > 0 {
  209. // reset b.next to 0 when replacing the server list.
  210. b.next = 0
  211. b.addrs = sl
  212. b.addrCh <- addrs
  213. }
  214. return
  215. }
  216. func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
  217. ticker := time.NewTicker(interval)
  218. defer ticker.Stop()
  219. for {
  220. select {
  221. case <-ticker.C:
  222. case <-done:
  223. return
  224. }
  225. b.mu.Lock()
  226. stats := b.clientStats
  227. b.clientStats = lbmpb.ClientStats{} // Clear the stats.
  228. b.mu.Unlock()
  229. t := time.Now()
  230. stats.Timestamp = &lbmpb.Timestamp{
  231. Seconds: t.Unix(),
  232. Nanos: int32(t.Nanosecond()),
  233. }
  234. if err := s.Send(&lbmpb.LoadBalanceRequest{
  235. LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_ClientStats{
  236. ClientStats: &stats,
  237. },
  238. }); err != nil {
  239. grpclog.Errorf("grpclb: failed to send load report: %v", err)
  240. return
  241. }
  242. }
  243. }
  244. func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
  245. ctx, cancel := context.WithCancel(context.Background())
  246. defer cancel()
  247. stream, err := lbc.BalanceLoad(ctx)
  248. if err != nil {
  249. grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
  250. return
  251. }
  252. b.mu.Lock()
  253. if b.done {
  254. b.mu.Unlock()
  255. return
  256. }
  257. b.mu.Unlock()
  258. initReq := &lbmpb.LoadBalanceRequest{
  259. LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_InitialRequest{
  260. InitialRequest: &lbmpb.InitialLoadBalanceRequest{
  261. Name: b.target,
  262. },
  263. },
  264. }
  265. if err := stream.Send(initReq); err != nil {
  266. grpclog.Errorf("grpclb: failed to send init request: %v", err)
  267. // TODO: backoff on retry?
  268. return true
  269. }
  270. reply, err := stream.Recv()
  271. if err != nil {
  272. grpclog.Errorf("grpclb: failed to recv init response: %v", err)
  273. // TODO: backoff on retry?
  274. return true
  275. }
  276. initResp := reply.GetInitialResponse()
  277. if initResp == nil {
  278. grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.")
  279. return
  280. }
  281. // TODO: Support delegation.
  282. if initResp.LoadBalancerDelegate != "" {
  283. // delegation
  284. grpclog.Errorf("TODO: Delegation is not supported yet.")
  285. return
  286. }
  287. streamDone := make(chan struct{})
  288. defer close(streamDone)
  289. b.mu.Lock()
  290. b.clientStats = lbmpb.ClientStats{} // Clear client stats.
  291. b.mu.Unlock()
  292. if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
  293. go b.sendLoadReport(stream, d, streamDone)
  294. }
  295. // Retrieve the server list.
  296. for {
  297. reply, err := stream.Recv()
  298. if err != nil {
  299. grpclog.Errorf("grpclb: failed to recv server list: %v", err)
  300. break
  301. }
  302. b.mu.Lock()
  303. if b.done || seq < b.seq {
  304. b.mu.Unlock()
  305. return
  306. }
  307. b.seq++ // tick when receiving a new list of servers.
  308. seq = b.seq
  309. b.mu.Unlock()
  310. if serverList := reply.GetServerList(); serverList != nil {
  311. b.processServerList(serverList, seq)
  312. }
  313. }
  314. return true
  315. }
  316. func (b *balancer) Start(target string, config BalancerConfig) error {
  317. b.rand = rand.New(rand.NewSource(time.Now().Unix()))
  318. // TODO: Fall back to the basic direct connection if there is no name resolver.
  319. if b.r == nil {
  320. return errors.New("there is no name resolver installed")
  321. }
  322. b.target = target
  323. b.mu.Lock()
  324. if b.done {
  325. b.mu.Unlock()
  326. return ErrClientConnClosing
  327. }
  328. b.addrCh = make(chan []Address)
  329. w, err := b.r.Resolve(target)
  330. if err != nil {
  331. b.mu.Unlock()
  332. grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err)
  333. return err
  334. }
  335. b.w = w
  336. b.mu.Unlock()
  337. balancerAddrsCh := make(chan []remoteBalancerInfo, 1)
  338. // Spawn a goroutine to monitor the name resolution of remote load balancer.
  339. go func() {
  340. for {
  341. if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
  342. grpclog.Warningf("grpclb: the naming watcher stops working due to %v.\n", err)
  343. close(balancerAddrsCh)
  344. return
  345. }
  346. }
  347. }()
  348. // Spawn a goroutine to talk to the remote load balancer.
  349. go func() {
  350. var (
  351. cc *ClientConn
  352. // ccError is closed when there is an error in the current cc.
  353. // A new rb should be picked from rbs and connected.
  354. ccError chan struct{}
  355. rb *remoteBalancerInfo
  356. rbs []remoteBalancerInfo
  357. rbIdx int
  358. )
  359. defer func() {
  360. if ccError != nil {
  361. select {
  362. case <-ccError:
  363. default:
  364. close(ccError)
  365. }
  366. }
  367. if cc != nil {
  368. cc.Close()
  369. }
  370. }()
  371. for {
  372. var ok bool
  373. select {
  374. case rbs, ok = <-balancerAddrsCh:
  375. if !ok {
  376. return
  377. }
  378. foundIdx := -1
  379. if rb != nil {
  380. for i, trb := range rbs {
  381. if trb == *rb {
  382. foundIdx = i
  383. break
  384. }
  385. }
  386. }
  387. if foundIdx >= 0 {
  388. if foundIdx >= 1 {
  389. // Move the address in use to the beginning of the list.
  390. b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0]
  391. rbIdx = 0
  392. }
  393. continue // If found, don't dial new cc.
  394. } else if len(rbs) > 0 {
  395. // Pick a random one from the list, instead of always using the first one.
  396. if l := len(rbs); l > 1 && rb != nil {
  397. tmpIdx := b.rand.Intn(l - 1)
  398. b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
  399. }
  400. rbIdx = 0
  401. rb = &rbs[0]
  402. } else {
  403. // foundIdx < 0 && len(rbs) <= 0.
  404. rb = nil
  405. }
  406. case <-ccError:
  407. ccError = nil
  408. if rbIdx < len(rbs)-1 {
  409. rbIdx++
  410. rb = &rbs[rbIdx]
  411. } else {
  412. rb = nil
  413. }
  414. }
  415. if rb == nil {
  416. continue
  417. }
  418. if cc != nil {
  419. cc.Close()
  420. }
  421. // Talk to the remote load balancer to get the server list.
  422. var (
  423. err error
  424. dopts []DialOption
  425. )
  426. if creds := config.DialCreds; creds != nil {
  427. if rb.name != "" {
  428. if err := creds.OverrideServerName(rb.name); err != nil {
  429. grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err)
  430. continue
  431. }
  432. }
  433. dopts = append(dopts, WithTransportCredentials(creds))
  434. } else {
  435. dopts = append(dopts, WithInsecure())
  436. }
  437. if dialer := config.Dialer; dialer != nil {
  438. // WithDialer takes a different type of function, so we instead use a special DialOption here.
  439. dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
  440. }
  441. ccError = make(chan struct{})
  442. cc, err = Dial(rb.addr, dopts...)
  443. if err != nil {
  444. grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
  445. close(ccError)
  446. continue
  447. }
  448. b.mu.Lock()
  449. b.seq++ // tick when getting a new balancer address
  450. seq := b.seq
  451. b.next = 0
  452. b.mu.Unlock()
  453. go func(cc *ClientConn, ccError chan struct{}) {
  454. lbc := &loadBalancerClient{cc}
  455. b.callRemoteBalancer(lbc, seq)
  456. cc.Close()
  457. select {
  458. case <-ccError:
  459. default:
  460. close(ccError)
  461. }
  462. }(cc, ccError)
  463. }
  464. }()
  465. return nil
  466. }
  467. func (b *balancer) down(addr Address, err error) {
  468. b.mu.Lock()
  469. defer b.mu.Unlock()
  470. for _, a := range b.addrs {
  471. if addr == a.addr {
  472. a.connected = false
  473. break
  474. }
  475. }
  476. }
  477. func (b *balancer) Up(addr Address) func(error) {
  478. b.mu.Lock()
  479. defer b.mu.Unlock()
  480. if b.done {
  481. return nil
  482. }
  483. var cnt int
  484. for _, a := range b.addrs {
  485. if a.addr == addr {
  486. if a.connected {
  487. return nil
  488. }
  489. a.connected = true
  490. }
  491. if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
  492. cnt++
  493. }
  494. }
  495. // addr is the only one which is connected. Notify the Get() callers who are blocking.
  496. if cnt == 1 && b.waitCh != nil {
  497. close(b.waitCh)
  498. b.waitCh = nil
  499. }
  500. return func(err error) {
  501. b.down(addr, err)
  502. }
  503. }
  504. func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
  505. var ch chan struct{}
  506. b.mu.Lock()
  507. if b.done {
  508. b.mu.Unlock()
  509. err = ErrClientConnClosing
  510. return
  511. }
  512. seq := b.seq
  513. defer func() {
  514. if err != nil {
  515. return
  516. }
  517. put = func() {
  518. s, ok := rpcInfoFromContext(ctx)
  519. if !ok {
  520. return
  521. }
  522. b.mu.Lock()
  523. defer b.mu.Unlock()
  524. if b.done || seq < b.seq {
  525. return
  526. }
  527. b.clientStats.NumCallsFinished++
  528. if !s.bytesSent {
  529. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  530. } else if s.bytesReceived {
  531. b.clientStats.NumCallsFinishedKnownReceived++
  532. }
  533. }
  534. }()
  535. b.clientStats.NumCallsStarted++
  536. if len(b.addrs) > 0 {
  537. if b.next >= len(b.addrs) {
  538. b.next = 0
  539. }
  540. next := b.next
  541. for {
  542. a := b.addrs[next]
  543. next = (next + 1) % len(b.addrs)
  544. if a.connected {
  545. if !a.dropForRateLimiting && !a.dropForLoadBalancing {
  546. addr = a.addr
  547. b.next = next
  548. b.mu.Unlock()
  549. return
  550. }
  551. if !opts.BlockingWait {
  552. b.next = next
  553. if a.dropForLoadBalancing {
  554. b.clientStats.NumCallsFinished++
  555. b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
  556. } else if a.dropForRateLimiting {
  557. b.clientStats.NumCallsFinished++
  558. b.clientStats.NumCallsFinishedWithDropForRateLimiting++
  559. }
  560. b.mu.Unlock()
  561. err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
  562. return
  563. }
  564. }
  565. if next == b.next {
  566. // Has iterated all the possible address but none is connected.
  567. break
  568. }
  569. }
  570. }
  571. if !opts.BlockingWait {
  572. if len(b.addrs) == 0 {
  573. b.clientStats.NumCallsFinished++
  574. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  575. b.mu.Unlock()
  576. err = Errorf(codes.Unavailable, "there is no address available")
  577. return
  578. }
  579. // Returns the next addr on b.addrs for a failfast RPC.
  580. addr = b.addrs[b.next].addr
  581. b.next++
  582. b.mu.Unlock()
  583. return
  584. }
  585. // Wait on b.waitCh for non-failfast RPCs.
  586. if b.waitCh == nil {
  587. ch = make(chan struct{})
  588. b.waitCh = ch
  589. } else {
  590. ch = b.waitCh
  591. }
  592. b.mu.Unlock()
  593. for {
  594. select {
  595. case <-ctx.Done():
  596. b.mu.Lock()
  597. b.clientStats.NumCallsFinished++
  598. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  599. b.mu.Unlock()
  600. err = ctx.Err()
  601. return
  602. case <-ch:
  603. b.mu.Lock()
  604. if b.done {
  605. b.clientStats.NumCallsFinished++
  606. b.clientStats.NumCallsFinishedWithClientFailedToSend++
  607. b.mu.Unlock()
  608. err = ErrClientConnClosing
  609. return
  610. }
  611. if len(b.addrs) > 0 {
  612. if b.next >= len(b.addrs) {
  613. b.next = 0
  614. }
  615. next := b.next
  616. for {
  617. a := b.addrs[next]
  618. next = (next + 1) % len(b.addrs)
  619. if a.connected {
  620. if !a.dropForRateLimiting && !a.dropForLoadBalancing {
  621. addr = a.addr
  622. b.next = next
  623. b.mu.Unlock()
  624. return
  625. }
  626. if !opts.BlockingWait {
  627. b.next = next
  628. if a.dropForLoadBalancing {
  629. b.clientStats.NumCallsFinished++
  630. b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
  631. } else if a.dropForRateLimiting {
  632. b.clientStats.NumCallsFinished++
  633. b.clientStats.NumCallsFinishedWithDropForRateLimiting++
  634. }
  635. b.mu.Unlock()
  636. err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
  637. return
  638. }
  639. }
  640. if next == b.next {
  641. // Has iterated all the possible address but none is connected.
  642. break
  643. }
  644. }
  645. }
  646. // The newly added addr got removed by Down() again.
  647. if b.waitCh == nil {
  648. ch = make(chan struct{})
  649. b.waitCh = ch
  650. } else {
  651. ch = b.waitCh
  652. }
  653. b.mu.Unlock()
  654. }
  655. }
  656. }
  657. func (b *balancer) Notify() <-chan []Address {
  658. return b.addrCh
  659. }
  660. func (b *balancer) Close() error {
  661. b.mu.Lock()
  662. defer b.mu.Unlock()
  663. if b.done {
  664. return errBalancerClosed
  665. }
  666. b.done = true
  667. if b.waitCh != nil {
  668. close(b.waitCh)
  669. }
  670. if b.addrCh != nil {
  671. close(b.addrCh)
  672. }
  673. if b.w != nil {
  674. b.w.Close()
  675. }
  676. return nil
  677. }