Browse Source

Merge pull request #6064 from heyitsanthony/clientv3-watch-filter

clientv3: watch filters
Anthony Romano 9 years ago
parent
commit
72eb2d8893
3 changed files with 66 additions and 0 deletions
  1. 36 0
      clientv3/integration/watch_test.go
  2. 17 0
      clientv3/op.go
  3. 13 0
      clientv3/watch.go

+ 36 - 0
clientv3/integration/watch_test.go

@@ -673,3 +673,39 @@ func TestWatchWithRequireLeader(t *testing.T) {
 		t.Fatalf("expected response, got closed channel")
 	}
 }
+
+// TestWatchWithFilter checks that watch filtering works.
+func TestWatchWithFilter(t *testing.T) {
+	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer cluster.Terminate(t)
+
+	client := cluster.RandClient()
+	ctx := context.Background()
+
+	wcNoPut := client.Watch(ctx, "a", clientv3.WithFilterPut())
+	wcNoDel := client.Watch(ctx, "a", clientv3.WithFilterDelete())
+
+	if _, err := client.Put(ctx, "a", "abc"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := client.Delete(ctx, "a"); err != nil {
+		t.Fatal(err)
+	}
+
+	npResp := <-wcNoPut
+	if len(npResp.Events) != 1 || npResp.Events[0].Type != clientv3.EventTypeDelete {
+		t.Fatalf("expected delete event, got %+v", npResp.Events)
+	}
+	ndResp := <-wcNoDel
+	if len(ndResp.Events) != 1 || ndResp.Events[0].Type != clientv3.EventTypePut {
+		t.Fatalf("expected put event, got %+v", ndResp.Events)
+	}
+
+	select {
+	case resp := <-wcNoPut:
+		t.Fatalf("unexpected event on filtered put (%+v)", resp)
+	case resp := <-wcNoDel:
+		t.Fatalf("unexpected event on filtered delete (%+v)", resp)
+	case <-time.After(100 * time.Millisecond):
+	}
+}

+ 17 - 0
clientv3/op.go

@@ -50,6 +50,9 @@ type Op struct {
 
 	// progressNotify is for progress updates.
 	progressNotify bool
+	// filters for watchers
+	filterPut    bool
+	filterDelete bool
 
 	// for put
 	val     []byte
@@ -111,6 +114,8 @@ func OpDelete(key string, opts ...OpOption) Op {
 		panic("unexpected serializable in delete")
 	case ret.countOnly:
 		panic("unexpected countOnly in delete")
+	case ret.filterDelete, ret.filterPut:
+		panic("unexpected filter in delete")
 	}
 	return ret
 }
@@ -131,6 +136,8 @@ func OpPut(key, val string, opts ...OpOption) Op {
 		panic("unexpected serializable in put")
 	case ret.countOnly:
 		panic("unexpected countOnly in put")
+	case ret.filterDelete, ret.filterPut:
+		panic("unexpected filter in put")
 	}
 	return ret
 }
@@ -274,6 +281,16 @@ func WithProgressNotify() OpOption {
 	}
 }
 
+// WithFilterPut discards PUT events from the watcher.
+func WithFilterPut() OpOption {
+	return func(op *Op) { op.filterPut = true }
+}
+
+// WithFilterDelete discards DELETE events from the watcher.
+func WithFilterDelete() OpOption {
+	return func(op *Op) { op.filterDelete = true }
+}
+
 // WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
 // nothing will be returned.
 func WithPrevKV() OpOption {

+ 13 - 0
clientv3/watch.go

@@ -140,6 +140,8 @@ type watchRequest struct {
 	rev int64
 	// progressNotify is for progress updates
 	progressNotify bool
+	// filters is the list of events to filter out
+	filters []pb.WatchCreateRequest_FilterType
 	// get the previous key-value pair before the event happens
 	prevKV bool
 	// retc receives a chan WatchResponse once the watcher is established
@@ -210,12 +212,22 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	ow := opWatch(key, opts...)
 
 	retc := make(chan chan WatchResponse, 1)
+
+	var filters []pb.WatchCreateRequest_FilterType
+	if ow.filterPut {
+		filters = append(filters, pb.WatchCreateRequest_NOPUT)
+	}
+	if ow.filterDelete {
+		filters = append(filters, pb.WatchCreateRequest_NODELETE)
+	}
+
 	wr := &watchRequest{
 		ctx:            ctx,
 		key:            string(ow.key),
 		end:            string(ow.end),
 		rev:            ow.rev,
 		progressNotify: ow.progressNotify,
+		filters:        filters,
 		prevKV:         ow.prevKV,
 		retc:           retc,
 	}
@@ -690,6 +702,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
 		Key:            []byte(wr.key),
 		RangeEnd:       []byte(wr.end),
 		ProgressNotify: wr.progressNotify,
+		Filters:        wr.filters,
 		PrevKv:         wr.prevKV,
 	}
 	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}