Browse Source

grpcproxy: fix more issues in watch path

Xiang Li 9 years ago
parent
commit
8cd47c4348
3 changed files with 29 additions and 3 deletions
  1. 9 1
      proxy/grpcproxy/watcher.go
  2. 4 1
      proxy/grpcproxy/watcher_group.go
  3. 16 1
      proxy/grpcproxy/watcher_groups.go

+ 9 - 1
proxy/grpcproxy/watcher.go

@@ -42,12 +42,16 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
 
 	events := make([]*mvccpb.Event, 0, len(wr.Events))
 
+	var lastRev int64
 	for i := range wr.Events {
 		ev := (*mvccpb.Event)(wr.Events[i])
 		if ev.Kv.ModRevision <= w.rev {
 			continue
 		} else {
-			w.rev = ev.Kv.ModRevision
+			// We cannot update w.rev here.
+			// txn can have multiple events with the same rev.
+			// If we update w.rev here, we would skip some events in the same txn.
+			lastRev = ev.Kv.ModRevision
 		}
 
 		filtered := false
@@ -65,6 +69,10 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
 		}
 	}
 
+	if lastRev > w.rev {
+		w.rev = lastRev
+	}
+
 	// all events are filtered out?
 	if !wr.IsProgressNotify() && !wr.Created && len(events) == 0 {
 		return

+ 4 - 1
proxy/grpcproxy/watcher_group.go

@@ -67,10 +67,13 @@ func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) {
 	}
 }
 
-func (wg *watcherGroup) add(rid receiverID, w watcher) {
+// add adds the watcher into the group with given ID.
+// The current revision of the watcherGroup is returned.
+func (wg *watcherGroup) add(rid receiverID, w watcher) int64 {
 	wg.mu.Lock()
 	defer wg.mu.Unlock()
 	wg.receivers[rid] = w
+	return wg.rev
 }
 
 func (wg *watcherGroup) delete(rid receiverID) {

+ 16 - 1
proxy/grpcproxy/watcher_groups.go

@@ -18,6 +18,8 @@ import (
 	"sync"
 
 	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
 	"golang.org/x/net/context"
 )
 
@@ -38,8 +40,21 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
 	groups := wgs.groups
 
 	if wg, ok := groups[w.wr]; ok {
-		wg.add(rid, w)
+		rev := wg.add(rid, w)
 		wgs.idToGroup[rid] = wg
+
+		resp := &pb.WatchResponse{
+			Header: &pb.ResponseHeader{
+				// todo: fill in ClusterId
+				// todo: fill in MemberId:
+				Revision: rev,
+				// todo: fill in RaftTerm:
+			},
+			WatchId: rid.watcherID,
+			Created: true,
+		}
+		w.ch <- resp
+
 		return
 	}