|
|
@@ -15,7 +15,6 @@
|
|
|
package storage
|
|
|
|
|
|
import (
|
|
|
- "fmt"
|
|
|
"log"
|
|
|
"math"
|
|
|
"strings"
|
|
|
@@ -56,6 +55,22 @@ func (w watcherSetByKey) add(wa *watcher) {
|
|
|
set.add(wa)
|
|
|
}
|
|
|
|
|
|
+func (w watcherSetByKey) delete(wa *watcher) bool {
|
|
|
+ k := string(wa.key)
|
|
|
+ if v, ok := w[k]; ok {
|
|
|
+ if _, ok := v[wa]; ok {
|
|
|
+ delete(v, wa)
|
|
|
+ // if there is nothing in the set,
|
|
|
+ // remove the set
|
|
|
+ if len(v) == 0 {
|
|
|
+ delete(w, k)
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
type watchable interface {
|
|
|
watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
|
|
|
rev() int64
|
|
|
@@ -204,11 +219,8 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch
|
|
|
ch: ch,
|
|
|
}
|
|
|
|
|
|
- k := string(key)
|
|
|
if startRev == 0 {
|
|
|
- if err := unsafeAddWatcher(s.synced, k, wa); err != nil {
|
|
|
- log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
|
|
|
- }
|
|
|
+ s.synced.add(wa)
|
|
|
} else {
|
|
|
slowWatcherGauge.Inc()
|
|
|
s.unsynced[wa] = struct{}{}
|
|
|
@@ -226,16 +238,8 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- if v, ok := s.synced[k]; ok {
|
|
|
- if _, ok := v[wa]; ok {
|
|
|
- delete(v, wa)
|
|
|
- // if there is nothing in s.synced[k],
|
|
|
- // remove the key from the synced
|
|
|
- if len(v) == 0 {
|
|
|
- delete(s.synced, k)
|
|
|
- }
|
|
|
- watcherGauge.Dec()
|
|
|
- }
|
|
|
+ if s.synced.delete(wa) {
|
|
|
+ watcherGauge.Dec()
|
|
|
}
|
|
|
// If we cannot find it, it should have finished watch.
|
|
|
})
|
|
|
@@ -358,10 +362,7 @@ func (s *watchableStore) syncWatchers() {
|
|
|
// will be processed next time and hopefully it will not be full.
|
|
|
continue
|
|
|
}
|
|
|
- k := string(w.key)
|
|
|
- if err := unsafeAddWatcher(s.synced, k, w); err != nil {
|
|
|
- log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
|
|
|
- }
|
|
|
+ s.synced.add(w)
|
|
|
delete(s.unsynced, w)
|
|
|
}
|
|
|
|
|
|
@@ -416,16 +417,6 @@ type watcher struct {
|
|
|
ch chan<- WatchResponse
|
|
|
}
|
|
|
|
|
|
-// unsafeAddWatcher puts watcher with key k into watchableStore's synced.
|
|
|
-// Make sure to this is thread-safe using mutex before and after.
|
|
|
-func unsafeAddWatcher(synced watcherSetByKey, k string, wa *watcher) error {
|
|
|
- if wa == nil {
|
|
|
- return fmt.Errorf("nil watcher received")
|
|
|
- }
|
|
|
- synced.add(wa)
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
// newWatcherToEventMap creates a map that has watcher as key and events as
|
|
|
// value. It enables quick events look up by watcher.
|
|
|
func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watcher][]storagepb.Event {
|