Browse Source

clientv3: add WithProgressNotify

Client side for https://github.com/coreos/etcd/issues/4628.
Gyu-Ho Lee 9 years ago
parent
commit
27316196d8
3 changed files with 85 additions and 9 deletions
  1. 56 0
      clientv3/integration/watch_test.go
  2. 11 0
      clientv3/op.go
  3. 18 9
      clientv3/watch.go

+ 56 - 0
clientv3/integration/watch_test.go

@@ -396,3 +396,59 @@ func TestWatchCompactRevision(t *testing.T) {
 		t.Fatalf("expected closed channel, but got %v", wresp)
 	}
 }
+
+func TestWatchWithProgressNotify(t *testing.T)        { testWatchWithProgressNotify(t, true) }
+func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNotify(t, false) }
+
+func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	wc := clientv3.NewWatcher(clus.RandClient())
+	defer wc.Close()
+
+	testInterval := 3 * time.Second
+	pi := v3rpc.ProgressReportInterval
+	v3rpc.ProgressReportInterval = testInterval
+	defer func() { v3rpc.ProgressReportInterval = pi }()
+
+	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
+	if watchOnPut {
+		opts = append(opts, clientv3.WithPrefix())
+	}
+	rch := wc.Watch(context.Background(), "foo", opts...)
+
+	select {
+	case resp := <-rch: // wait for notification
+		if len(resp.Events) != 0 {
+			t.Fatalf("resp.Events expected none, got %+v", resp.Events)
+		}
+	case <-time.After(2 * pi):
+		t.Fatalf("watch response expected in %v, but timed out", pi)
+	}
+
+	kvc := clientv3.NewKV(clus.RandClient())
+	if _, err := kvc.Put(context.TODO(), "foox", "bar"); err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case resp := <-rch:
+		if resp.Header.Revision != 2 {
+			t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision)
+		}
+		if watchOnPut { // wait for put if watch on the put key
+			ev := []*storagepb.Event{{Type: storagepb.PUT,
+				Kv: &storagepb.KeyValue{Key: []byte("foox"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}}}
+			if !reflect.DeepEqual(ev, resp.Events) {
+				t.Fatalf("expected %+v, got %+v", ev, resp.Events)
+			}
+		} else if len(resp.Events) != 0 { // wait for notification otherwise
+			t.Fatalf("expected no events, but got %+v", resp.Events)
+		}
+	case <-time.After(2 * pi):
+		t.Fatalf("watch response expected in %v, but timed out", pi)
+	}
+}

+ 11 - 0
clientv3/op.go

@@ -42,6 +42,9 @@ type Op struct {
 	// for range, watch
 	rev int64
 
+	// progressNotify is for progress updates.
+	progressNotify bool
+
 	// for put
 	val     []byte
 	leaseID lease.LeaseID
@@ -225,3 +228,11 @@ func WithLastRev() []OpOption { return withTop(SortByModifiedRev, SortDescend) }
 func withTop(target SortTarget, order SortOrder) []OpOption {
 	return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
 }
+
+// WithProgressNotify makes watch server send periodic progress updates.
+// Progress updates have zero events in WatchResponse.
+func WithProgressNotify() OpOption {
+	return func(op *Op) {
+		op.progressNotify = true
+	}
+}

+ 18 - 9
clientv3/watch.go

@@ -94,6 +94,8 @@ type watchRequest struct {
 	key string
 	end string
 	rev int64
+	// progressNotify is for progress updates.
+	progressNotify bool
 	// retc receives a chan WatchResponse once the watcher is established
 	retc chan chan WatchResponse
 }
@@ -143,11 +145,12 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 
 	retc := make(chan chan WatchResponse, 1)
 	wr := &watchRequest{
-		ctx:  ctx,
-		key:  string(ow.key),
-		end:  string(ow.end),
-		rev:  ow.rev,
-		retc: retc,
+		ctx:            ctx,
+		key:            string(ow.key),
+		end:            string(ow.end),
+		rev:            ow.rev,
+		progressNotify: ow.progressNotify,
+		retc:           retc,
 	}
 
 	ok := false
@@ -392,7 +395,12 @@ func (w *watcher) serveStream(ws *watcherStream) {
 				closing = true
 				break
 			}
-			newRev := wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
+			var newRev int64
+			if len(wrs[0].Events) > 0 {
+				newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
+			} else {
+				newRev = wrs[0].Header.Revision
+			}
 			if newRev != ws.lastRev {
 				ws.lastRev = newRev
 			}
@@ -518,9 +526,10 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
 // toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
 func (wr *watchRequest) toPB() *pb.WatchRequest {
 	req := &pb.WatchCreateRequest{
-		StartRevision: wr.rev,
-		Key:           []byte(wr.key),
-		RangeEnd:      []byte(wr.end),
+		StartRevision:  wr.rev,
+		Key:            []byte(wr.key),
+		RangeEnd:       []byte(wr.end),
+		ProgressNotify: wr.progressNotify,
 	}
 	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
 	return &pb.WatchRequest{RequestUnion: cr}