|
|
@@ -55,6 +55,22 @@ func (w watcherSetByKey) add(wa *watcher) {
|
|
|
set.add(wa)
|
|
|
}
|
|
|
|
|
|
+func (w watcherSetByKey) delete(wa *watcher) bool {
|
|
|
+ k := string(wa.key)
|
|
|
+ if v, ok := w[k]; ok {
|
|
|
+ if _, ok := v[wa]; ok {
|
|
|
+ delete(v, wa)
|
|
|
+ // if there is nothing in the set,
|
|
|
+ // remove the set
|
|
|
+ if len(v) == 0 {
|
|
|
+ delete(w, k)
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
type watchable interface {
|
|
|
watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
|
|
|
rev() int64
|
|
|
@@ -203,7 +219,6 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch
|
|
|
ch: ch,
|
|
|
}
|
|
|
|
|
|
- k := string(key)
|
|
|
if startRev == 0 {
|
|
|
s.synced.add(wa)
|
|
|
} else {
|
|
|
@@ -223,16 +238,8 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- if v, ok := s.synced[k]; ok {
|
|
|
- if _, ok := v[wa]; ok {
|
|
|
- delete(v, wa)
|
|
|
- // if there is nothing in s.synced[k],
|
|
|
- // remove the key from the synced
|
|
|
- if len(v) == 0 {
|
|
|
- delete(s.synced, k)
|
|
|
- }
|
|
|
- watcherGauge.Dec()
|
|
|
- }
|
|
|
+ if s.synced.delete(wa) {
|
|
|
+ watcherGauge.Dec()
|
|
|
}
|
|
|
// If we cannot find it, it should have finished watch.
|
|
|
})
|