|
|
@@ -45,6 +45,8 @@ type simpleBalancer struct {
|
|
|
// pinAddr is the currently pinned address; set to the empty string on
|
|
|
// intialization and shutdown.
|
|
|
pinAddr string
|
|
|
+
|
|
|
+ closed bool
|
|
|
}
|
|
|
|
|
|
func newSimpleBalancer(eps []string) *simpleBalancer {
|
|
|
@@ -74,15 +76,25 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
|
|
|
|
|
|
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
|
|
b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
+
|
|
|
+ // gRPC might call Up after it called Close. We add this check
|
|
|
+ // to "fix" it up at application layer. Or our simplerBalancer
|
|
|
+ // might panic since b.upc is closed.
|
|
|
+ if b.closed {
|
|
|
+ return func(err error) {}
|
|
|
+ }
|
|
|
+
|
|
|
if len(b.upEps) == 0 {
|
|
|
// notify waiting Get()s and pin first connected address
|
|
|
close(b.upc)
|
|
|
b.pinAddr = addr.Addr
|
|
|
}
|
|
|
b.upEps[addr.Addr] = struct{}{}
|
|
|
- b.mu.Unlock()
|
|
|
+
|
|
|
// notify client that a connection is up
|
|
|
b.readyOnce.Do(func() { close(b.readyc) })
|
|
|
+
|
|
|
return func(err error) {
|
|
|
b.mu.Lock()
|
|
|
delete(b.upEps, addr.Addr)
|
|
|
@@ -128,13 +140,19 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
|
|
|
|
|
|
func (b *simpleBalancer) Close() error {
|
|
|
b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
+ // In case gRPC calls close twice. TODO: remove the checking
|
|
|
+ // when we are sure that gRPC wont call close twice.
|
|
|
+ if b.closed {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ b.closed = true
|
|
|
close(b.notifyCh)
|
|
|
// terminate all waiting Get()s
|
|
|
b.pinAddr = ""
|
|
|
if len(b.upEps) == 0 {
|
|
|
close(b.upc)
|
|
|
}
|
|
|
- b.mu.Unlock()
|
|
|
return nil
|
|
|
}
|
|
|
|