Browse Source

*: watch true cancel, created for wrong rev

This sets Created and Cancel true in pb.WatchResponse
when it has received wrong start revision instead of
panic. So that clientv3 can set 'Canceled' in WatchResponse
as well.

Fix https://github.com/coreos/etcd/issues/4610.
Gyu-Ho Lee 10 years ago
parent
commit
a78604dacb

+ 25 - 0
clientv3/integration/watch_test.go

@@ -297,3 +297,28 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
 		}
 	}
 }
+
+func TestWatchInvalidFutureRevision(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	w := clientv3.NewWatcher(clus.RandClient())
+	defer w.Close()
+
+	rch := w.Watch(context.Background(), "foo", clientv3.WithRev(100))
+
+	wresp, ok := <-rch // WatchResponse from canceled one
+	if !ok {
+		t.Fatalf("expected wresp 'open'(ok true), but got ok %v", ok)
+	}
+	if !wresp.Canceled {
+		t.Fatalf("wresp.Canceled expected 'true', but got %v", wresp.Canceled)
+	}
+
+	_, ok = <-rch // ensure the channel is closed
+	if ok != false {
+		t.Fatalf("expected wresp 'closed'(ok false), but got ok %v", ok)
+	}
+}

+ 10 - 5
clientv3/watch.go

@@ -44,6 +44,9 @@ type WatchResponse struct {
 	// CompactRevision is set to the compaction revision that
 	// caused the watcher to cancel.
 	CompactRevision int64
+
+	// Canceled is 'true' when it has received wrong watch start revision.
+	Canceled bool
 }
 
 // watcher implements the Watcher interface
@@ -165,12 +168,13 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 		// no pending request; ignore
 		return
 	}
-	if resp.CompactRevision != 0 {
+	if resp.Canceled || resp.CompactRevision != 0 {
 		// compaction after start revision
 		ret := make(chan WatchResponse, 1)
 		ret <- WatchResponse{
 			Header:          *resp.Header,
-			CompactRevision: resp.CompactRevision}
+			CompactRevision: resp.CompactRevision,
+			Canceled:        resp.Canceled}
 		close(ret)
 		pendingReq.retc <- ret
 		return
@@ -251,13 +255,13 @@ func (w *watcher) run() {
 		// New events from the watch client
 		case pbresp := <-w.respc:
 			switch {
-			case pbresp.Canceled:
-				delete(cancelSet, pbresp.WatchId)
 			case pbresp.Created:
 				// response to pending req, try to add
 				w.addStream(pbresp, pendingReq)
 				pendingReq = nil
 				curReqC = w.reqc
+			case pbresp.Canceled:
+				delete(cancelSet, pbresp.WatchId)
 			default:
 				// dispatch to appropriate watch stream
 				if ok := w.dispatchEvent(pbresp); ok {
@@ -317,7 +321,8 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
 		wr := &WatchResponse{
 			Header:          *pbresp.Header,
 			Events:          pbresp.Events,
-			CompactRevision: pbresp.CompactRevision}
+			CompactRevision: pbresp.CompactRevision,
+			Canceled:        pbresp.Canceled}
 		ws.recvc <- wr
 	}
 	return ok

+ 7 - 0
etcdserver/api/v3rpc/watch.go

@@ -108,6 +108,13 @@ func (sws *serverWatchStream) recvLoop() error {
 				if rev == 0 {
 					// rev 0 watches past the current revision
 					rev = wsrev + 1
+				} else if rev > wsrev { // do not allow watching future revision.
+					sws.ctrlStream <- &pb.WatchResponse{
+						Header:   sws.newResponseHeader(wsrev),
+						Created:  true,
+						Canceled: true,
+					}
+					continue
 				}
 				id := sws.watchStream.Watch(toWatch, prefix, rev)
 				sws.ctrlStream <- &pb.WatchResponse{

+ 29 - 0
integration/v3_watch_test.go

@@ -782,3 +782,32 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat
 	}
 	return true, nil
 }
+
+// TestV3WatchFutureRevision ensures invalid future revision to Watch APIs
+// returns WatchResponse of true Created and true Canceled.
+func TestV3WatchInvalidFutureRevision(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	wStream, wErr := clus.RandClient().Watch.Watch(ctx)
+	if wErr != nil {
+		t.Fatalf("wAPI.Watch error: %v", wErr)
+	}
+
+	wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
+		CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 100}}}
+	if err := wStream.Send(wreq); err != nil {
+		t.Fatalf("watch request failed (%v)", err)
+	}
+
+	resp, err := wStream.Recv()
+	if err != nil {
+		t.Errorf("wStream.Recv error: %v", err)
+	}
+	if !resp.Created || !resp.Canceled || len(resp.Events) != 0 {
+		t.Errorf("invalid start rev should return true, true, 0, but got %v, %v, %d", resp.Created, resp.Canceled, len(resp.Events))
+	}
+}