@@ -82,7 +82,18 @@ type serverWatchStream struct {
nextWatcherID int64
}
+func (sws *serverWatchStream) close() {
+ close(sws.watchCh)
+ close(sws.ctrlCh)
+ for _, ws := range sws.singles {
+ ws.stop()
+ }
+ sws.groups.stop()
+}
+
func (sws *serverWatchStream) recvLoop() error {
+ defer sws.close()
for {
req, err := sws.gRPCStream.Recv()
if err == io.EOF {
@@ -86,3 +86,11 @@ func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingl
return false
+func (wgs *watchergroups) stop() {
+ wgs.mu.Lock()
+ defer wgs.mu.Unlock()
+ for _, wg := range wgs.groups {
+ wg.stop()