|
|
@@ -192,11 +192,29 @@ func (e *Etcd) Config() Config {
|
|
|
func (e *Etcd) Close() {
|
|
|
e.closeOnce.Do(func() { close(e.stopc) })
|
|
|
|
|
|
- // (gRPC server) stops accepting new connections,
|
|
|
- // RPCs, and blocks until all pending RPCs are finished
|
|
|
+ timeout := 2 * time.Second
|
|
|
+ if e.Server != nil {
|
|
|
+ timeout = e.Server.Cfg.ReqTimeout()
|
|
|
+ }
|
|
|
for _, sctx := range e.sctxs {
|
|
|
for gs := range sctx.grpcServerC {
|
|
|
- gs.GracefulStop()
|
|
|
+ ch := make(chan struct{})
|
|
|
+ go func() {
|
|
|
+ defer close(ch)
|
|
|
+ // close listeners to stop accepting new connections,
|
|
|
+ // will block on any existing transports
|
|
|
+ gs.GracefulStop()
|
|
|
+ }()
|
|
|
+ // wait until all pending RPCs are finished
|
|
|
+ select {
|
|
|
+ case <-ch:
|
|
|
+ case <-time.After(timeout):
|
|
|
+ // took too long, manually close open transports
|
|
|
+ // e.g. watch streams
|
|
|
+ gs.Stop()
|
|
|
+ // concurrent GracefulStop should be interrupted
|
|
|
+ <-ch
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|