coalesce was locking the target coalesce broadcast object but not the source broadcast object resulting in a data race on the source's nextrev.
@@ -59,6 +59,7 @@ func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
if wbswb == wb {
continue
}
+ wb.mu.Lock()
wbswb.mu.Lock()
// 1. check if wbswb is behind wb so it won't skip any events in wb
// 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting
@@ -71,6 +72,7 @@ func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
wb.receivers = nil
wbswb.mu.Unlock()
+ wb.mu.Unlock()
if wb.empty() {
delete(wbs.bcasts, wb)
wb.stop()