Browse Source

Merge pull request #5505 from heyitsanthony/v3rpc-watcher-close

v3rpc: fix race on ctrl channel when watcher stream closes
Anthony Romano 9 years ago
parent
commit
7709cd84bb
2 changed files with 41 additions and 2 deletions
  1. 7 2
      etcdserver/api/v3rpc/watch.go
  2. 34 0
      integration/v3_watch_test.go

+ 7 - 2
etcdserver/api/v3rpc/watch.go

@@ -139,6 +139,7 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 }
 }
 
 
 func (sws *serverWatchStream) recvLoop() error {
 func (sws *serverWatchStream) recvLoop() error {
+	defer close(sws.ctrlStream)
 	for {
 	for {
 		req, err := sws.gRPCStream.Recv()
 		req, err := sws.gRPCStream.Recv()
 		if err == io.EOF {
 		if err == io.EOF {
@@ -172,12 +173,17 @@ func (sws *serverWatchStream) recvLoop() error {
 			if id != -1 && creq.ProgressNotify {
 			if id != -1 && creq.ProgressNotify {
 				sws.progress[id] = true
 				sws.progress[id] = true
 			}
 			}
-			sws.ctrlStream <- &pb.WatchResponse{
+			wr := &pb.WatchResponse{
 				Header:   sws.newResponseHeader(wsrev),
 				Header:   sws.newResponseHeader(wsrev),
 				WatchId:  int64(id),
 				WatchId:  int64(id),
 				Created:  true,
 				Created:  true,
 				Canceled: id == -1,
 				Canceled: id == -1,
 			}
 			}
+			select {
+			case sws.ctrlStream <- wr:
+			case <-sws.closec:
+				return nil
+			}
 		case *pb.WatchRequest_CancelRequest:
 		case *pb.WatchRequest_CancelRequest:
 			if uv.CancelRequest != nil {
 			if uv.CancelRequest != nil {
 				id := uv.CancelRequest.WatchId
 				id := uv.CancelRequest.WatchId
@@ -301,7 +307,6 @@ func (sws *serverWatchStream) sendLoop() {
 func (sws *serverWatchStream) close() {
 func (sws *serverWatchStream) close() {
 	sws.watchStream.Close()
 	sws.watchStream.Close()
 	close(sws.closec)
 	close(sws.closec)
-	close(sws.ctrlStream)
 	sws.wg.Wait()
 	sws.wg.Wait()
 }
 }
 
 

+ 34 - 0
integration/v3_watch_test.go

@@ -976,3 +976,37 @@ func TestWatchWithProgressNotify(t *testing.T) {
 		t.Errorf("unexpected pb.WatchResponse is received %+v", resp)
 		t.Errorf("unexpected pb.WatchResponse is received %+v", resp)
 	}
 	}
 }
 }
+
+// TestV3WatcMultiOpenhClose opens many watchers concurrently on multiple streams.
+func TestV3WatchClose(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	c := clus.RandClient()
+	wapi := toGRPC(c).Watch
+
+	var wg sync.WaitGroup
+	wg.Add(100)
+	for i := 0; i < 100; i++ {
+		go func() {
+			ctx, cancel := context.WithCancel(context.TODO())
+			defer func() {
+				wg.Done()
+				cancel()
+			}()
+			ws, err := wapi.Watch(ctx)
+			if err != nil {
+				return
+			}
+			cr := &pb.WatchCreateRequest{Key: []byte("a")}
+			req := &pb.WatchRequest{
+				RequestUnion: &pb.WatchRequest_CreateRequest{
+					CreateRequest: cr}}
+			ws.Send(req)
+			ws.Recv()
+		}()
+	}
+	c.ActiveConnection().Close()
+	wg.Wait()
+}