Browse Source

grpcproxy: don't use WithRequireLeader for watch event stream

Ohterwise leader loss will reject all stream creation.
Anthony Romano 9 years ago
parent
commit
e42fa18ccf
2 changed files with 17 additions and 23 deletions
  1. 3 2
      proxy/grpcproxy/watch.go
  2. 14 21
      proxy/grpcproxy/watch_broadcast.go

+ 3 - 2
proxy/grpcproxy/watch.go

@@ -52,7 +52,7 @@ const (
 func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
 	wp := &watchProxy{
 		cw:           c.Watcher,
-		ctx:          clientv3.WithRequireLeader(c.Ctx()),
+		ctx:          c.Ctx(),
 		retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond),
 		leaderc:      make(chan struct{}),
 	}
@@ -63,8 +63,9 @@ func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
 		// a new streams without opening any watchers won't catch
 		// a lost leader event, so have a special watch to monitor it
 		rev := int64((uint64(1) << 63) - 2)
+		lctx := clientv3.WithRequireLeader(wp.ctx)
 		for wp.ctx.Err() == nil {
-			wch := wp.cw.Watch(wp.ctx, lostLeaderKey, clientv3.WithRev(rev))
+			wch := wp.cw.Watch(lctx, lostLeaderKey, clientv3.WithRev(rev))
 			for range wch {
 			}
 			wp.mu.Lock()

+ 14 - 21
proxy/grpcproxy/watch_broadcast.go

@@ -50,27 +50,20 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast))
 	wb.add(w)
 	go func() {
 		defer close(wb.donec)
-		// loop because leader loss will close channel
-		for cctx.Err() == nil {
-			opts := []clientv3.OpOption{
-				clientv3.WithRange(w.wr.end),
-				clientv3.WithProgressNotify(),
-				clientv3.WithRev(wb.nextrev),
-				clientv3.WithPrevKV(),
-			}
-			// The create notification should be the first response;
-			// if the watch is recreated following leader loss, it
-			// shouldn't post a second create response to the client.
-			if wb.responses == 0 {
-				opts = append(opts, clientv3.WithCreatedNotify())
-			}
-			wch := wp.cw.Watch(cctx, w.wr.key, opts...)
-
-			for wr := range wch {
-				wb.bcast(wr)
-				update(wb)
-			}
-			wp.retryLimiter.Wait(cctx)
+
+		opts := []clientv3.OpOption{
+			clientv3.WithRange(w.wr.end),
+			clientv3.WithProgressNotify(),
+			clientv3.WithRev(wb.nextrev),
+			clientv3.WithPrevKV(),
+			clientv3.WithCreatedNotify(),
+		}
+
+		wch := wp.cw.Watch(cctx, w.wr.key, opts...)
+
+		for wr := range wch {
+			wb.bcast(wr)
+			update(wb)
 		}
 	}()
 	return wb