|
@@ -117,13 +117,18 @@ func (ws *watchStream) Chan() <-chan WatchResponse {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (ws *watchStream) Cancel(id WatchID) error {
|
|
func (ws *watchStream) Cancel(id WatchID) error {
|
|
|
|
|
+ ws.mu.Lock()
|
|
|
cancel, ok := ws.cancels[id]
|
|
cancel, ok := ws.cancels[id]
|
|
|
|
|
+ ok = ok && !ws.closed
|
|
|
|
|
+ if ok {
|
|
|
|
|
+ delete(ws.cancels, id)
|
|
|
|
|
+ delete(ws.watchers, id)
|
|
|
|
|
+ }
|
|
|
|
|
+ ws.mu.Unlock()
|
|
|
if !ok {
|
|
if !ok {
|
|
|
return ErrWatcherNotExist
|
|
return ErrWatcherNotExist
|
|
|
}
|
|
}
|
|
|
cancel()
|
|
cancel()
|
|
|
- delete(ws.cancels, id)
|
|
|
|
|
- delete(ws.watchers, id)
|
|
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|