Browse Source

grpcproxy: add filter to watcher

Xiang Li 9 years ago
parent
commit
58aa3483c3
3 changed files with 45 additions and 15 deletions
  1. 15 10
      etcdserver/api/v3rpc/watch.go
  2. 2 0
      proxy/grpcproxy/watch.go
  3. 28 5
      proxy/grpcproxy/watcher.go

+ 15 - 10
etcdserver/api/v3rpc/watch.go

@@ -172,16 +172,7 @@ func (sws *serverWatchStream) recvLoop() error {
 				// support  >= key queries
 				creq.RangeEnd = []byte{}
 			}
-			filters := make([]mvcc.FilterFunc, 0, len(creq.Filters))
-			for _, ft := range creq.Filters {
-				switch ft {
-				case pb.WatchCreateRequest_NOPUT:
-					filters = append(filters, filterNoPut)
-				case pb.WatchCreateRequest_NODELETE:
-					filters = append(filters, filterNoDelete)
-				default:
-				}
-			}
+			filters := FiltersFromRequest(creq)
 
 			wsrev := sws.watchStream.Rev()
 			rev := creq.StartRevision
@@ -372,3 +363,17 @@ func filterNoDelete(e mvccpb.Event) bool {
 func filterNoPut(e mvccpb.Event) bool {
 	return e.Type == mvccpb.PUT
 }
+
+func FiltersFromRequest(creq *pb.WatchCreateRequest) []mvcc.FilterFunc {
+	filters := make([]mvcc.FilterFunc, 0, len(creq.Filters))
+	for _, ft := range creq.Filters {
+		switch ft {
+		case pb.WatchCreateRequest_NOPUT:
+			filters = append(filters, filterNoPut)
+		case pb.WatchCreateRequest_NODELETE:
+			filters = append(filters, filterNoDelete)
+		default:
+		}
+	}
+	return filters
+}

+ 2 - 0
proxy/grpcproxy/watch.go

@@ -21,6 +21,7 @@ import (
 	"golang.org/x/net/context"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
@@ -104,6 +105,7 @@ func (sws *serverWatchStream) recvLoop() error {
 				ch: sws.watchCh,
 
 				progress: cr.ProgressNotify,
+				filters:  v3rpc.FiltersFromRequest(cr),
 			}
 			if cr.StartRevision != 0 {
 				sws.addDedicatedWatcher(watcher, cr.StartRevision)

+ 28 - 5
proxy/grpcproxy/watcher.go

@@ -17,6 +17,7 @@ package grpcproxy
 import (
 	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 )
 
@@ -27,7 +28,8 @@ type watchRange struct {
 type watcher struct {
 	id int64
 	wr watchRange
-	// TODO: support filter
+
+	filters  []mvcc.FilterFunc
 	progress bool
 	ch       chan<- *pb.WatchResponse
 }
@@ -39,11 +41,32 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
 
 	// todo: filter out the events that this watcher already seen.
 
-	evs := wr.Events
-	events := make([]*mvccpb.Event, len(evs))
-	for i := range evs {
-		events[i] = (*mvccpb.Event)(evs[i])
+	events := make([]*mvccpb.Event, 0, len(wr.Events))
+
+	for i := range wr.Events {
+		filtered := false
+
+		ev := (*mvccpb.Event)(wr.Events[i])
+
+		if len(w.filters) != 0 {
+			for _, filter := range w.filters {
+				if filter(*ev) {
+					filtered = true
+					break
+				}
+			}
+		}
+
+		if !filtered {
+			events = append(events, ev)
+		}
 	}
+
+	// all events are filtered out?
+	if !wr.IsProgressNotify() && len(events) == 0 {
+		return
+	}
+
 	pbwr := &pb.WatchResponse{
 		Header:  &wr.Header,
 		WatchId: w.id,