|
|
@@ -15,6 +15,7 @@
|
|
|
package storage
|
|
|
|
|
|
import (
|
|
|
+ "fmt"
|
|
|
"log"
|
|
|
"sync"
|
|
|
"time"
|
|
|
@@ -44,7 +45,7 @@ type watchableStore struct {
|
|
|
|
|
|
// contains all synced watching that are tracking the events that will happen
|
|
|
// The key of the map is the key that the watching is watching on.
|
|
|
- synced map[string][]*watching
|
|
|
+ synced map[string]map[*watching]struct{}
|
|
|
tx *ongoingTx
|
|
|
|
|
|
stopc chan struct{}
|
|
|
@@ -55,7 +56,7 @@ func newWatchableStore(path string) *watchableStore {
|
|
|
s := &watchableStore{
|
|
|
store: newStore(path),
|
|
|
unsynced: make(map[*watching]struct{}),
|
|
|
- synced: make(map[string][]*watching),
|
|
|
+ synced: make(map[string]map[*watching]struct{}),
|
|
|
stopc: make(chan struct{}),
|
|
|
}
|
|
|
s.wg.Add(1)
|
|
|
@@ -185,7 +186,9 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<
|
|
|
|
|
|
k := string(key)
|
|
|
if startRev == 0 {
|
|
|
- s.synced[k] = append(s.synced[k], wa)
|
|
|
+ if err := unsafeAddWatching(&s.synced, k, wa); err != nil {
|
|
|
+ log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
|
|
|
+ }
|
|
|
} else {
|
|
|
slowWatchingGauge.Inc()
|
|
|
s.unsynced[wa] = struct{}{}
|
|
|
@@ -203,9 +206,14 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- for i, w := range s.synced[k] {
|
|
|
- if w == wa {
|
|
|
- s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
|
|
|
+ if v, ok := s.synced[k]; ok {
|
|
|
+ if _, ok := v[wa]; ok {
|
|
|
+ delete(v, wa)
|
|
|
+ // if there is nothing in s.synced[k],
|
|
|
+ // remove the key from the synced
|
|
|
+ if len(v) == 0 {
|
|
|
+ delete(s.synced, k)
|
|
|
+ }
|
|
|
watchingGauge.Dec()
|
|
|
}
|
|
|
}
|
|
|
@@ -272,7 +280,10 @@ func (s *watchableStore) syncWatchings() {
|
|
|
}
|
|
|
// switch to tracking future events if needed
|
|
|
if nextRev > curRev {
|
|
|
- s.synced[string(w.key)] = append(s.synced[string(w.key)], w)
|
|
|
+ k := string(w.key)
|
|
|
+ if err := unsafeAddWatching(&s.synced, k, w); err != nil {
|
|
|
+ log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
|
|
|
+ }
|
|
|
delete(s.unsynced, w)
|
|
|
continue
|
|
|
}
|
|
|
@@ -292,25 +303,25 @@ func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
|
|
|
func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
|
|
|
// check all prefixes of the key to notify all corresponded watchings
|
|
|
for i := 0; i <= len(ev.Kv.Key); i++ {
|
|
|
- ws := s.synced[string(ev.Kv.Key[:i])]
|
|
|
- nws := ws[:0]
|
|
|
- for _, w := range ws {
|
|
|
- // the watching needs to be notified when either it watches prefix or
|
|
|
- // the key is exactly matched.
|
|
|
- if !w.prefix && i != len(ev.Kv.Key) {
|
|
|
- continue
|
|
|
- }
|
|
|
- select {
|
|
|
- case w.ch <- ev:
|
|
|
- pendingEventsGauge.Inc()
|
|
|
- nws = append(nws, w)
|
|
|
- default:
|
|
|
- w.cur = rev
|
|
|
- s.unsynced[w] = struct{}{}
|
|
|
- slowWatchingGauge.Inc()
|
|
|
+ k := string(ev.Kv.Key[:i])
|
|
|
+ if wm, ok := s.synced[k]; ok {
|
|
|
+ for w := range wm {
|
|
|
+ // the watching needs to be notified when either it watches prefix or
|
|
|
+ // the key is exactly matched.
|
|
|
+ if !w.prefix && i != len(ev.Kv.Key) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ select {
|
|
|
+ case w.ch <- ev:
|
|
|
+ pendingEventsGauge.Inc()
|
|
|
+ default:
|
|
|
+ w.cur = rev
|
|
|
+ s.unsynced[w] = struct{}{}
|
|
|
+ delete(wm, w)
|
|
|
+ slowWatchingGauge.Inc()
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- s.synced[string(ev.Kv.Key[:i])] = nws
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -356,3 +367,23 @@ type watching struct {
|
|
|
// The chan might be shared with other watchings.
|
|
|
ch chan<- storagepb.Event
|
|
|
}
|
|
|
+
|
|
|
+// unsafeAddWatching puts watching with key k into watchableStore's synced.
|
|
|
+// Make sure to this is thread-safe using mutex before and after.
|
|
|
+func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa *watching) error {
|
|
|
+ if wa == nil {
|
|
|
+ return fmt.Errorf("nil watching received")
|
|
|
+ }
|
|
|
+ mp := *synced
|
|
|
+ if v, ok := mp[k]; ok {
|
|
|
+ if _, ok := v[wa]; ok {
|
|
|
+ return fmt.Errorf("put the same watch twice: %+v", wa)
|
|
|
+ } else {
|
|
|
+ v[wa] = struct{}{}
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ mp[k] = make(map[*watching]struct{})
|
|
|
+ mp[k][wa] = struct{}{}
|
|
|
+ return nil
|
|
|
+}
|