|
@@ -214,7 +214,19 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
|
|
|
|
|
interval := GetProgressReportInterval()
|
|
interval := GetProgressReportInterval()
|
|
|
progressTicker := time.NewTicker(interval)
|
|
progressTicker := time.NewTicker(interval)
|
|
|
- defer progressTicker.Stop()
|
|
|
|
|
|
|
+
|
|
|
|
|
+ defer func() {
|
|
|
|
|
+ progressTicker.Stop()
|
|
|
|
|
+ // drain the chan to clean up pending events
|
|
|
|
|
+ for ws := range sws.watchStream.Chan() {
|
|
|
|
|
+ mvcc.ReportEventReceived(len(ws.Events))
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, wrs := range pending {
|
|
|
|
|
+ for _, ws := range wrs {
|
|
|
|
|
+ mvcc.ReportEventReceived(len(ws.Events))
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
|
|
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
@@ -246,7 +258,7 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- mvcc.ReportEventReceived()
|
|
|
|
|
|
|
+ mvcc.ReportEventReceived(len(evs))
|
|
|
if err := sws.gRPCStream.Send(wr); err != nil {
|
|
if err := sws.gRPCStream.Send(wr); err != nil {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -276,7 +288,7 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
// flush buffered events
|
|
// flush buffered events
|
|
|
ids[wid] = struct{}{}
|
|
ids[wid] = struct{}{}
|
|
|
for _, v := range pending[wid] {
|
|
for _, v := range pending[wid] {
|
|
|
- mvcc.ReportEventReceived()
|
|
|
|
|
|
|
+ mvcc.ReportEventReceived(len(v.Events))
|
|
|
if err := sws.gRPCStream.Send(v); err != nil {
|
|
if err := sws.gRPCStream.Send(v); err != nil {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -291,15 +303,7 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
sws.progress[id] = true
|
|
sws.progress[id] = true
|
|
|
}
|
|
}
|
|
|
case <-sws.closec:
|
|
case <-sws.closec:
|
|
|
- // drain the chan to clean up pending events
|
|
|
|
|
- for range sws.watchStream.Chan() {
|
|
|
|
|
- mvcc.ReportEventReceived()
|
|
|
|
|
- }
|
|
|
|
|
- for _, wrs := range pending {
|
|
|
|
|
- for range wrs {
|
|
|
|
|
- mvcc.ReportEventReceived()
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ return
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|