Browse Source

Merge pull request #4043 from gyuho/storage_range_all_unsynced

storage: range all unsynced at once
Gyu-Ho Lee 10 years ago
parent
commit
6c5dc28d0f
3 changed files with 84 additions and 357 deletions
  1. 0 59
      storage/kvstore.go
  2. 0 263
      storage/kvstore_test.go
  3. 84 35
      storage/watchable_store.go

+ 0 - 59
storage/kvstore.go

@@ -189,65 +189,6 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
 	return n, rev, nil
 }
 
-// RangeHistory ranges the history from key to end starting from startRev.
-// If `end` is nil, the request only observes the events on key.
-// If `end` is not nil, it observes the events on key range [key, range_end).
-// Limit limits the number of events returned.
-// If startRev <=0, rangeEvents returns events from the beginning of uncompacted history.
-//
-// If the required start rev is compacted, ErrCompacted will be returned.
-// If the required start rev has not happened, ErrFutureRev will be returned.
-//
-// RangeHistory returns revision bytes slice and key-values that satisfy the requirement (0 <= n <= limit).
-// If history in the revision range has not all happened, it returns immeidately
-// what is available.
-// It also returns nextRev which indicates the start revision used for the following
-// RangeEvents call. The nextRev could be smaller than the given endRev if the store
-// has not progressed so far or it hits the event limit.
-//
-// TODO: return byte slices instead of keyValues to avoid meaningless encode and decode.
-// This also helps to return raw (key, val) pair directly to make API consistent.
-func (s *store) RangeHistory(key, end []byte, limit, startRev int64) (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	if startRev > 0 && startRev <= s.compactMainRev {
-		return nil, nil, 0, ErrCompacted
-	}
-	if startRev > s.currentRev.main {
-		return nil, nil, 0, ErrFutureRev
-	}
-
-	revs := s.kvindex.RangeSince(key, end, startRev)
-	if len(revs) == 0 {
-		return nil, nil, s.currentRev.main + 1, nil
-	}
-
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	// fetch events from the backend using revisions
-	for _, rev := range revs {
-		start, end := revBytesRange(rev)
-
-		ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0)
-		if len(vs) != 1 {
-			log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub)
-		}
-
-		var kv storagepb.KeyValue
-		if err := kv.Unmarshal(vs[0]); err != nil {
-			log.Fatalf("storage: cannot unmarshal event: %v", err)
-		}
-		revbs = append(revbs, ks[0])
-		kvs = append(kvs, kv)
-		if limit > 0 && len(kvs) >= int(limit) {
-			return revbs, kvs, rev.main + 1, nil
-		}
-	}
-	return revbs, kvs, s.currentRev.main + 1, nil
-}
-
 func (s *store) Compact(rev int64) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()

+ 0 - 263
storage/kvstore_test.go

@@ -262,73 +262,6 @@ func TestStoreDeleteRange(t *testing.T) {
 	}
 }
 
-func TestStoreRangeHistory(t *testing.T) {
-	key := newTestKeyBytes(revision{2, 0}, false)
-	kv := storagepb.KeyValue{
-		Key:            []byte("foo"),
-		Value:          []byte("bar"),
-		CreateRevision: 1,
-		ModRevision:    2,
-		Version:        1,
-	}
-	kvb, err := kv.Marshal()
-	if err != nil {
-		t.Fatal(err)
-	}
-	currev := revision{2, 0}
-
-	tests := []struct {
-		idxr indexRangeEventsResp
-		r    rangeResp
-	}{
-		{
-			indexRangeEventsResp{[]revision{{2, 0}}},
-			rangeResp{[][]byte{key}, [][]byte{kvb}},
-		},
-		{
-			indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}},
-			rangeResp{[][]byte{key}, [][]byte{kvb}},
-		},
-	}
-	for i, tt := range tests {
-		s := newFakeStore()
-		b := s.b.(*fakeBackend)
-		fi := s.kvindex.(*fakeIndex)
-
-		s.currentRev = currev
-		fi.indexRangeEventsRespc <- tt.idxr
-		b.tx.rangeRespc <- tt.r
-
-		keys, kvs, _, err := s.RangeHistory([]byte("foo"), []byte("goo"), 1, 1)
-		if err != nil {
-			t.Errorf("#%d: err = %v, want nil", i, err)
-		}
-		if w := [][]byte{key}; !reflect.DeepEqual(keys, w) {
-			t.Errorf("#%d: keys = %+v, want %+v", i, keys, w)
-		}
-		if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) {
-			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w)
-		}
-
-		wact := []testutil.Action{
-			{"rangeEvents", []interface{}{[]byte("foo"), []byte("goo"), int64(1)}},
-		}
-		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
-			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
-		}
-		wstart, wend := revBytesRange(tt.idxr.revs[0])
-		wact = []testutil.Action{
-			{"range", []interface{}{keyBucketName, wstart, wend, int64(0)}},
-		}
-		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
-			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
-		}
-		if s.currentRev != currev {
-			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
-		}
-	}
-}
-
 func TestStoreCompact(t *testing.T) {
 	s := newFakeStore()
 	b := s.b.(*fakeBackend)
@@ -420,202 +353,6 @@ func TestStoreRestore(t *testing.T) {
 	}
 }
 
-// tests end parameter works well
-func TestStoreRangeHistoryEnd(t *testing.T) {
-	s := newStore(tmpPath)
-	defer cleanup(s, tmpPath)
-
-	s.Put([]byte("foo"), []byte("bar"))
-	s.Put([]byte("foo1"), []byte("bar1"))
-	s.Put([]byte("foo2"), []byte("bar2"))
-	keys := [][]byte{
-		newTestKeyBytes(revision{1, 0}, false),
-		newTestKeyBytes(revision{2, 0}, false),
-		newTestKeyBytes(revision{3, 0}, false),
-	}
-	kvs := []storagepb.KeyValue{
-		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
-		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
-	}
-
-	tests := []struct {
-		key, end []byte
-		wkeys    [][]byte
-		wkvs     []storagepb.KeyValue
-	}{
-		// get no keys
-		{
-			[]byte("doo"), []byte("foo"),
-			nil, nil,
-		},
-		// get no keys when key == end
-		{
-			[]byte("foo"), []byte("foo"),
-			nil, nil,
-		},
-		// get no keys when ranging single key
-		{
-			[]byte("doo"), nil,
-			nil, nil,
-		},
-		// get all keys
-		{
-			[]byte("foo"), []byte("foo3"),
-			keys, kvs,
-		},
-		// get partial keys
-		{
-			[]byte("foo"), []byte("foo1"),
-			keys[:1], kvs[:1],
-		},
-		// get single key
-		{
-			[]byte("foo"), nil,
-			keys[:1], kvs[:1],
-		},
-	}
-
-	for i, tt := range tests {
-		keys, kvs, rev, err := s.RangeHistory(tt.key, tt.end, 0, 1)
-		if err != nil {
-			t.Fatal(err)
-		}
-		if rev != 4 {
-			t.Errorf("#%d: rev = %d, want %d", i, rev, 4)
-		}
-		if !reflect.DeepEqual(keys, tt.wkeys) {
-			t.Errorf("#%d: actions = %+v, want %+v", i, keys, tt.wkeys)
-		}
-		if !reflect.DeepEqual(kvs, tt.wkvs) {
-			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
-		}
-	}
-}
-
-func TestStoreRangeHistoryRev(t *testing.T) {
-	s := newStore(tmpPath)
-	defer cleanup(s, tmpPath)
-
-	s.Put([]byte("foo"), []byte("bar"))
-	s.DeleteRange([]byte("foo"), nil)
-	s.Put([]byte("foo"), []byte("bar"))
-	s.Put([]byte("unrelated"), []byte("unrelated"))
-	keys := [][]byte{
-		newTestKeyBytes(revision{1, 0}, false),
-		newTestKeyBytes(revision{2, 0}, true),
-		newTestKeyBytes(revision{3, 0}, false),
-	}
-	kvs := []storagepb.KeyValue{
-		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		{Key: []byte("foo")},
-		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
-	}
-
-	tests := []struct {
-		start int64
-
-		wkeys [][]byte
-		wkvs  []storagepb.KeyValue
-		wnext int64
-	}{
-		{0, keys, kvs, 5},
-		{1, keys, kvs, 5},
-		{3, keys[2:], kvs[2:], 5},
-	}
-
-	for i, tt := range tests {
-		keys, kvs, next, err := s.RangeHistory([]byte("foo"), nil, 0, tt.start)
-		if err != nil {
-			t.Fatal(err)
-		}
-		if !reflect.DeepEqual(keys, tt.wkeys) {
-			t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys)
-		}
-		if !reflect.DeepEqual(kvs, tt.wkvs) {
-			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
-		}
-		if next != tt.wnext {
-			t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext)
-		}
-	}
-}
-
-func TestStoreRangeHistoryBad(t *testing.T) {
-	s := newStore(tmpPath)
-	defer cleanup(s, tmpPath)
-
-	s.Put([]byte("foo"), []byte("bar"))
-	s.Put([]byte("foo"), []byte("bar1"))
-	s.Put([]byte("foo"), []byte("bar2"))
-	if err := s.Compact(3); err != nil {
-		t.Fatalf("compact error (%v)", err)
-	}
-
-	tests := []struct {
-		rev  int64
-		werr error
-	}{
-		{1, ErrCompacted},
-		{2, ErrCompacted},
-		{3, ErrCompacted},
-		{4, ErrFutureRev},
-		{10, ErrFutureRev},
-	}
-	for i, tt := range tests {
-		_, _, _, err := s.RangeHistory([]byte("foo"), nil, 0, tt.rev)
-		if err != tt.werr {
-			t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
-		}
-	}
-}
-
-func TestStoreRangeHistoryLimit(t *testing.T) {
-	s := newStore(tmpPath)
-	defer cleanup(s, tmpPath)
-
-	s.Put([]byte("foo"), []byte("bar"))
-	s.DeleteRange([]byte("foo"), nil)
-	s.Put([]byte("foo"), []byte("bar"))
-	keys := [][]byte{
-		newTestKeyBytes(revision{1, 0}, false),
-		newTestKeyBytes(revision{2, 0}, true),
-		newTestKeyBytes(revision{3, 0}, false),
-	}
-	kvs := []storagepb.KeyValue{
-		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		{Key: []byte("foo")},
-		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
-	}
-
-	tests := []struct {
-		limit int64
-		wkeys [][]byte
-		wkvs  []storagepb.KeyValue
-	}{
-		// no limit
-		{-1, keys, kvs},
-		// no limit
-		{0, keys, kvs},
-		{1, keys[:1], kvs[:1]},
-		{2, keys[:2], kvs[:2]},
-		{3, keys, kvs},
-		{100, keys, kvs},
-	}
-	for i, tt := range tests {
-		keys, kvs, _, err := s.RangeHistory([]byte("foo"), nil, tt.limit, 1)
-		if err != nil {
-			t.Fatalf("#%d: range error (%v)", i, err)
-		}
-		if !reflect.DeepEqual(keys, tt.wkeys) {
-			t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys)
-		}
-		if !reflect.DeepEqual(kvs, tt.wkvs) {
-			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
-		}
-	}
-}
-
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	s0 := newStore(tmpPath)
 	defer os.Remove(tmpPath)

+ 84 - 35
storage/watchable_store.go

@@ -17,6 +17,7 @@ package storage
 import (
 	"fmt"
 	"log"
+	"math"
 	"sync"
 	"time"
 
@@ -241,57 +242,105 @@ func (s *watchableStore) syncWatchingsLoop() {
 	}
 }
 
-// syncWatchings syncs the watchings in the unsyncd map.
+// syncWatchings periodically syncs unsynced watchings by: Iterate all unsynced
+// watchings to get the minimum revision within its range, skipping the
+// watching if its current revision is behind the compact revision of the
+// store. And use this minimum revision to get all key-value pairs. Then send
+// those events to watchings.
 func (s *watchableStore) syncWatchings() {
-	_, curRev, _ := s.store.Range(nil, nil, 0, 0)
+	s.store.mu.Lock()
+	defer s.store.mu.Unlock()
+
+	if len(s.unsynced) == 0 {
+		return
+	}
+
+	// in order to find key-value pairs from unsynced watchings, we need to
+	// find min revision index, and these revisions can be used to
+	// query the backend store of key-value pairs
+	minRev := int64(math.MaxInt64)
+
+	curRev := s.store.currentRev.main
+	compactionRev := s.store.compactMainRev
+
+	// TODO: change unsynced struct type same to this
+	keyToUnsynced := make(map[string]map[*watching]struct{})
+
 	for w := range s.unsynced {
-		var end []byte
-		if w.prefix {
-			end = make([]byte, len(w.key))
-			copy(end, w.key)
-			end[len(w.key)-1]++
+		k := string(w.key)
+
+		if w.cur > curRev {
+			panic("watching current revision should not exceed current revision")
 		}
-		limit := cap(w.ch) - len(w.ch)
-		// the channel is full, try it in the next round
-		if limit == 0 {
+
+		if w.cur < compactionRev {
+			// TODO: return error compacted to that watching instead of
+			// just removing it sliently from unsynced.
+			delete(s.unsynced, w)
 			continue
 		}
-		revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur)
-		if err != nil {
-			// TODO: send error event to watching
-			delete(s.unsynced, w)
+
+		if minRev >= w.cur {
+			minRev = w.cur
+		}
+
+		if _, ok := keyToUnsynced[k]; !ok {
+			keyToUnsynced[k] = make(map[*watching]struct{})
+		}
+		keyToUnsynced[k][w] = struct{}{}
+	}
+
+	minBytes, maxBytes := newRevBytes(), newRevBytes()
+	revToBytes(revision{main: minRev}, minBytes)
+	revToBytes(revision{main: curRev + 1}, maxBytes)
+
+	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
+	// values are actual key-value pairs in backend.
+	tx := s.store.b.BatchTx()
+	tx.Lock()
+	ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
+	tx.Unlock()
+
+	for i, v := range vs {
+		var kv storagepb.KeyValue
+		if err := kv.Unmarshal(v); err != nil {
+			log.Panicf("storage: cannot unmarshal event: %v", err)
+		}
+
+		k := string(kv.Key)
+		wm, ok := keyToUnsynced[k]
+		if !ok {
 			continue
 		}
 
-		// push events to the channel
-		for i, kv := range kvs {
-			var evt storagepb.Event_EventType
-			switch {
-			case isTombstone(revbs[i]):
-				evt = storagepb.DELETE
-			default:
-				evt = storagepb.PUT
-			}
+		var ev storagepb.Event
+		switch {
+		case isTombstone(ks[i]):
+			ev.Type = storagepb.DELETE
+		default:
+			ev.Type = storagepb.PUT
+		}
+		ev.Kv = &kv
+
+		for w := range wm {
+			ev.WatchID = w.id
 
-			w.ch <- storagepb.Event{
-				Type:    evt,
-				Kv:      &kv,
-				WatchID: w.id,
+			select {
+			case w.ch <- ev:
+				pendingEventsGauge.Inc()
+			default:
+				// TODO: handle the full unsynced watchings.
+				// continue to process other watchings for now, the full ones
+				// will be processed next time and hopefully it will not be full.
+				continue
 			}
-			pendingEventsGauge.Inc()
-		}
-		// switch to tracking future events if needed
-		if nextRev > curRev {
-			k := string(w.key)
 			if err := unsafeAddWatching(&s.synced, k, w); err != nil {
 				log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
 			}
 			delete(s.unsynced, w)
-			continue
 		}
-		// put it back to try it in the next round
-		w.cur = nextRev
 	}
+
 	slowWatchingGauge.Set(float64(len(s.unsynced)))
 }