|
|
@@ -436,11 +436,15 @@ func (w *watcher) serveStream(ws *watcherStream) {
|
|
|
// TODO don't keep buffering if subscriber stops reading
|
|
|
wrs = append(wrs, wr)
|
|
|
case resumeRev := <-ws.resumec:
|
|
|
+ wrs = nil
|
|
|
+ resuming = true
|
|
|
+ if resumeRev == -1 {
|
|
|
+ // pause serving stream while resume gets set up
|
|
|
+ break
|
|
|
+ }
|
|
|
if resumeRev != ws.lastRev {
|
|
|
panic("unexpected resume revision")
|
|
|
}
|
|
|
- wrs = nil
|
|
|
- resuming = true
|
|
|
case <-w.donec:
|
|
|
closing = true
|
|
|
case <-ws.initReq.ctx.Done():
|
|
|
@@ -502,6 +506,9 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|
|
w.mu.RUnlock()
|
|
|
|
|
|
for _, ws := range streams {
|
|
|
+ // pause serveStream
|
|
|
+ ws.resumec <- -1
|
|
|
+
|
|
|
// reconstruct watcher from initial request
|
|
|
if ws.lastRev != 0 {
|
|
|
ws.initReq.rev = ws.lastRev
|
|
|
@@ -525,6 +532,7 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|
|
w.streams[ws.id] = ws
|
|
|
w.mu.Unlock()
|
|
|
|
|
|
+ // unpause serveStream
|
|
|
ws.resumec <- ws.lastRev
|
|
|
}
|
|
|
return nil
|