grpclb_remote_balancer.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "fmt"
  21. "net"
  22. "reflect"
  23. "time"
  24. "golang.org/x/net/context"
  25. "google.golang.org/grpc/balancer"
  26. "google.golang.org/grpc/channelz"
  27. "google.golang.org/grpc/connectivity"
  28. lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
  29. "google.golang.org/grpc/grpclog"
  30. "google.golang.org/grpc/metadata"
  31. "google.golang.org/grpc/resolver"
  32. )
  33. // processServerList updates balaner's internal state, create/remove SubConns
  34. // and regenerates picker using the received serverList.
  35. func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
  36. grpclog.Infof("lbBalancer: processing server list: %+v", l)
  37. lb.mu.Lock()
  38. defer lb.mu.Unlock()
  39. // Set serverListReceived to true so fallback will not take effect if it has
  40. // not hit timeout.
  41. lb.serverListReceived = true
  42. // If the new server list == old server list, do nothing.
  43. if reflect.DeepEqual(lb.fullServerList, l.Servers) {
  44. grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
  45. return
  46. }
  47. lb.fullServerList = l.Servers
  48. var backendAddrs []resolver.Address
  49. for _, s := range l.Servers {
  50. if s.DropForLoadBalancing || s.DropForRateLimiting {
  51. continue
  52. }
  53. md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken)
  54. ip := net.IP(s.IpAddress)
  55. ipStr := ip.String()
  56. if ip.To4() == nil {
  57. // Add square brackets to ipv6 addresses, otherwise net.Dial() and
  58. // net.SplitHostPort() will return too many colons error.
  59. ipStr = fmt.Sprintf("[%s]", ipStr)
  60. }
  61. addr := resolver.Address{
  62. Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
  63. Metadata: &md,
  64. }
  65. backendAddrs = append(backendAddrs, addr)
  66. }
  67. // Call refreshSubConns to create/remove SubConns.
  68. lb.refreshSubConns(backendAddrs)
  69. // Regenerate and update picker no matter if there's update on backends (if
  70. // any SubConn will be newed/removed). Because since the full serverList was
  71. // different, there might be updates in drops or pick weights(different
  72. // number of duplicates). We need to update picker with the fulllist.
  73. //
  74. // Now with cache, even if SubConn was newed/removed, there might be no
  75. // state changes.
  76. lb.regeneratePicker()
  77. lb.cc.UpdateBalancerState(lb.state, lb.picker)
  78. }
  79. // refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
  80. // indicating whether the backendAddrs are different from the cached
  81. // backendAddrs (whether any SubConn was newed/removed).
  82. // Caller must hold lb.mu.
  83. func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
  84. lb.backendAddrs = nil
  85. var backendsUpdated bool
  86. // addrsSet is the set converted from backendAddrs, it's used to quick
  87. // lookup for an address.
  88. addrsSet := make(map[resolver.Address]struct{})
  89. // Create new SubConns.
  90. for _, addr := range backendAddrs {
  91. addrWithoutMD := addr
  92. addrWithoutMD.Metadata = nil
  93. addrsSet[addrWithoutMD] = struct{}{}
  94. lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD)
  95. if _, ok := lb.subConns[addrWithoutMD]; !ok {
  96. backendsUpdated = true
  97. // Use addrWithMD to create the SubConn.
  98. sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
  99. if err != nil {
  100. grpclog.Warningf("roundrobinBalancer: failed to create new SubConn: %v", err)
  101. continue
  102. }
  103. lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
  104. if _, ok := lb.scStates[sc]; !ok {
  105. // Only set state of new sc to IDLE. The state could already be
  106. // READY for cached SubConns.
  107. lb.scStates[sc] = connectivity.Idle
  108. }
  109. sc.Connect()
  110. }
  111. }
  112. for a, sc := range lb.subConns {
  113. // a was removed by resolver.
  114. if _, ok := addrsSet[a]; !ok {
  115. backendsUpdated = true
  116. lb.cc.RemoveSubConn(sc)
  117. delete(lb.subConns, a)
  118. // Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
  119. // The entry will be deleted in HandleSubConnStateChange.
  120. }
  121. }
  122. return backendsUpdated
  123. }
  124. func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
  125. for {
  126. reply, err := s.Recv()
  127. if err != nil {
  128. return fmt.Errorf("grpclb: failed to recv server list: %v", err)
  129. }
  130. if serverList := reply.GetServerList(); serverList != nil {
  131. lb.processServerList(serverList)
  132. }
  133. }
  134. }
  135. func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
  136. ticker := time.NewTicker(interval)
  137. defer ticker.Stop()
  138. for {
  139. select {
  140. case <-ticker.C:
  141. case <-s.Context().Done():
  142. return
  143. }
  144. stats := lb.clientStats.toClientStats()
  145. t := time.Now()
  146. stats.Timestamp = &lbpb.Timestamp{
  147. Seconds: t.Unix(),
  148. Nanos: int32(t.Nanosecond()),
  149. }
  150. if err := s.Send(&lbpb.LoadBalanceRequest{
  151. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
  152. ClientStats: stats,
  153. },
  154. }); err != nil {
  155. return
  156. }
  157. }
  158. }
  159. func (lb *lbBalancer) callRemoteBalancer() error {
  160. lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
  161. ctx, cancel := context.WithCancel(context.Background())
  162. defer cancel()
  163. stream, err := lbClient.BalanceLoad(ctx, FailFast(false))
  164. if err != nil {
  165. return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
  166. }
  167. // grpclb handshake on the stream.
  168. initReq := &lbpb.LoadBalanceRequest{
  169. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
  170. InitialRequest: &lbpb.InitialLoadBalanceRequest{
  171. Name: lb.target,
  172. },
  173. },
  174. }
  175. if err := stream.Send(initReq); err != nil {
  176. return fmt.Errorf("grpclb: failed to send init request: %v", err)
  177. }
  178. reply, err := stream.Recv()
  179. if err != nil {
  180. return fmt.Errorf("grpclb: failed to recv init response: %v", err)
  181. }
  182. initResp := reply.GetInitialResponse()
  183. if initResp == nil {
  184. return fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
  185. }
  186. if initResp.LoadBalancerDelegate != "" {
  187. return fmt.Errorf("grpclb: Delegation is not supported")
  188. }
  189. go func() {
  190. if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
  191. lb.sendLoadReport(stream, d)
  192. }
  193. }()
  194. return lb.readServerList(stream)
  195. }
  196. func (lb *lbBalancer) watchRemoteBalancer() {
  197. for {
  198. err := lb.callRemoteBalancer()
  199. select {
  200. case <-lb.doneCh:
  201. return
  202. default:
  203. if err != nil {
  204. grpclog.Error(err)
  205. }
  206. }
  207. }
  208. }
  209. func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
  210. var dopts []DialOption
  211. if creds := lb.opt.DialCreds; creds != nil {
  212. if err := creds.OverrideServerName(remoteLBName); err == nil {
  213. dopts = append(dopts, WithTransportCredentials(creds))
  214. } else {
  215. grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
  216. dopts = append(dopts, WithInsecure())
  217. }
  218. } else {
  219. dopts = append(dopts, WithInsecure())
  220. }
  221. if lb.opt.Dialer != nil {
  222. // WithDialer takes a different type of function, so we instead use a
  223. // special DialOption here.
  224. dopts = append(dopts, withContextDialer(lb.opt.Dialer))
  225. }
  226. // Explicitly set pickfirst as the balancer.
  227. dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
  228. dopts = append(dopts, withResolverBuilder(lb.manualResolver))
  229. if channelz.IsOn() {
  230. dopts = append(dopts, WithChannelzParentID(lb.opt.ChannelzParentID))
  231. }
  232. // DialContext using manualResolver.Scheme, which is a random scheme generated
  233. // when init grpclb. The target name is not important.
  234. cc, err := DialContext(context.Background(), "grpclb:///grpclb.server", dopts...)
  235. if err != nil {
  236. grpclog.Fatalf("failed to dial: %v", err)
  237. }
  238. lb.ccRemoteLB = cc
  239. go lb.watchRemoteBalancer()
  240. }