Browse Source

clientv3: return closed channel on Watch() cancel

was returning nil; difficult to use correctly

Fixes #4626
Anthony Romano 9 years ago
parent
commit
d02b1c982f
2 changed files with 40 additions and 11 deletions
  1. 20 0
      clientv3/integration/watch_test.go
  2. 20 11
      clientv3/watch.go

+ 20 - 0
clientv3/integration/watch_test.go

@@ -226,6 +226,26 @@ func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
 	putAndWatch(t, wctx, "a", "b")
 }
 
+// TestWatchCancelImmediate ensures a closed channel is returned
+// if the context is cancelled.
+func TestWatchCancelImmediate(t *testing.T) {
+	runWatchTest(t, testWatchCancelImmediate)
+}
+
+func testWatchCancelImmediate(t *testing.T, wctx *watchctx) {
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	wch := wctx.w.Watch(ctx, "a")
+	select {
+	case wresp, ok := <-wch:
+		if ok {
+			t.Fatalf("read wch got %v; expected closed channel", wresp)
+		}
+	default:
+		t.Fatalf("closed watcher channel should not block")
+	}
+}
+
 // TestWatchCancelInit tests watcher closes correctly after no events.
 func TestWatchCancelInit(t *testing.T) {
 	runWatchTest(t, testWatchCancelInit)

+ 20 - 11
clientv3/watch.go

@@ -135,23 +135,30 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	retc := make(chan chan WatchResponse, 1)
 	wr.retc = retc
 
+	ok := false
+
 	// submit request
 	select {
 	case w.reqc <- wr:
+		ok = true
 	case <-wr.ctx.Done():
-		return nil
 	case <-w.donec:
-		return nil
 	}
+
 	// receive channel
-	select {
-	case ret := <-retc:
-		return ret
-	case <-ctx.Done():
-		return nil
-	case <-w.donec:
-		return nil
+	if ok {
+		select {
+		case ret := <-retc:
+			return ret
+		case <-ctx.Done():
+		case <-w.donec:
+		}
 	}
+
+	// couldn't create channel; return closed channel
+	ch := make(chan WatchResponse)
+	close(ch)
+	return ch
 }
 
 func (w *watcher) Close() error {
@@ -179,13 +186,15 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 		pendingReq.retc <- ret
 		return
 	}
+
+	ret := make(chan WatchResponse)
 	if resp.WatchId == -1 {
 		// failed; no channel
-		pendingReq.retc <- nil
+		close(ret)
+		pendingReq.retc <- ret
 		return
 	}
 
-	ret := make(chan WatchResponse)
 	ws := &watcherStream{
 		initReq: *pendingReq,
 		id:      resp.WatchId,