|
@@ -708,6 +708,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|
|
w.mu.RUnlock()
|
|
w.mu.RUnlock()
|
|
|
|
|
|
|
|
for _, ws := range streams {
|
|
for _, ws := range streams {
|
|
|
|
|
+ // drain recvc so no old WatchResponses (e.g., Created messages)
|
|
|
|
|
+ // are processed while resuming
|
|
|
|
|
+ ws.drain()
|
|
|
|
|
+
|
|
|
// pause serveStream
|
|
// pause serveStream
|
|
|
ws.resumec <- -1
|
|
ws.resumec <- -1
|
|
|
|
|
|
|
@@ -740,6 +744,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// drain removes all buffered WatchResponses from the stream's receive channel.
|
|
|
|
|
+func (ws *watcherStream) drain() {
|
|
|
|
|
+ for {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-ws.recvc:
|
|
|
|
|
+ default:
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
|
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
|
|
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
|
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
|
|
req := &pb.WatchCreateRequest{
|
|
req := &pb.WatchCreateRequest{
|