@@ -17,6 +17,7 @@ package storage
 import (
 	"fmt"
 	"log"
+	"math"
 	"sync"
 	"time"
 
@@ -241,57 +242,105 @@ func (s *watchableStore) syncWatchingsLoop() {
 	}
 }
 
-// syncWatchings syncs the watchings in the unsyncd map.
+// syncWatchings periodically syncs unsynced watchings: it iterates over all
+// unsynced watchings to find the minimum revision within their ranges,
+// skipping any watching whose current revision is behind the store's compact
+// revision. It then uses this minimum revision to fetch all key-value pairs
+// and sends the resulting events to those watchings.
 func (s *watchableStore) syncWatchings() {
-	_, curRev, _ := s.store.Range(nil, nil, 0, 0)
+	s.store.mu.Lock()
+	defer s.store.mu.Unlock()
+
+	if len(s.unsynced) == 0 {
+		return
+	}
+
+	// to find the key-value pairs for the unsynced watchings, we need the
+	// minimum revision across all of them; that revision is then used to
+	// range over the backend store for the key-value pairs
+	minRev := int64(math.MaxInt64)
+
+	curRev := s.store.currentRev.main
+	compactionRev := s.store.compactMainRev
+
+	// TODO: change the type of the unsynced field to match keyToUnsynced
+	keyToUnsynced := make(map[string]map[*watching]struct{})
+
 	for w := range s.unsynced {
-		var end []byte
-		if w.prefix {
-			end = make([]byte, len(w.key))
-			copy(end, w.key)
-			end[len(w.key)-1]++
+		k := string(w.key)
+
+		if w.cur > curRev {
+			panic("watching's current revision should not exceed the store's current revision")
 		}
-		limit := cap(w.ch) - len(w.ch)
-		// the channel is full, try it in the next round
-		if limit == 0 {
+
+		if w.cur < compactionRev {
+			// TODO: return a compacted error to that watching instead of
+			// just silently removing it from unsynced.
+			delete(s.unsynced, w)
 			continue
 		}
-		revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur)
-		if err != nil {
-			// TODO: send error event to watching
-			delete(s.unsynced, w)
+
+		if minRev >= w.cur {
+			minRev = w.cur
+		}
+
+		if _, ok := keyToUnsynced[k]; !ok {
+			keyToUnsynced[k] = make(map[*watching]struct{})
+		}
+		keyToUnsynced[k][w] = struct{}{}
+	}
+
+	minBytes, maxBytes := newRevBytes(), newRevBytes()
+	revToBytes(revision{main: minRev}, minBytes)
+	revToBytes(revision{main: curRev + 1}, maxBytes)
+
+	// UnsafeRange returns keys and values. In the boltdb backend, keys are
+	// revisions and values are the actual key-value pairs.
+	tx := s.store.b.BatchTx()
+	tx.Lock()
+	ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
+	tx.Unlock()
+
+	for i, v := range vs {
+		var kv storagepb.KeyValue
+		if err := kv.Unmarshal(v); err != nil {
+			log.Panicf("storage: cannot unmarshal event: %v", err)
+		}
+
+		k := string(kv.Key)
+		wm, ok := keyToUnsynced[k]
+		if !ok {
 			continue
 		}
 
-		// push events to the channel
-		for i, kv := range kvs {
-			var evt storagepb.Event_EventType
-			switch {
-			case isTombstone(revbs[i]):
-				evt = storagepb.DELETE
-			default:
-				evt = storagepb.PUT
-			}
+		var ev storagepb.Event
+		switch {
+		case isTombstone(ks[i]):
+			ev.Type = storagepb.DELETE
+		default:
+			ev.Type = storagepb.PUT
+		}
+		ev.Kv = &kv
+
+		for w := range wm {
+			ev.WatchID = w.id
 
-			w.ch <- storagepb.Event{
-				Type:    evt,
-				Kv:      &kv,
-				WatchID: w.id,
+			select {
+			case w.ch <- ev:
+				pendingEventsGauge.Inc()
+			default:
+				// TODO: handle the watchings whose channels are full.
+				// For now, continue to the other watchings; the full ones will
+				// be retried in the next sync round and will hopefully have room.
+				continue
 			}
-			pendingEventsGauge.Inc()
-		}
-		// switch to tracking future events if needed
-		if nextRev > curRev {
-			k := string(w.key)
 			if err := unsafeAddWatching(&s.synced, k, w); err != nil {
 				log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
 			}
 			delete(s.unsynced, w)
-			continue
 		}
-		// put it back to try it in the next round
-		w.cur = nextRev
 	}
+
 	slowWatchingGauge.Set(float64(len(s.unsynced)))
 }
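For reference, a minimal standalone sketch (not part of the patch) of the non-blocking dispatch pattern the new syncWatchings relies on: events are fanned out to watchers through a select with a default branch, so a watcher with a full channel is skipped and retried later instead of blocking the sync loop. The watcher, event, and dispatch names below are illustrative only, not etcd's storagepb or watchableStore identifiers.

// Sketch only: fan out an event to interested watchers without blocking.
package main

import "fmt"

type event struct {
	key, value string
}

type watcher struct {
	id int64
	ch chan event
}

// dispatch sends ev to every interested watcher without blocking. Watchers
// whose channels are full are returned to the caller so they can be retried
// on a later round instead of stalling the current one.
func dispatch(ws map[*watcher]struct{}, ev event) (full []*watcher) {
	for w := range ws {
		select {
		case w.ch <- ev:
		default:
			full = append(full, w)
		}
	}
	return full
}

func main() {
	fast := &watcher{id: 1, ch: make(chan event, 1)}
	slow := &watcher{id: 2, ch: make(chan event)} // unbuffered and unread: always full here
	ws := map[*watcher]struct{}{fast: {}, slow: {}}

	full := dispatch(ws, event{key: "foo", value: "bar"})
	fmt.Println("delivered to fast watcher:", len(fast.ch) == 1)
	fmt.Println("watchers to retry next round:", len(full))
}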