Browse Source

clientv3: support watch filters

Anthony Romano 9 years ago
parent
commit
943fe70178
2 changed files with 30 additions and 0 deletions
  1. 17 0
      clientv3/op.go
  2. 13 0
      clientv3/watch.go

+ 17 - 0
clientv3/op.go

@@ -50,6 +50,9 @@ type Op struct {
 
 
 	// progressNotify is for progress updates.
 	// progressNotify is for progress updates.
 	progressNotify bool
 	progressNotify bool
+	// filters for watchers
+	filterPut    bool
+	filterDelete bool
 
 
 	// for put
 	// for put
 	val     []byte
 	val     []byte
@@ -111,6 +114,8 @@ func OpDelete(key string, opts ...OpOption) Op {
 		panic("unexpected serializable in delete")
 		panic("unexpected serializable in delete")
 	case ret.countOnly:
 	case ret.countOnly:
 		panic("unexpected countOnly in delete")
 		panic("unexpected countOnly in delete")
+	case ret.filterDelete, ret.filterPut:
+		panic("unexpected filter in delete")
 	}
 	}
 	return ret
 	return ret
 }
 }
@@ -131,6 +136,8 @@ func OpPut(key, val string, opts ...OpOption) Op {
 		panic("unexpected serializable in put")
 		panic("unexpected serializable in put")
 	case ret.countOnly:
 	case ret.countOnly:
 		panic("unexpected countOnly in put")
 		panic("unexpected countOnly in put")
+	case ret.filterDelete, ret.filterPut:
+		panic("unexpected filter in put")
 	}
 	}
 	return ret
 	return ret
 }
 }
@@ -274,6 +281,16 @@ func WithProgressNotify() OpOption {
 	}
 	}
 }
 }
 
 
+// WithFilterPut discards PUT events from the watcher.
+func WithFilterPut() OpOption {
+	return func(op *Op) { op.filterPut = true }
+}
+
+// WithFilterDelete discards DELETE events from the watcher.
+func WithFilterDelete() OpOption {
+	return func(op *Op) { op.filterDelete = true }
+}
+
 // WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
 // WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
 // nothing will be returned.
 // nothing will be returned.
 func WithPrevKV() OpOption {
 func WithPrevKV() OpOption {

+ 13 - 0
clientv3/watch.go

@@ -140,6 +140,8 @@ type watchRequest struct {
 	rev int64
 	rev int64
 	// progressNotify is for progress updates
 	// progressNotify is for progress updates
 	progressNotify bool
 	progressNotify bool
+	// filters is the list of events to filter out
+	filters []pb.WatchCreateRequest_FilterType
 	// get the previous key-value pair before the event happens
 	// get the previous key-value pair before the event happens
 	prevKV bool
 	prevKV bool
 	// retc receives a chan WatchResponse once the watcher is established
 	// retc receives a chan WatchResponse once the watcher is established
@@ -210,12 +212,22 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	ow := opWatch(key, opts...)
 	ow := opWatch(key, opts...)
 
 
 	retc := make(chan chan WatchResponse, 1)
 	retc := make(chan chan WatchResponse, 1)
+
+	var filters []pb.WatchCreateRequest_FilterType
+	if ow.filterPut {
+		filters = append(filters, pb.WatchCreateRequest_NOPUT)
+	}
+	if ow.filterDelete {
+		filters = append(filters, pb.WatchCreateRequest_NODELETE)
+	}
+
 	wr := &watchRequest{
 	wr := &watchRequest{
 		ctx:            ctx,
 		ctx:            ctx,
 		key:            string(ow.key),
 		key:            string(ow.key),
 		end:            string(ow.end),
 		end:            string(ow.end),
 		rev:            ow.rev,
 		rev:            ow.rev,
 		progressNotify: ow.progressNotify,
 		progressNotify: ow.progressNotify,
+		filters:        filters,
 		prevKV:         ow.prevKV,
 		prevKV:         ow.prevKV,
 		retc:           retc,
 		retc:           retc,
 	}
 	}
@@ -690,6 +702,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
 		Key:            []byte(wr.key),
 		Key:            []byte(wr.key),
 		RangeEnd:       []byte(wr.end),
 		RangeEnd:       []byte(wr.end),
 		ProgressNotify: wr.progressNotify,
 		ProgressNotify: wr.progressNotify,
+		Filters:        wr.filters,
 		PrevKv:         wr.prevKV,
 		PrevKv:         wr.prevKV,
 	}
 	}
 	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
 	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}