Browse Source

Merge pull request #6582 from heyitsanthony/fix-cancel-close

clientv3: only return closing error to watcher if context is not canceled
Anthony Romano 9 years ago
parent
commit
b980ab0c67
2 changed files with 30 additions and 1 deletions
  1. 29 0
      clientv3/integration/watch_test.go
  2. 1 1
      clientv3/watch.go

+ 29 - 0
clientv3/integration/watch_test.go

@@ -839,3 +839,32 @@ func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3))
 		}
 		}
 	}
 	}
 }
 }
+
+// TestWatchCanelAndCloseClient ensures that canceling a watcher then immediately
+// closing the client does not return a client closing error.
+func TestWatchCancelAndCloseClient(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+	cli := clus.Client(0)
+	ctx, cancel := context.WithCancel(context.Background())
+	wch := cli.Watch(ctx, "abc")
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		select {
+		case wr, ok := <-wch:
+			if ok {
+				t.Fatalf("expected closed watch after cancel(), got resp=%+v err=%v", wr, wr.Err())
+			}
+		case <-time.After(5 * time.Second):
+			t.Fatal("timed out waiting for closed channel")
+		}
+	}()
+	cancel()
+	if err := cli.Close(); err != nil {
+		t.Fatal(err)
+	}
+	<-donec
+	clus.TakeClient(0)
+}

+ 1 - 1
clientv3/watch.go

@@ -364,7 +364,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
 	default:
 	default:
 	}
 	}
 	// close subscriber's channel
 	// close subscriber's channel
-	if closeErr := w.closeErr; closeErr != nil {
+	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
 		go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
 		go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
 	} else {
 	} else {
 		close(ws.outc)
 		close(ws.outc)