|
|
@@ -186,10 +186,10 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<
|
|
|
if startRev == 0 {
|
|
|
s.synced[k] = append(s.synced[k], wa)
|
|
|
} else {
|
|
|
- slowWatchersGauge.Inc()
|
|
|
+ slowWatchingGauge.Inc()
|
|
|
s.unsynced[wa] = struct{}{}
|
|
|
}
|
|
|
- watchersGauge.Inc()
|
|
|
+ watchingGauge.Inc()
|
|
|
|
|
|
cancel := CancelFunc(func() {
|
|
|
s.mu.Lock()
|
|
|
@@ -197,15 +197,15 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<
|
|
|
// remove global references of the watching
|
|
|
if _, ok := s.unsynced[wa]; ok {
|
|
|
delete(s.unsynced, wa)
|
|
|
- slowWatchersGauge.Dec()
|
|
|
- watchersGauge.Dec()
|
|
|
+ slowWatchingGauge.Dec()
|
|
|
+ watchingGauge.Dec()
|
|
|
return
|
|
|
}
|
|
|
|
|
|
for i, w := range s.synced[k] {
|
|
|
if w == wa {
|
|
|
s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
|
|
|
- watchersGauge.Dec()
|
|
|
+ watchingGauge.Dec()
|
|
|
}
|
|
|
}
|
|
|
// If we cannot find it, it should have finished watch.
|
|
|
@@ -267,7 +267,7 @@ func (s *watchableStore) syncWatchings() {
|
|
|
// put it back to try it in the next round
|
|
|
w.cur = nextRev
|
|
|
}
|
|
|
- slowWatchersGauge.Set(float64(len(s.unsynced)))
|
|
|
+ slowWatchingGauge.Set(float64(len(s.unsynced)))
|
|
|
}
|
|
|
|
|
|
// handle handles the change of the happening event on all watchings.
|
|
|
@@ -295,7 +295,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
|
|
|
default:
|
|
|
w.cur = rev
|
|
|
s.unsynced[w] = struct{}{}
|
|
|
- slowWatchersGauge.Inc()
|
|
|
+ slowWatchingGauge.Inc()
|
|
|
}
|
|
|
}
|
|
|
s.synced[string(ev.Kv.Key[:i])] = nws
|