|
|
@@ -88,15 +88,14 @@ type serverWatchStream struct {
|
|
|
watchStream mvcc.WatchStream
|
|
|
ctrlStream chan *pb.WatchResponse
|
|
|
|
|
|
+ // mu protects progress, prevKV
|
|
|
+ mu sync.Mutex
|
|
|
// progress tracks the watchID that stream might need to send
|
|
|
// progress to.
|
|
|
// TOOD: combine progress and prevKV into a single struct?
|
|
|
progress map[mvcc.WatchID]bool
|
|
|
prevKV map[mvcc.WatchID]bool
|
|
|
|
|
|
- // mu protects progress
|
|
|
- mu sync.Mutex
|
|
|
-
|
|
|
// closec indicates the stream is closed.
|
|
|
closec chan struct{}
|
|
|
|
|
|
@@ -191,12 +190,14 @@ func (sws *serverWatchStream) recvLoop() error {
|
|
|
}
|
|
|
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
|
|
|
if id != -1 {
|
|
|
+ sws.mu.Lock()
|
|
|
if creq.ProgressNotify {
|
|
|
sws.progress[id] = true
|
|
|
}
|
|
|
if creq.PrevKv {
|
|
|
sws.prevKV[id] = true
|
|
|
}
|
|
|
+ sws.mu.Unlock()
|
|
|
}
|
|
|
wr := &pb.WatchResponse{
|
|
|
Header: sws.newResponseHeader(wsrev),
|
|
|
@@ -268,7 +269,9 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
// or define protocol buffer with []mvccpb.Event.
|
|
|
evs := wresp.Events
|
|
|
events := make([]*mvccpb.Event, len(evs))
|
|
|
+ sws.mu.Lock()
|
|
|
needPrevKV := sws.prevKV[wresp.WatchID]
|
|
|
+ sws.mu.Unlock()
|
|
|
for i := range evs {
|
|
|
events[i] = &evs[i]
|
|
|
|
|
|
@@ -333,12 +336,14 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
delete(pending, wid)
|
|
|
}
|
|
|
case <-progressTicker.C:
|
|
|
+ sws.mu.Lock()
|
|
|
for id, ok := range sws.progress {
|
|
|
if ok {
|
|
|
sws.watchStream.RequestProgress(id)
|
|
|
}
|
|
|
sws.progress[id] = true
|
|
|
}
|
|
|
+ sws.mu.Unlock()
|
|
|
case <-sws.closec:
|
|
|
return
|
|
|
}
|