|
@@ -66,10 +66,19 @@ func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
err := stream.Send(&pb.WatchResponse{Event: &e})
|
|
err := stream.Send(&pb.WatchResponse{Event: &e})
|
|
|
|
|
+ storage.ReportEventReceived()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
case <-closec:
|
|
case <-closec:
|
|
|
|
|
+ // drain the chan to clean up pending events
|
|
|
|
|
+ for {
|
|
|
|
|
+ _, ok := <-watcher.Chan()
|
|
|
|
|
+ if !ok {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ storage.ReportEventReceived()
|
|
|
|
|
+ }
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|