Pārlūkot izejas kodu

Merge pull request #6849 from heyitsanthony/proxy-fix-watch-create

grpcproxy: don't send extra watch create events
Anthony Romano 9 gadi atpakaļ
vecāks
revīzija
d073512def

+ 10 - 3
proxy/grpcproxy/watch_broadcast.go

@@ -54,13 +54,20 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast))
 		defer close(wb.donec)
 		// loop because leader loss will close channel
 		for cctx.Err() == nil {
-			wch := wp.cw.Watch(cctx, w.wr.key,
+			opts := []clientv3.OpOption{
 				clientv3.WithRange(w.wr.end),
 				clientv3.WithProgressNotify(),
-				clientv3.WithCreatedNotify(),
 				clientv3.WithRev(wb.nextrev),
 				clientv3.WithPrevKV(),
-			)
+			}
+			// The create notification should be the first response;
+			// if the watch is recreated following leader loss, it
+			// shouldn't post a second create response to the client.
+			if wb.responses == 0 {
+				opts = append(opts, clientv3.WithCreatedNotify())
+			}
+			wch := wp.cw.Watch(cctx, w.wr.key, opts...)
+
 			for wr := range wch {
 				wb.bcast(wr)
 				update(wb)

+ 4 - 2
proxy/grpcproxy/watch_broadcasts.go

@@ -60,8 +60,10 @@ func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
 			continue
 		}
 		wbswb.mu.Lock()
-		// NB: victim lock already held
-		if wb.nextrev >= wbswb.nextrev && wbswb.nextrev != 0 {
+		// 1. check if wbswb is behind wb so it won't skip any events in wb
+		// 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting
+		// for a current watcher and expects a create event from the server.
+		if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 {
 			for w := range wb.receivers {
 				wbswb.receivers[w] = struct{}{}
 				wbs.watchers[w] = wbswb