Browse Source

*: support watch from future revision

Xiang Li 9 years ago
parent
commit
036ed87c6d

+ 5 - 12
etcdserver/api/v3rpc/watch.go

@@ -119,27 +119,20 @@ func (sws *serverWatchStream) recvLoop() error {
 				// support  >= key queries
 				creq.RangeEnd = []byte{}
 			}
-
-			rev := creq.StartRevision
 			wsrev := sws.watchStream.Rev()
-			futureRev := rev > wsrev
+			rev := creq.StartRevision
 			if rev == 0 {
-				// rev 0 watches past the current revision
 				rev = wsrev + 1
 			}
-			// do not allow future watch revision
-			id := storage.WatchID(-1)
-			if !futureRev {
-				id = sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
-				if creq.ProgressNotify {
-					sws.progress[id] = true
-				}
+			id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
+			if id != -1 && creq.ProgressNotify {
+				sws.progress[id] = true
 			}
 			sws.ctrlStream <- &pb.WatchResponse{
 				Header:   sws.newResponseHeader(wsrev),
 				WatchId:  int64(id),
 				Created:  true,
-				Canceled: futureRev,
+				Canceled: id == -1,
 			}
 		case *pb.WatchRequest_CancelRequest:
 			if uv.CancelRequest != nil {

+ 64 - 30
integration/v3_watch_test.go

@@ -286,6 +286,70 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 	}
 }
 
+// TestV3WatchFutureRevision tests Watch APIs from a future revision.
+func TestV3WatchFutureRevision(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	wAPI := toGRPC(clus.RandClient()).Watch
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	wStream, err := wAPI.Watch(ctx)
+	if err != nil {
+		t.Fatalf("wAPI.Watch error: %v", err)
+	}
+
+	wkey := []byte("foo")
+	wrev := int64(10)
+	req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
+		CreateRequest: &pb.WatchCreateRequest{Key: wkey, StartRevision: wrev}}}
+	err = wStream.Send(req)
+	if err != nil {
+		t.Fatalf("wStream.Send error: %v", err)
+	}
+
+	// ensure watcher request created a new watcher
+	cresp, err := wStream.Recv()
+	if err != nil {
+		t.Fatalf("wStream.Recv error: %v", err)
+	}
+	if !cresp.Created {
+		t.Fatal("create = %v, want %v", cresp.Created, true)
+	}
+
+	// asynchronously create keys
+	go func() {
+		kvc := toGRPC(clus.RandClient()).KV
+		for {
+			req := &pb.PutRequest{Key: wkey, Value: []byte("bar")}
+			resp, rerr := kvc.Put(context.TODO(), req)
+			if rerr != nil {
+				t.Fatalf("couldn't put key (%v)", rerr)
+			}
+			if resp.Header.Revision == wrev {
+				return
+			}
+		}
+	}()
+
+	// ensure watcher request created a new watcher
+	cresp, err = wStream.Recv()
+	if err != nil {
+		t.Fatalf("wStream.Recv error: %v", err)
+	}
+	if cresp.Header.Revision != wrev {
+		t.Fatalf("revision = %d, want %d", cresp.Header.Revision, wrev)
+	}
+	if len(cresp.Events) != 1 {
+		t.Fatalf("failed to receive events")
+	}
+	if cresp.Events[0].Kv.ModRevision != wrev {
+		t.Errorf("mod revision = %d, want %d", cresp.Events[0].Kv.ModRevision, wrev)
+	}
+}
+
 // TestV3WatchCancelSynced tests Watch APIs cancellation from synced map.
 func TestV3WatchCancelSynced(t *testing.T) {
 	defer testutil.AfterTest(t)
@@ -859,36 +923,6 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat
 	return true, nil
 }
 
-// TestV3WatchFutureRevision ensures invalid future revision to Watch APIs
-// returns WatchResponse of true Created and true Canceled.
-func TestV3WatchInvalidFutureRevision(t *testing.T) {
-	defer testutil.AfterTest(t)
-	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
-	defer clus.Terminate(t)
-
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-	defer cancel()
-	wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
-	if wErr != nil {
-		t.Fatalf("wAPI.Watch error: %v", wErr)
-	}
-
-	wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
-		CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 100}}}
-	if err := wStream.Send(wreq); err != nil {
-		t.Fatalf("watch request failed (%v)", err)
-	}
-
-	resp, err := wStream.Recv()
-	if err != nil {
-		t.Errorf("wStream.Recv error: %v", err)
-	}
-	if resp.WatchId != -1 || !resp.Created || !resp.Canceled || len(resp.Events) != 0 {
-		t.Errorf("invalid start-rev expected -1, true, true, 0, but got %d, %v, %v, %d",
-			resp.WatchId, resp.Created, resp.Canceled, len(resp.Events))
-	}
-}
-
 func TestWatchWithProgressNotify(t *testing.T) {
 	testInterval := 3 * time.Second
 	pi := v3rpc.ProgressReportInterval

+ 8 - 6
storage/watchable_store.go

@@ -189,12 +189,12 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 	synced := startRev > s.store.currentRev.main || startRev == 0
 	if synced {
 		wa.cur = s.store.currentRev.main + 1
+		if startRev > wa.cur {
+			wa.cur = startRev
+		}
 	}
 	s.store.mu.Unlock()
 	if synced {
-		if startRev > wa.cur {
-			panic("can't watch past sync revision")
-		}
 		s.synced.add(wa)
 	} else {
 		slowWatcherGauge.Inc()
@@ -368,9 +368,11 @@ type watcher struct {
 	// end indicates the end of the range to watch.
 	// If end is set, the watcher is on a range.
 	end []byte
-	// cur is the current watcher revision.
-	// If cur is behind the current revision of the KV,
-	// watcher is unsynced and needs to catch up.
+
+	// cur is the current watcher revision of a unsynced watcher.
+	// cur will be updated for unsynced watcher while it is catching up.
+	// cur is startRev of a synced watcher.
+	// cur will not be updated for synced watcher.
 	cur int64
 	id  WatchID
 

+ 39 - 0
storage/watchable_store_test.go

@@ -255,6 +255,45 @@ func TestWatchCompacted(t *testing.T) {
 	}
 }
 
+func TestWatchFutureRev(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b, &lease.FakeLessor{})
+
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+
+	testKey := []byte("foo")
+	testValue := []byte("bar")
+
+	w := s.NewWatchStream()
+	wrev := int64(10)
+	w.Watch(testKey, nil, wrev)
+
+	for i := 0; i < 10; i++ {
+		rev := s.Put(testKey, testValue, lease.NoLease)
+		if rev >= wrev {
+			break
+		}
+	}
+
+	select {
+	case resp := <-w.Chan():
+		if resp.Revision != wrev {
+			t.Fatalf("rev = %d, want %d", resp.Revision, wrev)
+		}
+		if len(resp.Events) != 1 {
+			t.Fatalf("failed to get events from the response")
+		}
+		if resp.Events[0].Kv.ModRevision != wrev {
+			t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, wrev)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("failed to receive event in 1 second.")
+	}
+}
+
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()