grpclb_picker.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "sync"
  21. "sync/atomic"
  22. "golang.org/x/net/context"
  23. "google.golang.org/grpc/balancer"
  24. "google.golang.org/grpc/codes"
  25. lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
  26. "google.golang.org/grpc/status"
  27. )
  // rpcStats holds the per-RPC counters that toClientStats converts into an
  // lbpb.ClientStats message.
  //
  // The methods on this type update the fields with atomic.AddInt64 and drain
  // them with atomic.SwapInt64, so the fields must only be accessed through
  // sync/atomic. Keep the struct made up of int64 fields only: sync/atomic needs
  // 64-bit alignment for these operations on 32-bit platforms.
  type rpcStats struct {
  	// Bumped once by each of the recording methods below.
  	NumCallsStarted int64
  	// Bumped once by each of the recording methods below.
  	NumCallsFinished int64
  	// Bumped by dropForRateLimiting.
  	NumCallsFinishedWithDropForRateLimiting int64
  	// Bumped by dropForLoadBalancing.
  	NumCallsFinishedWithDropForLoadBalancing int64
  	// Bumped by failedToSend.
  	NumCallsFinishedWithClientFailedToSend int64
  	// Bumped by knownReceived.
  	NumCallsFinishedKnownReceived int64
  }
  36. // toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
  37. func (s *rpcStats) toClientStats() *lbpb.ClientStats {
  38. stats := &lbpb.ClientStats{
  39. NumCallsStarted: atomic.SwapInt64(&s.NumCallsStarted, 0),
  40. NumCallsFinished: atomic.SwapInt64(&s.NumCallsFinished, 0),
  41. NumCallsFinishedWithDropForRateLimiting: atomic.SwapInt64(&s.NumCallsFinishedWithDropForRateLimiting, 0),
  42. NumCallsFinishedWithDropForLoadBalancing: atomic.SwapInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 0),
  43. NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.NumCallsFinishedWithClientFailedToSend, 0),
  44. NumCallsFinishedKnownReceived: atomic.SwapInt64(&s.NumCallsFinishedKnownReceived, 0),
  45. }
  46. return stats
  47. }
  48. func (s *rpcStats) dropForRateLimiting() {
  49. atomic.AddInt64(&s.NumCallsStarted, 1)
  50. atomic.AddInt64(&s.NumCallsFinishedWithDropForRateLimiting, 1)
  51. atomic.AddInt64(&s.NumCallsFinished, 1)
  52. }
  53. func (s *rpcStats) dropForLoadBalancing() {
  54. atomic.AddInt64(&s.NumCallsStarted, 1)
  55. atomic.AddInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 1)
  56. atomic.AddInt64(&s.NumCallsFinished, 1)
  57. }
  58. func (s *rpcStats) failedToSend() {
  59. atomic.AddInt64(&s.NumCallsStarted, 1)
  60. atomic.AddInt64(&s.NumCallsFinishedWithClientFailedToSend, 1)
  61. atomic.AddInt64(&s.NumCallsFinished, 1)
  62. }
  63. func (s *rpcStats) knownReceived() {
  64. atomic.AddInt64(&s.NumCallsStarted, 1)
  65. atomic.AddInt64(&s.NumCallsFinishedKnownReceived, 1)
  66. atomic.AddInt64(&s.NumCallsFinished, 1)
  67. }
  // errPicker is a picker that never yields a SubConn: its Pick method fails
  // every call with the stored error.
  type errPicker struct {
  	err error // Returned verbatim by every Pick call.
  }
  72. func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
  73. return nil, nil, p.err
  74. }
  75. // rrPicker does roundrobin on subConns. It's typically used when there's no
  76. // response from remote balancer, and grpclb falls back to the resolved
  77. // backends.
  78. //
  79. // It guaranteed that len(subConns) > 0.
  80. type rrPicker struct {
  81. mu sync.Mutex
  82. subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
  83. subConnsNext int
  84. }
  85. func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
  86. p.mu.Lock()
  87. defer p.mu.Unlock()
  88. sc := p.subConns[p.subConnsNext]
  89. p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
  90. return sc, nil, nil
  91. }
  92. // lbPicker does two layers of picks:
  93. //
  94. // First layer: roundrobin on all servers in serverList, including drops and backends.
  95. // - If it picks a drop, the RPC will fail as being dropped.
  96. // - If it picks a backend, do a second layer pick to pick the real backend.
  97. //
  98. // Second layer: roundrobin on all READY backends.
  99. //
  100. // It's guaranteed that len(serverList) > 0.
  101. type lbPicker struct {
  102. mu sync.Mutex
  103. serverList []*lbpb.Server
  104. serverListNext int
  105. subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
  106. subConnsNext int
  107. stats *rpcStats
  108. }
  109. func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
  110. p.mu.Lock()
  111. defer p.mu.Unlock()
  112. // Layer one roundrobin on serverList.
  113. s := p.serverList[p.serverListNext]
  114. p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
  115. // If it's a drop, return an error and fail the RPC.
  116. if s.DropForRateLimiting {
  117. p.stats.dropForRateLimiting()
  118. return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
  119. }
  120. if s.DropForLoadBalancing {
  121. p.stats.dropForLoadBalancing()
  122. return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
  123. }
  124. // If not a drop but there's no ready subConns.
  125. if len(p.subConns) <= 0 {
  126. return nil, nil, balancer.ErrNoSubConnAvailable
  127. }
  128. // Return the next ready subConn in the list, also collect rpc stats.
  129. sc := p.subConns[p.subConnsNext]
  130. p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
  131. done := func(info balancer.DoneInfo) {
  132. if !info.BytesSent {
  133. p.stats.failedToSend()
  134. } else if info.BytesReceived {
  135. p.stats.knownReceived()
  136. }
  137. }
  138. return sc, done, nil
  139. }