Browse Source

clientv3: drain buffered WatchResponses before resuming

Otherwise, the watcherStream can receive WatchResponses in the
middle of a resume, corrupting the stream.

Fixes #6364
Anthony Romano 9 years ago
parent
commit
ad318ee891
1 changed files with 15 additions and 0 deletions
  1. 15 0
      clientv3/watch.go

+ 15 - 0
clientv3/watch.go

@@ -708,6 +708,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
 	w.mu.RUnlock()
 	w.mu.RUnlock()
 
 
 	for _, ws := range streams {
 	for _, ws := range streams {
+		// drain recvc so no old WatchResponses (e.g., Created messages)
+		// are processed while resuming
+		ws.drain()
+
 		// pause serveStream
 		// pause serveStream
 		ws.resumec <- -1
 		ws.resumec <- -1
 
 
@@ -740,6 +744,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
 	return nil
 	return nil
 }
 }
 
 
+// drain removes all buffered WatchResponses from the stream's receive channel.
+func (ws *watcherStream) drain() {
+	for {
+		select {
+		case <-ws.recvc:
+		default:
+			return
+		}
+	}
+}
+
 // toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
 // toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
 func (wr *watchRequest) toPB() *pb.WatchRequest {
 func (wr *watchRequest) toPB() *pb.WatchRequest {
 	req := &pb.WatchCreateRequest{
 	req := &pb.WatchCreateRequest{