|
@@ -615,13 +615,18 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
|
|
|
// send first creation event only if requested
|
|
// send first creation event only if requested
|
|
|
if ws.initReq.createdNotify {
|
|
if ws.initReq.createdNotify {
|
|
|
ws.outc <- *wr
|
|
ws.outc <- *wr
|
|
|
- if ws.initReq.rev == 0 {
|
|
|
|
|
- // current revision of store; returning the
|
|
|
|
|
- // create response binds the current revision to
|
|
|
|
|
- // this revision, so restart with it if there's a
|
|
|
|
|
- // disconnect before receiving any events.
|
|
|
|
|
- nextRev = wr.Header.Revision
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+ // once the watch channel is returned, a current revision
|
|
|
|
|
+ // watch must resume at the store revision. This is necessary
|
|
|
|
|
+ // for the following case to work as expected:
|
|
|
|
|
+ // wch := m1.Watch("a")
|
|
|
|
|
+ // m2.Put("a", "b")
|
|
|
|
|
+ // <-wch
|
|
|
|
|
+ // If the revision is only bound on the first observed event,
|
|
|
|
|
+ // if wch is disconnected before the Put is issued, then reconnects
|
|
|
|
|
+ // after it is committed, it'll miss the Put.
|
|
|
|
|
+ if ws.initReq.rev == 0 {
|
|
|
|
|
+ nextRev = wr.Header.Revision
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|