|
|
@@ -131,6 +131,8 @@ type watchGrpcStream struct {
|
|
|
donec chan struct{}
|
|
|
// errc transmits errors from grpc Recv to the watch stream reconn logic
|
|
|
errc chan error
|
|
|
+ // closingc gets the watcherStream of closing watchers
|
|
|
+ closingc chan *watcherStream
|
|
|
|
|
|
// the error that closed the watch stream
|
|
|
closeErr error
|
|
|
@@ -203,11 +205,12 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
|
|
|
cancel: cancel,
|
|
|
streams: make(map[int64]*watcherStream),
|
|
|
|
|
|
- respc: make(chan *pb.WatchResponse),
|
|
|
- reqc: make(chan *watchRequest),
|
|
|
- stopc: make(chan struct{}),
|
|
|
- donec: make(chan struct{}),
|
|
|
- errc: make(chan error, 1),
|
|
|
+ respc: make(chan *pb.WatchResponse),
|
|
|
+ reqc: make(chan *watchRequest),
|
|
|
+ stopc: make(chan struct{}),
|
|
|
+ donec: make(chan struct{}),
|
|
|
+ errc: make(chan error, 1),
|
|
|
+ closingc: make(chan *watcherStream),
|
|
|
}
|
|
|
go wgs.run()
|
|
|
return wgs
|
|
|
@@ -268,7 +271,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
|
|
|
case reqc <- wr:
|
|
|
ok = true
|
|
|
case <-wr.ctx.Done():
|
|
|
- wgs.stopIfEmpty()
|
|
|
case <-donec:
|
|
|
if wgs.closeErr != nil {
|
|
|
closeCh <- WatchResponse{closeErr: wgs.closeErr}
|
|
|
@@ -378,15 +380,19 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
|
|
|
go w.serveStream(ws)
|
|
|
}
|
|
|
|
|
|
-// closeStream closes the watcher resources and removes it
|
|
|
-func (w *watchGrpcStream) closeStream(ws *watcherStream) {
|
|
|
+func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
|
|
|
w.mu.Lock()
|
|
|
// cancels request stream; subscriber receives nil channel
|
|
|
close(ws.initReq.retc)
|
|
|
// close subscriber's channel
|
|
|
close(ws.outc)
|
|
|
delete(w.streams, ws.id)
|
|
|
+ empty := len(w.streams) == 0
|
|
|
+ if empty && w.stopc != nil {
|
|
|
+ w.stopc = nil
|
|
|
+ }
|
|
|
w.mu.Unlock()
|
|
|
+ return empty
|
|
|
}
|
|
|
|
|
|
// run is the root of the goroutines for managing a watcher client
|
|
|
@@ -491,6 +497,10 @@ func (w *watchGrpcStream) run() {
|
|
|
cancelSet = make(map[int64]struct{})
|
|
|
case <-stopc:
|
|
|
return
|
|
|
+ case ws := <-w.closingc:
|
|
|
+ if w.closeStream(ws) {
|
|
|
+ return
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// send failed; queue for retry
|
|
|
@@ -553,6 +563,15 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
|
|
|
|
|
|
// serveStream forwards watch responses from run() to the subscriber
|
|
|
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
|
|
|
+ defer func() {
|
|
|
+ // signal that this watcherStream is finished
|
|
|
+ select {
|
|
|
+ case w.closingc <- ws:
|
|
|
+ case <-w.donec:
|
|
|
+ w.closeStream(ws)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
var closeErr error
|
|
|
emptyWr := &WatchResponse{}
|
|
|
wrs := []*WatchResponse{}
|
|
|
@@ -641,20 +660,9 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- w.closeStream(ws)
|
|
|
- w.stopIfEmpty()
|
|
|
// lazily send cancel message if events on missing id
|
|
|
}
|
|
|
|
|
|
-func (wgs *watchGrpcStream) stopIfEmpty() {
|
|
|
- wgs.mu.Lock()
|
|
|
- if len(wgs.streams) == 0 && wgs.stopc != nil {
|
|
|
- close(wgs.stopc)
|
|
|
- wgs.stopc = nil
|
|
|
- }
|
|
|
- wgs.mu.Unlock()
|
|
|
-}
|
|
|
-
|
|
|
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
|
|
ws, rerr := w.resume()
|
|
|
if rerr != nil {
|