// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"net/url"
	"strings"
	"sync"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// ErrNoAddrAvilable is returned by Get() when the balancer does not have
// any active connection to endpoints at the time.
// This error is returned only when opts.BlockingWait is true.
var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")

// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
	// addrs are the client's endpoints for grpc
	addrs []grpc.Address
	// notifyCh notifies grpc of the set of addresses for connecting
	notifyCh chan []grpc.Address

	// readyc closes once the first connection is up
	readyc    chan struct{}
	readyOnce sync.Once

	// mu protects addrs, upc, downc, pinAddr, host2ep, and closed
	mu sync.RWMutex

	// upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
	upc chan struct{}

	// downc closes when grpc calls down() on pinAddr
	downc chan struct{}

	// stopc is closed to signal updateNotifyLoop should stop.
	stopc chan struct{}

	// donec closes when all goroutines are exited
	donec chan struct{}

	// updateAddrsC notifies updateNotifyLoop to update addrs.
	updateAddrsC chan struct{}

	// grpc issues TLS cert checks using the string passed into dial so
	// that string must be the host. To recover the full scheme://host URL,
	// have a map from hosts to the original endpoint.
	host2ep map[string]string

	// pinAddr is the currently pinned address; set to the empty string on
	// initialization and shutdown.
	pinAddr string

	closed bool
}
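
// newSimpleBalancer creates a balancer over the given endpoints, with no
// address pinned and downc already closed, and starts its updateNotifyLoop
// goroutine.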
func newSimpleBalancer(eps []string) *simpleBalancer {
	notifyCh := make(chan []grpc.Address, 1)
	addrs := make([]grpc.Address, len(eps))
	for i := range eps {
		addrs[i].Addr = getHost(eps[i])
	}
	sb := &simpleBalancer{
		addrs:        addrs,
		notifyCh:     notifyCh,
		readyc:       make(chan struct{}),
		upc:          make(chan struct{}),
		stopc:        make(chan struct{}),
		downc:        make(chan struct{}),
		donec:        make(chan struct{}),
		updateAddrsC: make(chan struct{}, 1),
		host2ep:      getHost2ep(eps),
	}
	close(sb.downc)
	go sb.updateNotifyLoop()
	return sb
}
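
// Start implements the grpc.Balancer interface; the simpleBalancer needs no
// per-target setup, so it is a no-op.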
func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error { return nil }
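
// ConnectNotify returns a channel that closes once some endpoint connection
// is up or the balancer is closed.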
func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.upc
}
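
// getEndpoint maps a host handed to gRPC back to the original
// scheme://host endpoint it was derived from.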
func (b *simpleBalancer) getEndpoint(host string) string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.host2ep[host]
}
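
// getHost2ep builds the host-to-endpoint map for the given endpoints;
// parseEndpoint is defined elsewhere in this package.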
func getHost2ep(eps []string) map[string]string {
	hm := make(map[string]string, len(eps))
	for i := range eps {
		_, host, _ := parseEndpoint(eps[i])
		hm[host] = eps[i]
	}
	return hm
}
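
// updateAddrs replaces the balancer's endpoints. If the currently pinned
// address is no longer among them, it signals updateNotifyLoop to notify
// gRPC of the new address set.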
func (b *simpleBalancer) updateAddrs(eps []string) {
	np := getHost2ep(eps)

	b.mu.Lock()

	match := len(np) == len(b.host2ep)
	for k, v := range np {
		if b.host2ep[k] != v {
			match = false
			break
		}
	}
	if match {
		// same endpoints, so no need to update address
		b.mu.Unlock()
		return
	}

	b.host2ep = np

	addrs := make([]grpc.Address, 0, len(eps))
	for i := range eps {
		addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
	}
	b.addrs = addrs

	// updating notifyCh can trigger new connections,
	// only update addrs if all connections are down
	// or addrs does not include pinAddr.
	update := !hasAddr(addrs, b.pinAddr)
	b.mu.Unlock()

	if update {
		select {
		case b.updateAddrsC <- struct{}{}:
		case <-b.stopc:
		}
	}
}
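
// hasAddr reports whether targetAddr appears in addrs.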
func hasAddr(addrs []grpc.Address, targetAddr string) bool {
	for _, addr := range addrs {
		if targetAddr == addr.Addr {
			return true
		}
	}
	return false
}
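
// updateNotifyLoop drives notifyCh: while no address is pinned it keeps
// broadcasting the full address list, and once an address is pinned it
// narrows the notification to that single address until the connection
// goes down or the endpoint set changes.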
func (b *simpleBalancer) updateNotifyLoop() {
	defer close(b.donec)

	for {
		b.mu.RLock()
		upc, downc, addr := b.upc, b.downc, b.pinAddr
		b.mu.RUnlock()
		// downc or upc should be closed
		select {
		case <-downc:
			downc = nil
		default:
		}
		select {
		case <-upc:
			upc = nil
		default:
		}
		switch {
		case downc == nil && upc == nil:
			// stale
			select {
			case <-b.stopc:
				return
			default:
			}
		case downc == nil:
			b.notifyAddrs()
			select {
			case <-upc:
			case <-b.updateAddrsC:
				b.notifyAddrs()
			case <-b.stopc:
				return
			}
		case upc == nil:
			select {
			// close connections that are not the pinned address
			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
			case <-downc:
			case <-b.stopc:
				return
			}
			select {
			case <-downc:
			case <-b.updateAddrsC:
			case <-b.stopc:
				return
			}
			b.notifyAddrs()
		}
	}
}
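
// notifyAddrs sends the current address list to gRPC, unless the balancer
// is stopping.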
func (b *simpleBalancer) notifyAddrs() {
	b.mu.RLock()
	addrs := b.addrs
	b.mu.RUnlock()
	select {
	case b.notifyCh <- addrs:
	case <-b.stopc:
	}
}
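
// Up is called by gRPC when a connection to addr is established. It pins the
// first connected address and returns the function gRPC calls when that
// connection goes down.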
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// gRPC might call Up after it called Close. Guard against that at the
	// application layer; otherwise the simpleBalancer might panic since
	// b.upc is closed.
	if b.closed {
		return func(err error) {}
	}

	// gRPC might call Up on a stale address.
	// Prevent updating pinAddr with a stale address.
	if !hasAddr(b.addrs, addr.Addr) {
		return func(err error) {}
	}

	if b.pinAddr != "" {
		return func(err error) {}
	}

	// notify waiting Get()s and pin first connected address
	close(b.upc)
	b.downc = make(chan struct{})
	b.pinAddr = addr.Addr

	// notify client that a connection is up
	b.readyOnce.Do(func() { close(b.readyc) })

	return func(err error) {
		b.mu.Lock()
		b.upc = make(chan struct{})
		close(b.downc)
		b.pinAddr = ""
		b.mu.Unlock()
	}
}
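
// Get returns the pinned address for an RPC. With opts.BlockingWait false
// (fail-fast RPCs) it returns immediately, failing with ErrNoAddrAvilable if
// nothing is pinned; otherwise it blocks until a connection is up, the
// balancer closes, or ctx is done.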
func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
	var (
		addr   string
		closed bool
	)

	// If opts.BlockingWait is false (for fail-fast RPCs), it should return
	// an address it has notified via Notify immediately instead of blocking.
	if !opts.BlockingWait {
		b.mu.RLock()
		closed = b.closed
		addr = b.pinAddr
		b.mu.RUnlock()
		if closed {
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		}
		if addr == "" {
			return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable
		}
		return grpc.Address{Addr: addr}, func() {}, nil
	}

	for {
		b.mu.RLock()
		ch := b.upc
		b.mu.RUnlock()
		select {
		case <-ch:
		case <-b.donec:
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		case <-ctx.Done():
			return grpc.Address{Addr: ""}, nil, ctx.Err()
		}
		b.mu.RLock()
		closed = b.closed
		addr = b.pinAddr
		b.mu.RUnlock()
		// Close(), which sets b.closed = true, can be called before Get();
		// Get() must exit if the balancer is closed.
		if closed {
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		}
		if addr != "" {
			break
		}
	}
	return grpc.Address{Addr: addr}, func() {}, nil
}
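
// Notify exposes the channel over which gRPC receives address updates.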
func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
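
// Close shuts down the balancer: it unblocks pending Get()s, waits for
// updateNotifyLoop to exit, and then closes notifyCh.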
func (b *simpleBalancer) Close() error {
	b.mu.Lock()
	// In case gRPC calls close twice. TODO: remove the check
	// when we are sure that gRPC won't call close twice.
	if b.closed {
		b.mu.Unlock()
		<-b.donec
		return nil
	}
	b.closed = true
	close(b.stopc)
	b.pinAddr = ""

	// In the following scenario:
	// 1. upc is not closed; no pinned address
	// 2. client issues an RPC, calling invoke(), which calls Get(), enters the for loop, and blocks
	// 3. clientconn.Close() calls balancer.Close(); closed = true
	// 4. the for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled
	// we must close upc so Get() exits from blocking on upc
	select {
	case <-b.upc:
	default:
		// terminate all waiting Get()s
		close(b.upc)
	}

	b.mu.Unlock()

	// wait for updateNotifyLoop to finish
	<-b.donec
	close(b.notifyCh)

	return nil
}
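
// getHost extracts the host portion of an endpoint URL; endpoints without a
// scheme are returned as-is.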
func getHost(ep string) string {
	url, uerr := url.Parse(ep)
	if uerr != nil || !strings.Contains(ep, "://") {
		return ep
	}
	return url.Host
}
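
// A rough usage sketch (assumed wiring; the real dial path lives elsewhere in
// this package and applies more options, e.g. credentials and timeouts):
//
//	sb := newSimpleBalancer(endpoints)
//	conn, err := grpc.Dial(endpoints[0], grpc.WithInsecure(), grpc.WithBalancer(sb))
//	if err != nil {
//		// handle dial error
//	}
//	<-sb.readyc // first endpoint connection is up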