|
|
@@ -61,6 +61,9 @@ type WatchResponse struct {
|
|
|
// the channel sends a final response that has Canceled set to true with a non-nil Err().
|
|
|
Canceled bool
|
|
|
|
|
|
+ // Created is used to indicate the creation of the watcher.
|
|
|
+ Created bool
|
|
|
+
|
|
|
closeErr error
|
|
|
}
|
|
|
|
|
|
@@ -98,6 +101,7 @@ type watcher struct {
|
|
|
|
|
|
// mu protects the grpc streams map
|
|
|
mu sync.RWMutex
|
|
|
+
|
|
|
// streams holds all the active grpc streams keyed by ctx value.
|
|
|
streams map[string]*watchGrpcStream
|
|
|
}
|
|
|
@@ -138,6 +142,8 @@ type watchRequest struct {
|
|
|
key string
|
|
|
end string
|
|
|
rev int64
|
|
|
+ // send created notification event if this field is true
|
|
|
+ createdNotify bool
|
|
|
// progressNotify is for progress updates
|
|
|
progressNotify bool
|
|
|
// filters is the list of events to filter out
|
|
|
@@ -223,6 +229,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
|
|
|
|
|
|
wr := &watchRequest{
|
|
|
ctx: ctx,
|
|
|
+ createdNotify: ow.createdNotify,
|
|
|
key: string(ow.key),
|
|
|
end: string(ow.end),
|
|
|
rev: ow.rev,
|
|
|
@@ -418,6 +425,7 @@ func (w *watchGrpcStream) run() {
|
|
|
w.addStream(pbresp, pendingReq)
|
|
|
pendingReq = nil
|
|
|
curReqC = w.reqc
|
|
|
+ w.dispatchEvent(pbresp)
|
|
|
case pbresp.Canceled:
|
|
|
delete(cancelSet, pbresp.WatchId)
|
|
|
// shutdown serveStream, if any
|
|
|
@@ -489,19 +497,23 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
|
|
w.mu.RLock()
|
|
|
defer w.mu.RUnlock()
|
|
|
ws, ok := w.streams[pbresp.WatchId]
|
|
|
+ if !ok {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
events := make([]*Event, len(pbresp.Events))
|
|
|
for i, ev := range pbresp.Events {
|
|
|
events[i] = (*Event)(ev)
|
|
|
}
|
|
|
- if ok {
|
|
|
- wr := &WatchResponse{
|
|
|
- Header: *pbresp.Header,
|
|
|
- Events: events,
|
|
|
- CompactRevision: pbresp.CompactRevision,
|
|
|
- Canceled: pbresp.Canceled}
|
|
|
- ws.recvc <- wr
|
|
|
+ wr := &WatchResponse{
|
|
|
+ Header: *pbresp.Header,
|
|
|
+ Events: events,
|
|
|
+ CompactRevision: pbresp.CompactRevision,
|
|
|
+ Created: pbresp.Created,
|
|
|
+ Canceled: pbresp.Canceled,
|
|
|
}
|
|
|
- return ok
|
|
|
+ ws.recvc <- wr
|
|
|
+ return true
|
|
|
}
|
|
|
|
|
|
// serveWatchClient forwards messages from the grpc stream to run()
|
|
|
@@ -533,6 +545,14 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
|
|
|
for !closing {
|
|
|
curWr := emptyWr
|
|
|
outc := ws.outc
|
|
|
+
|
|
|
+ // ignore created event if create notify is not requested or
|
|
|
+ // we already sent the initial created event (when we are on the resume path).
|
|
|
+ if len(wrs) > 0 && wrs[0].Created &&
|
|
|
+ (!ws.initReq.createdNotify || ws.lastRev != 0) {
|
|
|
+ wrs = wrs[1:]
|
|
|
+ }
|
|
|
+
|
|
|
if len(wrs) > 0 {
|
|
|
curWr = wrs[0]
|
|
|
} else {
|