|
|
@@ -60,8 +60,10 @@ func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
|
|
|
continue
|
|
|
}
|
|
|
wbswb.mu.Lock()
|
|
|
- // NB: victim lock already held
|
|
|
- if wb.nextrev >= wbswb.nextrev && wbswb.nextrev != 0 {
|
|
|
+ // 1. check if wbswb is behind wb so it won't skip any events in wb
|
|
|
+ // 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting
|
|
|
+ // for a current watcher and expects a create event from the server.
|
|
|
+ if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 {
|
|
|
for w := range wb.receivers {
|
|
|
wbswb.receivers[w] = struct{}{}
|
|
|
wbs.watchers[w] = wbswb
|