|
|
@@ -70,6 +70,9 @@ type Watcher interface {
|
|
|
// (see https://github.com/coreos/etcd/issues/8980)
|
|
|
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
|
|
|
|
|
|
+ // RequestProgress requests a progress notify response be sent in all watch channels.
|
|
|
+ RequestProgress(ctx context.Context) error
|
|
|
+
|
|
|
// Close closes the watcher and cancels all watch requests.
|
|
|
Close() error
|
|
|
}
|
|
|
@@ -156,7 +159,7 @@ type watchGrpcStream struct {
|
|
|
resuming []*watcherStream
|
|
|
|
|
|
// reqc sends a watch request from Watch() to the main goroutine
|
|
|
- reqc chan *watchRequest
|
|
|
+ reqc chan watchStreamRequest
|
|
|
// respc receives data from the watch client
|
|
|
respc chan *pb.WatchResponse
|
|
|
// donec closes to broadcast shutdown
|
|
|
@@ -174,6 +177,11 @@ type watchGrpcStream struct {
|
|
|
closeErr error
|
|
|
}
|
|
|
|
|
|
+// watchStreamRequest is a union of the supported watch request operation types
|
|
|
+type watchStreamRequest interface {
|
|
|
+ toPB() *pb.WatchRequest
|
|
|
+}
|
|
|
+
|
|
|
// watchRequest is issued by the subscriber to start a new watcher
|
|
|
type watchRequest struct {
|
|
|
ctx context.Context
|
|
|
@@ -198,6 +206,10 @@ type watchRequest struct {
|
|
|
retc chan chan WatchResponse
|
|
|
}
|
|
|
|
|
|
+// progressRequest is issued by the subscriber to request watch progress
|
|
|
+type progressRequest struct {
|
|
|
+}
|
|
|
+
|
|
|
// watcherStream represents a registered watcher
|
|
|
type watcherStream struct {
|
|
|
// initReq is the request that initiated this request
|
|
|
@@ -255,7 +267,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
|
|
|
cancel: cancel,
|
|
|
substreams: make(map[int64]*watcherStream),
|
|
|
respc: make(chan *pb.WatchResponse),
|
|
|
- reqc: make(chan *watchRequest),
|
|
|
+ reqc: make(chan watchStreamRequest),
|
|
|
donec: make(chan struct{}),
|
|
|
errc: make(chan error, 1),
|
|
|
closingc: make(chan *watcherStream),
|
|
|
@@ -361,6 +373,42 @@ func (w *watcher) Close() (err error) {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+// RequestProgress requests a progress notify response be sent in all watch channels.
|
|
|
+func (w *watcher) RequestProgress(ctx context.Context) (err error) {
|
|
|
+ ctxKey := streamKeyFromCtx(ctx)
|
|
|
+
|
|
|
+ w.mu.Lock()
|
|
|
+ if w.streams == nil {
|
|
|
+ return fmt.Errorf("no stream found for context")
|
|
|
+ }
|
|
|
+ wgs := w.streams[ctxKey]
|
|
|
+ if wgs == nil {
|
|
|
+ wgs = w.newWatcherGrpcStream(ctx)
|
|
|
+ w.streams[ctxKey] = wgs
|
|
|
+ }
|
|
|
+ donec := wgs.donec
|
|
|
+ reqc := wgs.reqc
|
|
|
+ w.mu.Unlock()
|
|
|
+
|
|
|
+ pr := &progressRequest{}
|
|
|
+
|
|
|
+ select {
|
|
|
+ case reqc <- pr:
|
|
|
+ return nil
|
|
|
+ case <-ctx.Done():
|
|
|
+ if err == nil {
|
|
|
+ return ctx.Err()
|
|
|
+ }
|
|
|
+ return err
|
|
|
+ case <-donec:
|
|
|
+ if wgs.closeErr != nil {
|
|
|
+ return wgs.closeErr
|
|
|
+ }
|
|
|
+ // retry; may have dropped stream from no ctxs
|
|
|
+ return w.RequestProgress(ctx)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (w *watchGrpcStream) close() (err error) {
|
|
|
w.cancel()
|
|
|
<-w.donec
|
|
|
@@ -468,26 +516,31 @@ func (w *watchGrpcStream) run() {
|
|
|
for {
|
|
|
select {
|
|
|
// Watch() requested
|
|
|
- case wreq := <-w.reqc:
|
|
|
- outc := make(chan WatchResponse, 1)
|
|
|
- // TODO: pass custom watch ID?
|
|
|
- ws := &watcherStream{
|
|
|
- initReq: *wreq,
|
|
|
- id: -1,
|
|
|
- outc: outc,
|
|
|
- // unbuffered so resumes won't cause repeat events
|
|
|
- recvc: make(chan *WatchResponse),
|
|
|
- }
|
|
|
+ case req := <-w.reqc:
|
|
|
+ switch wreq := req.(type) {
|
|
|
+ case *watchRequest:
|
|
|
+ outc := make(chan WatchResponse, 1)
|
|
|
+ // TODO: pass custom watch ID?
|
|
|
+ ws := &watcherStream{
|
|
|
+ initReq: *wreq,
|
|
|
+ id: -1,
|
|
|
+ outc: outc,
|
|
|
+ // unbuffered so resumes won't cause repeat events
|
|
|
+ recvc: make(chan *WatchResponse),
|
|
|
+ }
|
|
|
|
|
|
- ws.donec = make(chan struct{})
|
|
|
- w.wg.Add(1)
|
|
|
- go w.serveSubstream(ws, w.resumec)
|
|
|
+ ws.donec = make(chan struct{})
|
|
|
+ w.wg.Add(1)
|
|
|
+ go w.serveSubstream(ws, w.resumec)
|
|
|
|
|
|
- // queue up for watcher creation/resume
|
|
|
- w.resuming = append(w.resuming, ws)
|
|
|
- if len(w.resuming) == 1 {
|
|
|
- // head of resume queue, can register a new watcher
|
|
|
- wc.Send(ws.initReq.toPB())
|
|
|
+ // queue up for watcher creation/resume
|
|
|
+ w.resuming = append(w.resuming, ws)
|
|
|
+ if len(w.resuming) == 1 {
|
|
|
+ // head of resume queue, can register a new watcher
|
|
|
+ wc.Send(ws.initReq.toPB())
|
|
|
+ }
|
|
|
+ case *progressRequest:
|
|
|
+ wc.Send(wreq.toPB())
|
|
|
}
|
|
|
|
|
|
// new events from the watch client
|
|
|
@@ -614,7 +667,31 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
|
|
Canceled: pbresp.Canceled,
|
|
|
cancelReason: pbresp.CancelReason,
|
|
|
}
|
|
|
- ws, ok := w.substreams[pbresp.WatchId]
|
|
|
+
|
|
|
+ // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
|
|
|
+ // indicate they should be broadcast.
|
|
|
+ if wr.IsProgressNotify() && pbresp.WatchId == -1 {
|
|
|
+ return w.broadcastResponse(wr)
|
|
|
+ }
|
|
|
+
|
|
|
+ return w.unicastResponse(wr, pbresp.WatchId)
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// broadcastResponse send a watch response to all watch substreams.
|
|
|
+func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
|
|
|
+ for _, ws := range w.substreams {
|
|
|
+ select {
|
|
|
+ case ws.recvc <- wr:
|
|
|
+ case <-ws.donec:
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+// unicastResponse sends a watch response to a specific watch substream.
|
|
|
+func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
|
|
|
+ ws, ok := w.substreams[watchId]
|
|
|
if !ok {
|
|
|
return false
|
|
|
}
|
|
|
@@ -888,6 +965,13 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
|
|
|
return &pb.WatchRequest{RequestUnion: cr}
|
|
|
}
|
|
|
|
|
|
+// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
|
|
|
+func (pr *progressRequest) toPB() *pb.WatchRequest {
|
|
|
+ req := &pb.WatchProgressRequest{}
|
|
|
+ cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
|
|
|
+ return &pb.WatchRequest{RequestUnion: cr}
|
|
|
+}
|
|
|
+
|
|
|
func streamKeyFromCtx(ctx context.Context) string {
|
|
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
|
return fmt.Sprintf("%+v", md)
|