Browse Source

grpcproxy: handle create event

Xiang Li 9 years ago
parent
commit
57c68ab1db
4 changed files with 10 additions and 15 deletions
  1. 1 1
      clientv3/watch.go
  2. 1 12
      proxy/grpcproxy/watch.go
  3. 2 1
      proxy/grpcproxy/watcher.go
  4. 6 1
      proxy/grpcproxy/watcher_groups.go

+ 1 - 1
clientv3/watch.go

@@ -92,7 +92,7 @@ func (wr *WatchResponse) Err() error {
 
 // IsProgressNotify returns true if the WatchResponse is progress notification.
 func (wr *WatchResponse) IsProgressNotify() bool {
-	return len(wr.Events) == 0 && !wr.Canceled
+	return len(wr.Events) == 0 && !wr.Canceled && !wr.Created
 }
 
 // watcher implements the Watcher interface

+ 1 - 12
proxy/grpcproxy/watch.go

@@ -123,19 +123,7 @@ func (sws *serverWatchStream) recvLoop() error {
 			} else {
 				sws.addCoalescedWatcher(watcher)
 			}
-
-			wresp := &pb.WatchResponse{
-				Header:  &pb.ResponseHeader{}, // TODO: fill in header
-				WatchId: sws.nextWatcherID,
-				Created: true,
-			}
-
 			sws.nextWatcherID++
-			select {
-			case sws.ctrlCh <- wresp:
-			default:
-				panic("handle this")
-			}
 
 		case *pb.WatchRequest_CancelRequest:
 			sws.removeWatcher(uv.CancelRequest.WatchId)
@@ -185,6 +173,7 @@ func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
 		w.wr.key, clientv3.WithRange(w.wr.end),
 		clientv3.WithRev(rev),
 		clientv3.WithProgressNotify(),
+		clientv3.WithCreatedNotify(),
 	)
 
 	ws := newWatcherSingle(wch, cancel, w, sws)

+ 2 - 1
proxy/grpcproxy/watcher.go

@@ -66,12 +66,13 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
 	}
 
 	// all events are filtered out?
-	if !wr.IsProgressNotify() && len(events) == 0 {
+	if !wr.IsProgressNotify() && !wr.Created && len(events) == 0 {
 		return
 	}
 
 	pbwr := &pb.WatchResponse{
 		Header:  &wr.Header,
+		Created: wr.Created,
 		WatchId: w.id,
 		Events:  events,
 	}

+ 6 - 1
proxy/grpcproxy/watcher_groups.go

@@ -42,7 +42,12 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
 
 	ctx, cancel := context.WithCancel(context.Background())
 
-	wch := wgs.cw.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify())
+	wch := wgs.cw.Watch(ctx, w.wr.key,
+		clientv3.WithRange(w.wr.end),
+		clientv3.WithProgressNotify(),
+		clientv3.WithCreatedNotify(),
+	)
+
 	watchg := newWatchergroup(wch, cancel)
 	watchg.add(rid, w)
 	go watchg.run()