|
|
@@ -61,12 +61,24 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
|
|
func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) {
|
|
|
for {
|
|
|
select {
|
|
|
- case e := <-watcher.Chan():
|
|
|
+ case e, ok := <-watcher.Chan():
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
err := stream.Send(&pb.WatchResponse{Event: &e})
|
|
|
+ storage.ReportEventReceived()
|
|
|
if err != nil {
|
|
|
return
|
|
|
}
|
|
|
case <-closec:
|
|
|
+ // drain the chan to clean up pending events
|
|
|
+ for {
|
|
|
+ _, ok := <-watcher.Chan()
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ storage.ReportEventReceived()
|
|
|
+ }
|
|
|
return
|
|
|
}
|
|
|
}
|