|
@@ -172,8 +172,10 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64
|
|
|
s.endm[endRev] = append(s.endm[endRev], wa)
|
|
s.endm[endRev] = append(s.endm[endRev], wa)
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
|
|
|
+ slowWatchersGauge.Inc()
|
|
|
s.unsynced = append(s.unsynced, wa)
|
|
s.unsynced = append(s.unsynced, wa)
|
|
|
}
|
|
}
|
|
|
|
|
+ watchersGauge.Inc()
|
|
|
|
|
|
|
|
cancel := CancelFunc(func() {
|
|
cancel := CancelFunc(func() {
|
|
|
s.mu.Lock()
|
|
s.mu.Lock()
|
|
@@ -184,6 +186,8 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64
|
|
|
for i, w := range s.unsynced {
|
|
for i, w := range s.unsynced {
|
|
|
if w == wa {
|
|
if w == wa {
|
|
|
s.unsynced = append(s.unsynced[:i], s.unsynced[i+1:]...)
|
|
s.unsynced = append(s.unsynced[:i], s.unsynced[i+1:]...)
|
|
|
|
|
+ slowWatchersGauge.Dec()
|
|
|
|
|
+ watchersGauge.Dec()
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -191,6 +195,7 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64
|
|
|
for i, w := range s.synced[k] {
|
|
for i, w := range s.synced[k] {
|
|
|
if w == wa {
|
|
if w == wa {
|
|
|
s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
|
|
s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
|
|
|
|
|
+ watchersGauge.Dec()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
if wa.end != 0 {
|
|
if wa.end != 0 {
|
|
@@ -252,6 +257,7 @@ func (s *watchableStore) syncWatchers() {
|
|
|
// push events to the channel
|
|
// push events to the channel
|
|
|
for _, ev := range evs {
|
|
for _, ev := range evs {
|
|
|
w.ch <- ev
|
|
w.ch <- ev
|
|
|
|
|
+ pendingEventsGauge.Inc()
|
|
|
}
|
|
}
|
|
|
// stop watcher if it reaches the end
|
|
// stop watcher if it reaches the end
|
|
|
if w.end > 0 && nextRev >= w.end {
|
|
if w.end > 0 && nextRev >= w.end {
|
|
@@ -271,6 +277,7 @@ func (s *watchableStore) syncWatchers() {
|
|
|
nws = append(nws, w)
|
|
nws = append(nws, w)
|
|
|
}
|
|
}
|
|
|
s.unsynced = nws
|
|
s.unsynced = nws
|
|
|
|
|
+ slowWatchersGauge.Set(float64(len(s.unsynced)))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// handle handles the change of the happening event on all watchers.
|
|
// handle handles the change of the happening event on all watchers.
|
|
@@ -294,6 +301,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
|
|
|
}
|
|
}
|
|
|
select {
|
|
select {
|
|
|
case w.ch <- ev:
|
|
case w.ch <- ev:
|
|
|
|
|
+ pendingEventsGauge.Inc()
|
|
|
nws = append(nws, w)
|
|
nws = append(nws, w)
|
|
|
default:
|
|
default:
|
|
|
// put it back to unsynced place
|
|
// put it back to unsynced place
|
|
@@ -306,6 +314,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
|
|
|
}
|
|
}
|
|
|
w.cur = rev
|
|
w.cur = rev
|
|
|
s.unsynced = append(s.unsynced, w)
|
|
s.unsynced = append(s.unsynced, w)
|
|
|
|
|
+ slowWatchersGauge.Inc()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
s.synced[string(ev.Kv.Key[:i])] = nws
|
|
s.synced[string(ev.Kv.Key[:i])] = nws
|
|
@@ -319,6 +328,7 @@ func (s *watchableStore) stopWatchers(rev int64) {
|
|
|
for _, w := range s.synced[k] {
|
|
for _, w := range s.synced[k] {
|
|
|
if w == wa {
|
|
if w == wa {
|
|
|
s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
|
|
s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
|
|
|
|
|
+ watchersGauge.Dec()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
wa.stopWithError(ExceedEnd)
|
|
wa.stopWithError(ExceedEnd)
|