|
@@ -89,6 +89,9 @@ func (wb *watchBroadcast) bcast(wr clientv3.WatchResponse) {
|
|
|
for r := range wb.receivers {
|
|
for r := range wb.receivers {
|
|
|
r.send(wr)
|
|
r.send(wr)
|
|
|
}
|
|
}
|
|
|
|
|
+ if wb.size() > 0 {
|
|
|
|
|
+ eventsCoalescing.Add(float64(wb.size() - 1))
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// add puts a watcher into receiving a broadcast if its revision at least
|
|
// add puts a watcher into receiving a broadcast if its revision at least
|
|
@@ -121,6 +124,8 @@ func (wb *watchBroadcast) add(w *watcher) bool {
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
|
wb.receivers[w] = struct{}{}
|
|
wb.receivers[w] = struct{}{}
|
|
|
|
|
+ watchersCoalescing.Inc()
|
|
|
|
|
+
|
|
|
return true
|
|
return true
|
|
|
}
|
|
}
|
|
|
func (wb *watchBroadcast) delete(w *watcher) {
|
|
func (wb *watchBroadcast) delete(w *watcher) {
|
|
@@ -130,6 +135,10 @@ func (wb *watchBroadcast) delete(w *watcher) {
|
|
|
panic("deleting missing watcher from broadcast")
|
|
panic("deleting missing watcher from broadcast")
|
|
|
}
|
|
}
|
|
|
delete(wb.receivers, w)
|
|
delete(wb.receivers, w)
|
|
|
|
|
+ if !wb.empty() {
|
|
|
|
|
+ // do not dec the only left watcher for coalescing.
|
|
|
|
|
+ watchersCoalescing.Dec()
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (wb *watchBroadcast) size() int {
|
|
func (wb *watchBroadcast) size() int {
|
|
@@ -141,6 +150,11 @@ func (wb *watchBroadcast) size() int {
|
|
|
func (wb *watchBroadcast) empty() bool { return wb.size() == 0 }
|
|
func (wb *watchBroadcast) empty() bool { return wb.size() == 0 }
|
|
|
|
|
|
|
|
func (wb *watchBroadcast) stop() {
|
|
func (wb *watchBroadcast) stop() {
|
|
|
|
|
+ if !wb.empty() {
|
|
|
|
|
+ // do not dec the only left watcher for coalescing.
|
|
|
|
|
+ watchersCoalescing.Sub(float64(wb.size() - 1))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
wb.cancel()
|
|
wb.cancel()
|
|
|
<-wb.donec
|
|
<-wb.donec
|
|
|
}
|
|
}
|