Browse Source

Merge pull request #4612 from gyuho/watch_not_panic

*: watch true cancel, created for wrong rev
Gyu-Ho Lee 9 years ago
parent
commit
8f3981c651

+ 25 - 0
clientv3/integration/watch_test.go

@@ -297,3 +297,28 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
 		}
 		}
 	}
 	}
 }
 }
+
+func TestWatchInvalidFutureRevision(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	w := clientv3.NewWatcher(clus.RandClient())
+	defer w.Close()
+
+	rch := w.Watch(context.Background(), "foo", clientv3.WithRev(100))
+
+	wresp, ok := <-rch // WatchResponse from canceled one
+	if !ok {
+		t.Fatalf("expected wresp 'open'(ok true), but got ok %v", ok)
+	}
+	if !wresp.Canceled {
+		t.Fatalf("wresp.Canceled expected 'true', but got %v", wresp.Canceled)
+	}
+
+	_, ok = <-rch // ensure the channel is closed
+	if ok != false {
+		t.Fatalf("expected wresp 'closed'(ok false), but got ok %v", ok)
+	}
+}

+ 10 - 5
clientv3/watch.go

@@ -44,6 +44,9 @@ type WatchResponse struct {
 	// CompactRevision is set to the compaction revision that
 	// CompactRevision is set to the compaction revision that
 	// caused the watcher to cancel.
 	// caused the watcher to cancel.
 	CompactRevision int64
 	CompactRevision int64
+
+	// Canceled is 'true' when it has received wrong watch start revision.
+	Canceled bool
 }
 }
 
 
 // watcher implements the Watcher interface
 // watcher implements the Watcher interface
@@ -165,12 +168,13 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 		// no pending request; ignore
 		// no pending request; ignore
 		return
 		return
 	}
 	}
-	if resp.CompactRevision != 0 {
+	if resp.Canceled || resp.CompactRevision != 0 {
 		// compaction after start revision
 		// compaction after start revision
 		ret := make(chan WatchResponse, 1)
 		ret := make(chan WatchResponse, 1)
 		ret <- WatchResponse{
 		ret <- WatchResponse{
 			Header:          *resp.Header,
 			Header:          *resp.Header,
-			CompactRevision: resp.CompactRevision}
+			CompactRevision: resp.CompactRevision,
+			Canceled:        resp.Canceled}
 		close(ret)
 		close(ret)
 		pendingReq.retc <- ret
 		pendingReq.retc <- ret
 		return
 		return
@@ -251,13 +255,13 @@ func (w *watcher) run() {
 		// New events from the watch client
 		// New events from the watch client
 		case pbresp := <-w.respc:
 		case pbresp := <-w.respc:
 			switch {
 			switch {
-			case pbresp.Canceled:
-				delete(cancelSet, pbresp.WatchId)
 			case pbresp.Created:
 			case pbresp.Created:
 				// response to pending req, try to add
 				// response to pending req, try to add
 				w.addStream(pbresp, pendingReq)
 				w.addStream(pbresp, pendingReq)
 				pendingReq = nil
 				pendingReq = nil
 				curReqC = w.reqc
 				curReqC = w.reqc
+			case pbresp.Canceled:
+				delete(cancelSet, pbresp.WatchId)
 			default:
 			default:
 				// dispatch to appropriate watch stream
 				// dispatch to appropriate watch stream
 				if ok := w.dispatchEvent(pbresp); ok {
 				if ok := w.dispatchEvent(pbresp); ok {
@@ -317,7 +321,8 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
 		wr := &WatchResponse{
 		wr := &WatchResponse{
 			Header:          *pbresp.Header,
 			Header:          *pbresp.Header,
 			Events:          pbresp.Events,
 			Events:          pbresp.Events,
-			CompactRevision: pbresp.CompactRevision}
+			CompactRevision: pbresp.CompactRevision,
+			Canceled:        pbresp.Canceled}
 		ws.recvc <- wr
 		ws.recvc <- wr
 	}
 	}
 	return ok
 	return ok

+ 7 - 0
etcdserver/api/v3rpc/watch.go

@@ -108,6 +108,13 @@ func (sws *serverWatchStream) recvLoop() error {
 				if rev == 0 {
 				if rev == 0 {
 					// rev 0 watches past the current revision
 					// rev 0 watches past the current revision
 					rev = wsrev + 1
 					rev = wsrev + 1
+				} else if rev > wsrev { // do not allow watching future revision.
+					sws.ctrlStream <- &pb.WatchResponse{
+						Header:   sws.newResponseHeader(wsrev),
+						Created:  true,
+						Canceled: true,
+					}
+					continue
 				}
 				}
 				id := sws.watchStream.Watch(toWatch, prefix, rev)
 				id := sws.watchStream.Watch(toWatch, prefix, rev)
 				sws.ctrlStream <- &pb.WatchResponse{
 				sws.ctrlStream <- &pb.WatchResponse{

+ 29 - 0
integration/v3_watch_test.go

@@ -782,3 +782,32 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat
 	}
 	}
 	return true, nil
 	return true, nil
 }
 }
+
+// TestV3WatchFutureRevision ensures invalid future revision to Watch APIs
+// returns WatchResponse of true Created and true Canceled.
+func TestV3WatchInvalidFutureRevision(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	wStream, wErr := clus.RandClient().Watch.Watch(ctx)
+	if wErr != nil {
+		t.Fatalf("wAPI.Watch error: %v", wErr)
+	}
+
+	wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
+		CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 100}}}
+	if err := wStream.Send(wreq); err != nil {
+		t.Fatalf("watch request failed (%v)", err)
+	}
+
+	resp, err := wStream.Recv()
+	if err != nil {
+		t.Errorf("wStream.Recv error: %v", err)
+	}
+	if !resp.Created || !resp.Canceled || len(resp.Events) != 0 {
+		t.Errorf("invalid start rev should return true, true, 0, but got %v, %v, %d", resp.Created, resp.Canceled, len(resp.Events))
+	}
+}