|
@@ -28,8 +28,8 @@ type watchableStore struct {
|
|
|
*store
|
|
*store
|
|
|
|
|
|
|
|
// contains all unsynced watchers that needs to sync events that have happened
|
|
// contains all unsynced watchers that needs to sync events that have happened
|
|
|
- // TODO: use map to reduce cancel cost
|
|
|
|
|
- unsynced []*watcher
|
|
|
|
|
|
|
+ unsynced map[*watcher]struct{}
|
|
|
|
|
+
|
|
|
// contains all synced watchers that are tracking the events that will happen
|
|
// contains all synced watchers that are tracking the events that will happen
|
|
|
// The key of the map is the key that the watcher is watching on.
|
|
// The key of the map is the key that the watcher is watching on.
|
|
|
synced map[string][]*watcher
|
|
synced map[string][]*watcher
|
|
@@ -41,9 +41,10 @@ type watchableStore struct {
|
|
|
|
|
|
|
|
func newWatchableStore(path string) *watchableStore {
|
|
func newWatchableStore(path string) *watchableStore {
|
|
|
s := &watchableStore{
|
|
s := &watchableStore{
|
|
|
- store: newStore(path),
|
|
|
|
|
- synced: make(map[string][]*watcher),
|
|
|
|
|
- stopc: make(chan struct{}),
|
|
|
|
|
|
|
+ store: newStore(path),
|
|
|
|
|
+ unsynced: make(map[*watcher]struct{}),
|
|
|
|
|
+ synced: make(map[string][]*watcher),
|
|
|
|
|
+ stopc: make(chan struct{}),
|
|
|
}
|
|
}
|
|
|
s.wg.Add(1)
|
|
s.wg.Add(1)
|
|
|
go s.syncWatchersLoop()
|
|
go s.syncWatchersLoop()
|
|
@@ -161,7 +162,7 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch
|
|
|
s.synced[k] = append(s.synced[k], wa)
|
|
s.synced[k] = append(s.synced[k], wa)
|
|
|
} else {
|
|
} else {
|
|
|
slowWatchersGauge.Inc()
|
|
slowWatchersGauge.Inc()
|
|
|
- s.unsynced = append(s.unsynced, wa)
|
|
|
|
|
|
|
+ s.unsynced[wa] = struct{}{}
|
|
|
}
|
|
}
|
|
|
watchersGauge.Inc()
|
|
watchersGauge.Inc()
|
|
|
|
|
|
|
@@ -171,13 +172,11 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch
|
|
|
wa.stopWithError(ErrCanceled)
|
|
wa.stopWithError(ErrCanceled)
|
|
|
|
|
|
|
|
// remove global references of the watcher
|
|
// remove global references of the watcher
|
|
|
- for i, w := range s.unsynced {
|
|
|
|
|
- if w == wa {
|
|
|
|
|
- s.unsynced = append(s.unsynced[:i], s.unsynced[i+1:]...)
|
|
|
|
|
- slowWatchersGauge.Dec()
|
|
|
|
|
- watchersGauge.Dec()
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ if _, ok := s.unsynced[wa]; ok {
|
|
|
|
|
+ delete(s.unsynced, wa)
|
|
|
|
|
+ slowWatchersGauge.Dec()
|
|
|
|
|
+ watchersGauge.Dec()
|
|
|
|
|
+ return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for i, w := range s.synced[k] {
|
|
for i, w := range s.synced[k] {
|
|
@@ -212,11 +211,7 @@ func (s *watchableStore) syncWatchersLoop() {
|
|
|
// syncWatchers syncs the watchers in the unsyncd map.
|
|
// syncWatchers syncs the watchers in the unsyncd map.
|
|
|
func (s *watchableStore) syncWatchers() {
|
|
func (s *watchableStore) syncWatchers() {
|
|
|
_, curRev, _ := s.store.Range(nil, nil, 0, 0)
|
|
_, curRev, _ := s.store.Range(nil, nil, 0, 0)
|
|
|
-
|
|
|
|
|
- // filtering without allocating
|
|
|
|
|
- // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
|
|
|
|
- nws := s.unsynced[:0]
|
|
|
|
|
- for _, w := range s.unsynced {
|
|
|
|
|
|
|
+ for w := range s.unsynced {
|
|
|
var end []byte
|
|
var end []byte
|
|
|
if w.prefix {
|
|
if w.prefix {
|
|
|
end = make([]byte, len(w.key))
|
|
end = make([]byte, len(w.key))
|
|
@@ -226,12 +221,12 @@ func (s *watchableStore) syncWatchers() {
|
|
|
limit := cap(w.ch) - len(w.ch)
|
|
limit := cap(w.ch) - len(w.ch)
|
|
|
// the channel is full, try it in the next round
|
|
// the channel is full, try it in the next round
|
|
|
if limit == 0 {
|
|
if limit == 0 {
|
|
|
- nws = append(nws, w)
|
|
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur)
|
|
evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
w.stopWithError(err)
|
|
w.stopWithError(err)
|
|
|
|
|
+ delete(s.unsynced, w)
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -243,13 +238,12 @@ func (s *watchableStore) syncWatchers() {
|
|
|
// switch to tracking future events if needed
|
|
// switch to tracking future events if needed
|
|
|
if nextRev > curRev {
|
|
if nextRev > curRev {
|
|
|
s.synced[string(w.key)] = append(s.synced[string(w.key)], w)
|
|
s.synced[string(w.key)] = append(s.synced[string(w.key)], w)
|
|
|
|
|
+ delete(s.unsynced, w)
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
// put it back to try it in the next round
|
|
// put it back to try it in the next round
|
|
|
w.cur = nextRev
|
|
w.cur = nextRev
|
|
|
- nws = append(nws, w)
|
|
|
|
|
}
|
|
}
|
|
|
- s.unsynced = nws
|
|
|
|
|
slowWatchersGauge.Set(float64(len(s.unsynced)))
|
|
slowWatchersGauge.Set(float64(len(s.unsynced)))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -277,7 +271,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
|
|
|
nws = append(nws, w)
|
|
nws = append(nws, w)
|
|
|
default:
|
|
default:
|
|
|
w.cur = rev
|
|
w.cur = rev
|
|
|
- s.unsynced = append(s.unsynced, w)
|
|
|
|
|
|
|
+ s.unsynced[w] = struct{}{}
|
|
|
slowWatchersGauge.Inc()
|
|
slowWatchersGauge.Inc()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|