Browse Source

v3rpc: fix race on closing watcher stream ctrl channel

Sometimes close would race with the recvLoop, leading the
recvLoop to write to a close channel.
Anthony Romano 9 years ago
parent
commit
09e8f5782e
1 changed files with 7 additions and 2 deletions
  1. 7 2
      etcdserver/api/v3rpc/watch.go

+ 7 - 2
etcdserver/api/v3rpc/watch.go

@@ -139,6 +139,7 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 }
 
 func (sws *serverWatchStream) recvLoop() error {
+	defer close(sws.ctrlStream)
 	for {
 		req, err := sws.gRPCStream.Recv()
 		if err == io.EOF {
@@ -172,12 +173,17 @@ func (sws *serverWatchStream) recvLoop() error {
 			if id != -1 && creq.ProgressNotify {
 				sws.progress[id] = true
 			}
-			sws.ctrlStream <- &pb.WatchResponse{
+			wr := &pb.WatchResponse{
 				Header:   sws.newResponseHeader(wsrev),
 				WatchId:  int64(id),
 				Created:  true,
 				Canceled: id == -1,
 			}
+			select {
+			case sws.ctrlStream <- wr:
+			case <-sws.closec:
+				return nil
+			}
 		case *pb.WatchRequest_CancelRequest:
 			if uv.CancelRequest != nil {
 				id := uv.CancelRequest.WatchId
@@ -301,7 +307,6 @@ func (sws *serverWatchStream) sendLoop() {
 func (sws *serverWatchStream) close() {
 	sws.watchStream.Close()
 	close(sws.closec)
-	close(sws.ctrlStream)
 	sws.wg.Wait()
 }