roundrobin_balanced.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package picker
  15. import (
  16. "context"
  17. "sync"
  18. "go.uber.org/zap"
  19. "go.uber.org/zap/zapcore"
  20. "google.golang.org/grpc/balancer"
  21. "google.golang.org/grpc/resolver"
  22. )
  23. // newRoundrobinBalanced returns a new roundrobin balanced picker.
  24. func newRoundrobinBalanced(cfg Config) Picker {
  25. scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
  26. for sc := range cfg.SubConnToResolverAddress {
  27. scs = append(scs, sc)
  28. }
  29. return &rrBalanced{
  30. p: RoundrobinBalanced,
  31. lg: cfg.Logger,
  32. scs: scs,
  33. scToAddr: cfg.SubConnToResolverAddress,
  34. }
  35. }
  36. type rrBalanced struct {
  37. p Policy
  38. lg *zap.Logger
  39. mu sync.RWMutex
  40. next int
  41. scs []balancer.SubConn
  42. scToAddr map[balancer.SubConn]resolver.Address
  43. }
  44. func (rb *rrBalanced) String() string { return rb.p.String() }
  45. // Pick is called for every client request.
  46. func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
  47. rb.mu.RLock()
  48. n := len(rb.scs)
  49. rb.mu.RUnlock()
  50. if n == 0 {
  51. return nil, nil, balancer.ErrNoSubConnAvailable
  52. }
  53. rb.mu.Lock()
  54. cur := rb.next
  55. sc := rb.scs[cur]
  56. picked := rb.scToAddr[sc].Addr
  57. rb.next = (rb.next + 1) % len(rb.scs)
  58. rb.mu.Unlock()
  59. rb.lg.Debug(
  60. "picked",
  61. zap.String("picker", rb.p.String()),
  62. zap.String("address", picked),
  63. zap.Int("subconn-index", cur),
  64. zap.Int("subconn-size", n),
  65. )
  66. doneFunc := func(info balancer.DoneInfo) {
  67. // TODO: error handling?
  68. fss := []zapcore.Field{
  69. zap.Error(info.Err),
  70. zap.String("picker", rb.p.String()),
  71. zap.String("address", picked),
  72. zap.Bool("success", info.Err == nil),
  73. zap.Bool("bytes-sent", info.BytesSent),
  74. zap.Bool("bytes-received", info.BytesReceived),
  75. }
  76. if info.Err == nil {
  77. rb.lg.Debug("balancer done", fss...)
  78. } else {
  79. rb.lg.Warn("balancer failed", fss...)
  80. }
  81. }
  82. return sc, doneFunc, nil
  83. }