grpclb.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. "golang.org/x/net/context"
  25. "google.golang.org/grpc/balancer"
  26. "google.golang.org/grpc/connectivity"
  27. lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
  28. "google.golang.org/grpc/grpclog"
  29. "google.golang.org/grpc/resolver"
  30. )
  // Package-level constants used by the grpclb balancer.
  const (
  	// grpclbName is the name reported by lbBuilder.Name.
  	grpclbName = "grpclb"
  	// defaultFallbackTimeout is the fallback timeout used by newLBBuilder.
  	defaultFallbackTimeout = 10 * time.Second
  	// lbTokeyKey holds the string "lb-token".
  	// NOTE(review): the identifier looks like a typo of "lbTokenKey"; it is
  	// kept as is because it may be referenced outside this file.
  	lbTokeyKey = "lb-token"
  )
  36. func convertDuration(d *lbpb.Duration) time.Duration {
  37. if d == nil {
  38. return 0
  39. }
  40. return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
  41. }
  42. // Client API for LoadBalancer service.
  43. // Mostly copied from generated pb.go file.
  44. // To avoid circular dependency.
  45. type loadBalancerClient struct {
  46. cc *ClientConn
  47. }
  48. func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
  49. desc := &StreamDesc{
  50. StreamName: "BalanceLoad",
  51. ServerStreams: true,
  52. ClientStreams: true,
  53. }
  54. stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
  55. if err != nil {
  56. return nil, err
  57. }
  58. x := &balanceLoadClientStream{stream}
  59. return x, nil
  60. }
  61. type balanceLoadClientStream struct {
  62. ClientStream
  63. }
  64. func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
  65. return x.ClientStream.SendMsg(m)
  66. }
  67. func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
  68. m := new(lbpb.LoadBalanceResponse)
  69. if err := x.ClientStream.RecvMsg(m); err != nil {
  70. return nil, err
  71. }
  72. return m, nil
  73. }
  74. func init() {
  75. balancer.Register(newLBBuilder())
  76. }
  77. // newLBBuilder creates a builder for grpclb.
  78. func newLBBuilder() balancer.Builder {
  79. return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
  80. }
  81. // NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given
  82. // fallbackTimeout. If no response is received from the remote balancer within
  83. // fallbackTimeout, the backend addresses from the resolved address list will be
  84. // used.
  85. //
  86. // Only call this function when a non-default fallback timeout is needed.
  87. func NewLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
  88. return &lbBuilder{
  89. fallbackTimeout: fallbackTimeout,
  90. }
  91. }
  92. type lbBuilder struct {
  93. fallbackTimeout time.Duration
  94. }
  95. func (b *lbBuilder) Name() string {
  96. return grpclbName
  97. }
  98. func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  99. // This generates a manual resolver builder with a random scheme. This
  100. // scheme will be used to dial to remote LB, so we can send filtered address
  101. // updates to remote LB ClientConn using this manual resolver.
  102. scheme := "grpclb_internal_" + strconv.FormatInt(time.Now().UnixNano(), 36)
  103. r := &lbManualResolver{scheme: scheme, ccb: cc}
  104. var target string
  105. targetSplitted := strings.Split(cc.Target(), ":///")
  106. if len(targetSplitted) < 2 {
  107. target = cc.Target()
  108. } else {
  109. target = targetSplitted[1]
  110. }
  111. lb := &lbBalancer{
  112. cc: newLBCacheClientConn(cc),
  113. target: target,
  114. opt: opt,
  115. fallbackTimeout: b.fallbackTimeout,
  116. doneCh: make(chan struct{}),
  117. manualResolver: r,
  118. csEvltr: &connectivityStateEvaluator{},
  119. subConns: make(map[resolver.Address]balancer.SubConn),
  120. scStates: make(map[balancer.SubConn]connectivity.State),
  121. picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
  122. clientStats: &rpcStats{},
  123. }
  124. return lb
  125. }
  126. type lbBalancer struct {
  127. cc *lbCacheClientConn
  128. target string
  129. opt balancer.BuildOptions
  130. fallbackTimeout time.Duration
  131. doneCh chan struct{}
  132. // manualResolver is used in the remote LB ClientConn inside grpclb. When
  133. // resolved address updates are received by grpclb, filtered updates will be
  134. // send to remote LB ClientConn through this resolver.
  135. manualResolver *lbManualResolver
  136. // The ClientConn to talk to the remote balancer.
  137. ccRemoteLB *ClientConn
  138. // Support client side load reporting. Each picker gets a reference to this,
  139. // and will update its content.
  140. clientStats *rpcStats
  141. mu sync.Mutex // guards everything following.
  142. // The full server list including drops, used to check if the newly received
  143. // serverList contains anything new. Each generate picker will also have
  144. // reference to this list to do the first layer pick.
  145. fullServerList []*lbpb.Server
  146. // All backends addresses, with metadata set to nil. This list contains all
  147. // backend addresses in the same order and with the same duplicates as in
  148. // serverlist. When generating picker, a SubConn slice with the same order
  149. // but with only READY SCs will be gerenated.
  150. backendAddrs []resolver.Address
  151. // Roundrobin functionalities.
  152. csEvltr *connectivityStateEvaluator
  153. state connectivity.State
  154. subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
  155. scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
  156. picker balancer.Picker
  157. // Support fallback to resolved backend addresses if there's no response
  158. // from remote balancer within fallbackTimeout.
  159. fallbackTimerExpired bool
  160. serverListReceived bool
  161. // resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
  162. // when resolved address updates are received, and read in the goroutine
  163. // handling fallback.
  164. resolvedBackendAddrs []resolver.Address
  165. }
  166. // regeneratePicker takes a snapshot of the balancer, and generates a picker from
  167. // it. The picker
  168. // - always returns ErrTransientFailure if the balancer is in TransientFailure,
  169. // - does two layer roundrobin pick otherwise.
  170. // Caller must hold lb.mu.
  171. func (lb *lbBalancer) regeneratePicker() {
  172. if lb.state == connectivity.TransientFailure {
  173. lb.picker = &errPicker{err: balancer.ErrTransientFailure}
  174. return
  175. }
  176. var readySCs []balancer.SubConn
  177. for _, a := range lb.backendAddrs {
  178. if sc, ok := lb.subConns[a]; ok {
  179. if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
  180. readySCs = append(readySCs, sc)
  181. }
  182. }
  183. }
  184. if len(lb.fullServerList) <= 0 {
  185. if len(readySCs) <= 0 {
  186. lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
  187. return
  188. }
  189. lb.picker = &rrPicker{subConns: readySCs}
  190. return
  191. }
  192. lb.picker = &lbPicker{
  193. serverList: lb.fullServerList,
  194. subConns: readySCs,
  195. stats: lb.clientStats,
  196. }
  197. }
  198. func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  199. grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
  200. lb.mu.Lock()
  201. defer lb.mu.Unlock()
  202. oldS, ok := lb.scStates[sc]
  203. if !ok {
  204. grpclog.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
  205. return
  206. }
  207. lb.scStates[sc] = s
  208. switch s {
  209. case connectivity.Idle:
  210. sc.Connect()
  211. case connectivity.Shutdown:
  212. // When an address was removed by resolver, b called RemoveSubConn but
  213. // kept the sc's state in scStates. Remove state for this sc here.
  214. delete(lb.scStates, sc)
  215. }
  216. oldAggrState := lb.state
  217. lb.state = lb.csEvltr.recordTransition(oldS, s)
  218. // Regenerate picker when one of the following happens:
  219. // - this sc became ready from not-ready
  220. // - this sc became not-ready from ready
  221. // - the aggregated state of balancer became TransientFailure from non-TransientFailure
  222. // - the aggregated state of balancer became non-TransientFailure from TransientFailure
  223. if (oldS == connectivity.Ready) != (s == connectivity.Ready) ||
  224. (lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
  225. lb.regeneratePicker()
  226. }
  227. lb.cc.UpdateBalancerState(lb.state, lb.picker)
  228. }
  229. // fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
  230. // resolved backends (backends received from resolver, not from remote balancer)
  231. // if no connection to remote balancers was successful.
  232. func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
  233. timer := time.NewTimer(fallbackTimeout)
  234. defer timer.Stop()
  235. select {
  236. case <-timer.C:
  237. case <-lb.doneCh:
  238. return
  239. }
  240. lb.mu.Lock()
  241. if lb.serverListReceived {
  242. lb.mu.Unlock()
  243. return
  244. }
  245. lb.fallbackTimerExpired = true
  246. lb.refreshSubConns(lb.resolvedBackendAddrs)
  247. lb.mu.Unlock()
  248. }
  249. // HandleResolvedAddrs sends the updated remoteLB addresses to remoteLB
  250. // clientConn. The remoteLB clientConn will handle creating/removing remoteLB
  251. // connections.
  252. func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
  253. grpclog.Infof("lbBalancer: handleResolvedResult: %+v", addrs)
  254. if len(addrs) <= 0 {
  255. return
  256. }
  257. var remoteBalancerAddrs, backendAddrs []resolver.Address
  258. for _, a := range addrs {
  259. if a.Type == resolver.GRPCLB {
  260. remoteBalancerAddrs = append(remoteBalancerAddrs, a)
  261. } else {
  262. backendAddrs = append(backendAddrs, a)
  263. }
  264. }
  265. if lb.ccRemoteLB == nil {
  266. if len(remoteBalancerAddrs) <= 0 {
  267. grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
  268. return
  269. }
  270. // First time receiving resolved addresses, create a cc to remote
  271. // balancers.
  272. lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName)
  273. // Start the fallback goroutine.
  274. go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
  275. }
  276. // cc to remote balancers uses lb.manualResolver. Send the updated remote
  277. // balancer addresses to it through manualResolver.
  278. lb.manualResolver.NewAddress(remoteBalancerAddrs)
  279. lb.mu.Lock()
  280. lb.resolvedBackendAddrs = backendAddrs
  281. // If serverListReceived is true, connection to remote balancer was
  282. // successful and there's no need to do fallback anymore.
  283. // If fallbackTimerExpired is false, fallback hasn't happened yet.
  284. if !lb.serverListReceived && lb.fallbackTimerExpired {
  285. // This means we received a new list of resolved backends, and we are
  286. // still in fallback mode. Need to update the list of backends we are
  287. // using to the new list of backends.
  288. lb.refreshSubConns(lb.resolvedBackendAddrs)
  289. }
  290. lb.mu.Unlock()
  291. }
  292. func (lb *lbBalancer) Close() {
  293. select {
  294. case <-lb.doneCh:
  295. return
  296. default:
  297. }
  298. close(lb.doneCh)
  299. if lb.ccRemoteLB != nil {
  300. lb.ccRemoteLB.Close()
  301. }
  302. lb.cc.close()
  303. }