| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- // Copyright 2018 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package balancer
- import (
- "fmt"
- "strconv"
- "sync"
- "time"
- "go.etcd.io/etcd/clientv3/balancer/picker"
- "go.uber.org/zap"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/resolver"
- _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
- _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
- )
- // RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
- // must be invoked at initialization time.
- func RegisterBuilder(cfg Config) {
- bb := &builder{cfg}
- balancer.Register(bb)
- bb.cfg.Logger.Debug(
- "registered balancer",
- zap.String("policy", bb.cfg.Policy.String()),
- zap.String("name", bb.cfg.Name),
- )
- }
- type builder struct {
- cfg Config
- }
- // Build is called initially when creating "ccBalancerWrapper".
- // "grpc.Dial" is called to this client connection.
- // Then, resolved addresses will be handled via "HandleResolvedAddrs".
- func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
- bb := &baseBalancer{
- id: strconv.FormatInt(time.Now().UnixNano(), 36),
- policy: b.cfg.Policy,
- name: b.cfg.Name,
- lg: b.cfg.Logger,
- addrToSc: make(map[resolver.Address]balancer.SubConn),
- scToAddr: make(map[balancer.SubConn]resolver.Address),
- scToSt: make(map[balancer.SubConn]connectivity.State),
- currentConn: nil,
- csEvltr: &connectivityStateEvaluator{},
- // initialize picker always returns "ErrNoSubConnAvailable"
- Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
- }
- if bb.lg == nil {
- bb.lg = zap.NewNop()
- }
- // TODO: support multiple connections
- bb.mu.Lock()
- bb.currentConn = cc
- bb.mu.Unlock()
- bb.lg.Info(
- "built balancer",
- zap.String("balancer-id", bb.id),
- zap.String("policy", bb.policy.String()),
- zap.String("resolver-target", cc.Target()),
- )
- return bb
- }
- // Name implements "grpc/balancer.Builder" interface.
- func (b *builder) Name() string { return b.cfg.Name }
- // Balancer defines client balancer interface.
- type Balancer interface {
- // Balancer is called on specified client connection. Client initiates gRPC
- // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
- // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
- // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
- // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
- // changes, thus requires failover logic in this method.
- balancer.Balancer
- // Picker calls "Pick" for every client request.
- picker.Picker
- }
- type baseBalancer struct {
- id string
- policy picker.Policy
- name string
- lg *zap.Logger
- mu sync.RWMutex
- addrToSc map[resolver.Address]balancer.SubConn
- scToAddr map[balancer.SubConn]resolver.Address
- scToSt map[balancer.SubConn]connectivity.State
- currentConn balancer.ClientConn
- currentState connectivity.State
- csEvltr *connectivityStateEvaluator
- picker.Picker
- }
- // HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
- // gRPC sends initial or updated resolved addresses from "Build".
- func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
- if err != nil {
- bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
- return
- }
- bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
- bb.mu.Lock()
- defer bb.mu.Unlock()
- resolved := make(map[resolver.Address]struct{})
- for _, addr := range addrs {
- resolved[addr] = struct{}{}
- if _, ok := bb.addrToSc[addr]; !ok {
- sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
- if err != nil {
- bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
- continue
- }
- bb.addrToSc[addr] = sc
- bb.scToAddr[sc] = addr
- bb.scToSt[sc] = connectivity.Idle
- sc.Connect()
- }
- }
- for addr, sc := range bb.addrToSc {
- if _, ok := resolved[addr]; !ok {
- // was removed by resolver or failed to create subconn
- bb.currentConn.RemoveSubConn(sc)
- delete(bb.addrToSc, addr)
- bb.lg.Info(
- "removed subconn",
- zap.String("balancer-id", bb.id),
- zap.String("address", addr.Addr),
- zap.String("subconn", scToString(sc)),
- )
- // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
- // The entry will be deleted in HandleSubConnStateChange.
- // (DO NOT) delete(bb.scToAddr, sc)
- // (DO NOT) delete(bb.scToSt, sc)
- }
- }
- }
- // HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
- func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
- bb.mu.Lock()
- defer bb.mu.Unlock()
- old, ok := bb.scToSt[sc]
- if !ok {
- bb.lg.Warn(
- "state change for an unknown subconn",
- zap.String("balancer-id", bb.id),
- zap.String("subconn", scToString(sc)),
- zap.String("state", s.String()),
- )
- return
- }
- bb.lg.Info(
- "state changed",
- zap.String("balancer-id", bb.id),
- zap.Bool("connected", s == connectivity.Ready),
- zap.String("subconn", scToString(sc)),
- zap.String("address", bb.scToAddr[sc].Addr),
- zap.String("old-state", old.String()),
- zap.String("new-state", s.String()),
- )
- bb.scToSt[sc] = s
- switch s {
- case connectivity.Idle:
- sc.Connect()
- case connectivity.Shutdown:
- // When an address was removed by resolver, b called RemoveSubConn but
- // kept the sc's state in scToSt. Remove state for this sc here.
- delete(bb.scToAddr, sc)
- delete(bb.scToSt, sc)
- }
- oldAggrState := bb.currentState
- bb.currentState = bb.csEvltr.recordTransition(old, s)
- // Regenerate picker when one of the following happens:
- // - this sc became ready from not-ready
- // - this sc became not-ready from ready
- // - the aggregated state of balancer became TransientFailure from non-TransientFailure
- // - the aggregated state of balancer became non-TransientFailure from TransientFailure
- if (s == connectivity.Ready) != (old == connectivity.Ready) ||
- (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
- bb.regeneratePicker()
- }
- bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
- return
- }
- func (bb *baseBalancer) regeneratePicker() {
- if bb.currentState == connectivity.TransientFailure {
- bb.lg.Info(
- "generated transient error picker",
- zap.String("balancer-id", bb.id),
- zap.String("policy", bb.policy.String()),
- )
- bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
- return
- }
- // only pass ready subconns to picker
- scs := make([]balancer.SubConn, 0)
- addrToSc := make(map[resolver.Address]balancer.SubConn)
- scToAddr := make(map[balancer.SubConn]resolver.Address)
- for addr, sc := range bb.addrToSc {
- if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
- scs = append(scs, sc)
- addrToSc[addr] = sc
- scToAddr[sc] = addr
- }
- }
- switch bb.policy {
- case picker.RoundrobinBalanced:
- bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
- default:
- panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
- }
- bb.lg.Info(
- "generated picker",
- zap.String("balancer-id", bb.id),
- zap.String("policy", bb.policy.String()),
- zap.Strings("subconn-ready", scsToStrings(addrToSc)),
- zap.Int("subconn-size", len(addrToSc)),
- )
- }
- // Close implements "grpc/balancer.Balancer" interface.
- // Close is a nop because base balancer doesn't have internal state to clean up,
- // and it doesn't need to call RemoveSubConn for the SubConns.
- func (bb *baseBalancer) Close() {
- // TODO
- }
|