|
|
@@ -383,6 +383,7 @@ func (s *watchableStore) syncWatchers() {
|
|
|
evs := kvsToEvents(wg, revs, vs)
|
|
|
tx.Unlock()
|
|
|
|
|
|
+ var victims watcherBatch
|
|
|
wb := newWatcherBatch(wg, evs)
|
|
|
for w := range wg.watchers {
|
|
|
eb, ok := wb[w]
|
|
|
@@ -394,23 +395,30 @@ func (s *watchableStore) syncWatchers() {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
+ w.cur = curRev
|
|
|
+ isBlocked := false
|
|
|
select {
|
|
|
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}:
|
|
|
pendingEventsGauge.Add(float64(len(eb.evs)))
|
|
|
default:
|
|
|
- // TODO: handle the full unsynced watchers.
|
|
|
- // continue to process other watchers for now, the full ones
|
|
|
- // will be processed next time and hopefully it will not be full.
|
|
|
- continue
|
|
|
+ if victims == nil {
|
|
|
+ victims = make(watcherBatch)
|
|
|
+ }
|
|
|
+ isBlocked = true
|
|
|
}
|
|
|
- if eb.moreRev != 0 {
|
|
|
- w.cur = eb.moreRev
|
|
|
- continue
|
|
|
+
|
|
|
+ if isBlocked {
|
|
|
+ victims[w] = eb
|
|
|
+ } else {
|
|
|
+ if eb.moreRev != 0 {
|
|
|
+ w.cur = eb.moreRev
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ s.synced.add(w)
|
|
|
}
|
|
|
- w.cur = curRev
|
|
|
- s.synced.add(w)
|
|
|
s.unsynced.delete(w)
|
|
|
}
|
|
|
+ s.addVictim(victims)
|
|
|
|
|
|
vsz := 0
|
|
|
for _, v := range s.victims {
|