Browse Source

clientv3: send compacted revision before closing watch chan

Anthony Romano 10 years ago
parent
commit
8e411b1b3b
2 changed files with 29 additions and 11 deletions
  1. 5 3
      clientv3/integration/kv_test.go
  2. 24 8
      clientv3/watch.go

+ 5 - 3
clientv3/integration/kv_test.go

@@ -311,9 +311,11 @@ func TestKVCompact(t *testing.T) {
 	defer wc.Close()
 	defer wc.Close()
 	wchan := wc.Watch(ctx, "foo", 3)
 	wchan := wc.Watch(ctx, "foo", 3)
 
 
-	_, ok := <-wchan
-	if ok {
-		t.Fatalf("wchan ok got %v, want false", ok)
+	if wr := <-wchan; wr.CompactRevision != 7 {
+		t.Fatalf("wchan CompactRevision got %v, want 7", wr.CompactRevision)
+	}
+	if wr, ok := <-wchan; ok {
+		t.Fatalf("wchan got %v, expected closed", wr)
 	}
 	}
 
 
 	err = kv.Compact(ctx, 1000)
 	err = kv.Compact(ctx, 1000)

+ 24 - 8
clientv3/watch.go

@@ -44,6 +44,9 @@ type Watcher interface {
 type WatchResponse struct {
 type WatchResponse struct {
 	Header pb.ResponseHeader
 	Header pb.ResponseHeader
 	Events []*storagepb.Event
 	Events []*storagepb.Event
+	// CompactRevision is set to the compaction revision that
+	// caused the watcher to cancel.
+	CompactRevision int64
 }
 }
 
 
 // watcher implements the Watcher interface
 // watcher implements the Watcher interface
@@ -166,7 +169,18 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 	if pendingReq == nil {
 	if pendingReq == nil {
 		// no pending request; ignore
 		// no pending request; ignore
 		return
 		return
-	} else if resp.WatchId == -1 || resp.CompactRevision != 0 {
+	}
+	if resp.CompactRevision != 0 {
+		// compaction after start revision
+		ret := make(chan WatchResponse, 1)
+		ret <- WatchResponse{
+			Header:          *resp.Header,
+			CompactRevision: resp.CompactRevision}
+		close(ret)
+		pendingReq.retc <- ret
+		return
+	}
+	if resp.WatchId == -1 {
 		// failed; no channel
 		// failed; no channel
 		pendingReq.retc <- nil
 		pendingReq.retc <- nil
 		return
 		return
@@ -238,12 +252,6 @@ func (w *watcher) run() {
 			switch {
 			switch {
 			case pbresp.Canceled:
 			case pbresp.Canceled:
 				delete(cancelSet, pbresp.WatchId)
 				delete(cancelSet, pbresp.WatchId)
-			case pbresp.CompactRevision != 0:
-				w.mu.Lock()
-				if ws, ok := w.streams[pbresp.WatchId]; ok {
-					w.closeStream(ws)
-				}
-				w.mu.Unlock()
 			case pbresp.Created:
 			case pbresp.Created:
 				// response to pending req, try to add
 				// response to pending req, try to add
 				w.addStream(pbresp, pendingReq)
 				w.addStream(pbresp, pendingReq)
@@ -305,7 +313,10 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
 	defer w.mu.RUnlock()
 	defer w.mu.RUnlock()
 	ws, ok := w.streams[pbresp.WatchId]
 	ws, ok := w.streams[pbresp.WatchId]
 	if ok {
 	if ok {
-		wr := &WatchResponse{*pbresp.Header, pbresp.Events}
+		wr := &WatchResponse{
+			Header:          *pbresp.Header,
+			Events:          pbresp.Events,
+			CompactRevision: pbresp.CompactRevision}
 		ws.recvc <- wr
 		ws.recvc <- wr
 	}
 	}
 	return ok
 	return ok
@@ -346,6 +357,11 @@ func (w *watcher) serveStream(ws *watcherStream) {
 		}
 		}
 		select {
 		select {
 		case outc <- *curWr:
 		case outc <- *curWr:
+			if len(wrs[0].Events) == 0 {
+				// compaction message
+				closing = true
+				break
+			}
 			newRev := wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
 			newRev := wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
 			if newRev != ws.lastRev {
 			if newRev != ws.lastRev {
 				ws.lastRev = newRev
 				ws.lastRev = newRev