|
@@ -131,10 +131,14 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
|
|
|
// but when stream.Context().Done() is closed, the stream's recv
|
|
// but when stream.Context().Done() is closed, the stream's recv
|
|
|
// may continue to block since it uses a different context, leading to
|
|
// may continue to block since it uses a different context, leading to
|
|
|
// deadlock when calling sws.close().
|
|
// deadlock when calling sws.close().
|
|
|
- go func() { errc <- sws.recvLoop() }()
|
|
|
|
|
-
|
|
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ if rerr := sws.recvLoop(); rerr != nil {
|
|
|
|
|
+ errc <- rerr
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
select {
|
|
select {
|
|
|
case err = <-errc:
|
|
case err = <-errc:
|
|
|
|
|
+ close(sws.ctrlStream)
|
|
|
case <-stream.Context().Done():
|
|
case <-stream.Context().Done():
|
|
|
err = stream.Context().Err()
|
|
err = stream.Context().Err()
|
|
|
// the only server-side cancellation is noleader for now.
|
|
// the only server-side cancellation is noleader for now.
|
|
@@ -147,7 +151,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (sws *serverWatchStream) recvLoop() error {
|
|
func (sws *serverWatchStream) recvLoop() error {
|
|
|
- defer close(sws.ctrlStream)
|
|
|
|
|
for {
|
|
for {
|
|
|
req, err := sws.gRPCStream.Recv()
|
|
req, err := sws.gRPCStream.Recv()
|
|
|
if err == io.EOF {
|
|
if err == io.EOF {
|