Browse Source

Merge pull request #4564 from heyitsanthony/fix-watchreconnrequest

clientv3: fix current watcher reconnection
Anthony Romano 9 years ago
parent
commit
11bb07c248
2 changed files with 19 additions and 4 deletions
  1. 13 4
      clientv3/integration/watch_test.go
  2. 6 0
      clientv3/watch.go

+ 13 - 4
clientv3/integration/watch_test.go

@@ -163,13 +163,18 @@ func TestWatchReconnRequest(t *testing.T) {
 }
 
 func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
-	// take down watcher connection
-	donec := make(chan struct{})
+	donec, stopc := make(chan struct{}), make(chan struct{}, 1)
 	go func() {
+		timer := time.After(2 * time.Second)
+		defer close(donec)
+		// take down watcher connection
 		for {
 			wctx.wclient.ActiveConnection().Close()
 			select {
-			case <-donec:
+			case <-timer:
+				// spinning on close may live lock reconnection
+				return
+			case <-stopc:
 				return
 			default:
 			}
@@ -179,7 +184,11 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
 	if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
 		t.Fatalf("expected non-nil channel")
 	}
-	close(donec)
+
+	// wait for disconnections to stop
+	stopc <- struct{}{}
+	<-donec
+
 	// ensure watcher works
 	putAndWatch(t, wctx, "a", "a")
 }

+ 6 - 0
clientv3/watch.go

@@ -198,6 +198,12 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 		resumec: make(chan int64),
 	}
 
+	if pendingReq.rev == 0 {
+		// note the header revision so that a put following a current watcher
+		// disconnect will arrive on the watcher channel after reconnect
+		ws.initReq.rev = resp.Header.Revision
+	}
+
 	w.mu.Lock()
 	w.streams[ws.id] = ws
 	w.mu.Unlock()