Jelajahi Sumber

storage: clarify comment for store.RangeEvents and fix related bugs

Change to the function:
1. specify the meaning of startRev and endRev parameters
2. specify the meaning of returned nextRev

Moreover, it adds unit tests for the function.
Yicheng Qin 10 tahun lalu
induk
melakukan
d72914c36f
2 mengubah file dengan 299 tambahan dan 12 penghapusan
  1. 19 5
      storage/kvstore.go
  2. 280 7
      storage/kvstore_test.go

+ 19 - 5
storage/kvstore.go

@@ -193,20 +193,34 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
 	return n, rev, nil
 	return n, rev, nil
 }
 }
 
 
-// RangeEvents gets the events from key to end at or after rangeRev.
-// If rangeRev <=0, rangeEvents returns events from the beginning of the history.
+// RangeEvents gets the events from key to end in [startRev, endRev).
 // If `end` is nil, the request only observes the events on key.
 // If `end` is nil, the request only observes the events on key.
 // If `end` is not nil, it observes the events on key range [key, range_end).
 // If `end` is not nil, it observes the events on key range [key, range_end).
 // Limit limits the number of events returned.
 // Limit limits the number of events returned.
-// If the required rev is compacted, ErrCompacted will be returned.
+// If startRev <=0, rangeEvents returns events from the beginning of uncompacted history.
+// If endRev <=0, it indicates there is no end revision.
+//
+// If the required start rev is compacted, ErrCompacted will be returned.
+// If the required start rev has not happened, ErrFutureRev will be returned.
+//
+// RangeEvents returns events that satisfy the requirement (0 <= n <= limit).
+// If events in the revision range have not all happened, it returns immeidately
+// what is available.
+// It also returns nextRev which indicates the start revision used for the following
+// RangeEvents call. The nextRev could be smaller than the given endRev if the store
+// has not progressed so far or it hits the event limit.
+//
 // TODO: return byte slices instead of events to avoid meaningless encode and decode.
 // TODO: return byte slices instead of events to avoid meaningless encode and decode.
 func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) {
 func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 
 
-	if startRev <= s.compactMainRev {
+	if startRev > 0 && startRev <= s.compactMainRev {
 		return nil, 0, ErrCompacted
 		return nil, 0, ErrCompacted
 	}
 	}
+	if startRev > s.currentRev.main {
+		return nil, 0, ErrFutureRev
+	}
 
 
 	revs := s.kvindex.RangeEvents(key, end, startRev)
 	revs := s.kvindex.RangeEvents(key, end, startRev)
 	if len(revs) == 0 {
 	if len(revs) == 0 {
@@ -218,7 +232,7 @@ func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs
 	defer tx.Unlock()
 	defer tx.Unlock()
 	// fetch events from the backend using revisions
 	// fetch events from the backend using revisions
 	for _, rev := range revs {
 	for _, rev := range revs {
-		if rev.main >= endRev {
+		if endRev > 0 && rev.main >= endRev {
 			return evs, rev.main, nil
 			return evs, rev.main, nil
 		}
 		}
 		revbytes := newRevBytes()
 		revbytes := newRevBytes()

+ 280 - 7
storage/kvstore_test.go

@@ -258,6 +258,68 @@ func TestStoreDeleteRange(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestStoreRangeEvents(t *testing.T) {
+	ev := storagepb.Event{
+		Type: storagepb.PUT,
+		Kv: &storagepb.KeyValue{
+			Key:            []byte("foo"),
+			Value:          []byte("bar"),
+			CreateRevision: 1,
+			ModRevision:    2,
+			Version:        1,
+		},
+	}
+	evb, err := ev.Marshal()
+	if err != nil {
+		t.Fatal(err)
+	}
+	currev := revision{2, 0}
+
+	tests := []struct {
+		idxr indexRangeEventsResp
+		r    rangeResp
+	}{
+		{
+			indexRangeEventsResp{[]revision{{2, 0}}},
+			rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
+		},
+		{
+			indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}},
+			rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
+		},
+	}
+	for i, tt := range tests {
+		s, b, index := newFakeStore()
+		s.currentRev = currev
+		index.indexRangeEventsRespc <- tt.idxr
+		b.tx.rangeRespc <- tt.r
+
+		evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1, 4)
+		if err != nil {
+			t.Errorf("#%d: err = %v, want nil", i, err)
+		}
+		if w := []storagepb.Event{ev}; !reflect.DeepEqual(evs, w) {
+			t.Errorf("#%d: evs = %+v, want %+v", i, evs, w)
+		}
+
+		wact := []testutil.Action{
+			{"rangeEvents", []interface{}{[]byte("foo"), []byte("goo"), int64(1)}},
+		}
+		if g := index.Action(); !reflect.DeepEqual(g, wact) {
+			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
+		}
+		wact = []testutil.Action{
+			{"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}},
+		}
+		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
+			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
+		}
+		if s.currentRev != currev {
+			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
+		}
+	}
+}
+
 func TestStoreCompact(t *testing.T) {
 func TestStoreCompact(t *testing.T) {
 	s, b, index := newFakeStore()
 	s, b, index := newFakeStore()
 	s.currentRev = revision{3, 0}
 	s.currentRev = revision{3, 0}
@@ -346,6 +408,209 @@ func TestStoreRestore(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// tests end parameter works well
+func TestStoreRangeEventsEnd(t *testing.T) {
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo1"), []byte("bar1"))
+	s.Put([]byte("foo2"), []byte("bar2"))
+	evs := []storagepb.Event{
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		},
+	}
+
+	tests := []struct {
+		key, end []byte
+		wevs     []storagepb.Event
+	}{
+		// get no keys
+		{
+			[]byte("doo"), []byte("foo"),
+			nil,
+		},
+		// get no keys when key == end
+		{
+			[]byte("foo"), []byte("foo"),
+			nil,
+		},
+		// get no keys when ranging single key
+		{
+			[]byte("doo"), nil,
+			nil,
+		},
+		// get all keys
+		{
+			[]byte("foo"), []byte("foo3"),
+			evs,
+		},
+		// get partial keys
+		{
+			[]byte("foo"), []byte("foo1"),
+			evs[:1],
+		},
+		// get single key
+		{
+			[]byte("foo"), nil,
+			evs[:1],
+		},
+	}
+
+	for i, tt := range tests {
+		evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1, 100)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if rev != 4 {
+			t.Errorf("#%d: rev = %d, want %d", i, rev, 4)
+		}
+		if !reflect.DeepEqual(evs, tt.wevs) {
+			t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs)
+		}
+	}
+}
+
+func TestStoreRangeEventsRev(t *testing.T) {
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.DeleteRange([]byte("foo"), nil)
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("unrelated"), []byte("unrelated"))
+	evs := []storagepb.Event{
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
+		},
+		{
+			Type: storagepb.DELETE,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo")},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		},
+	}
+
+	tests := []struct {
+		start int64
+		end   int64
+		wevs  []storagepb.Event
+		wnext int64
+	}{
+		{1, 1, nil, 1},
+		{1, 2, evs[:1], 2},
+		{1, 3, evs[:2], 3},
+		{1, 4, evs, 5},
+		{1, 5, evs, 5},
+		{1, 10, evs, 5},
+		{3, 4, evs[2:], 5},
+		{0, 10, evs, 5},
+		{1, 0, evs, 5},
+		{0, 0, evs, 5},
+	}
+
+	for i, tt := range tests {
+		evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start, tt.end)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !reflect.DeepEqual(evs, tt.wevs) {
+			t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs)
+		}
+		if next != tt.wnext {
+			t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext)
+		}
+	}
+}
+
+func TestStoreRangeEventsBad(t *testing.T) {
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo"), []byte("bar1"))
+	s.Put([]byte("foo"), []byte("bar2"))
+	if err := s.Compact(3); err != nil {
+		t.Fatalf("compact error (%v)", err)
+	}
+
+	tests := []struct {
+		rev  int64
+		werr error
+	}{
+		{1, ErrCompacted},
+		{2, ErrCompacted},
+		{3, ErrCompacted},
+		{4, ErrFutureRev},
+		{10, ErrFutureRev},
+	}
+	for i, tt := range tests {
+		_, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev, 100)
+		if err != tt.werr {
+			t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
+		}
+	}
+}
+
+func TestStoreRangeEventsLimit(t *testing.T) {
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.DeleteRange([]byte("foo"), nil)
+	s.Put([]byte("foo"), []byte("bar"))
+	evs := []storagepb.Event{
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
+		},
+		{
+			Type: storagepb.DELETE,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo")},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		},
+	}
+
+	tests := []struct {
+		limit int64
+		wevs  []storagepb.Event
+	}{
+		// no limit
+		{-1, evs},
+		// no limit
+		{0, evs},
+		{1, evs[:1]},
+		{2, evs[:2]},
+		{3, evs},
+		{100, evs},
+	}
+	for i, tt := range tests {
+		evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1, 100)
+		if err != nil {
+			t.Fatalf("#%d: range error (%v)", i, err)
+		}
+		if !reflect.DeepEqual(evs, tt.wevs) {
+			t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs)
+		}
+	}
+}
+
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	s0 := newStore(tmpPath)
 	s0 := newStore(tmpPath)
 	defer os.Remove(tmpPath)
 	defer os.Remove(tmpPath)
@@ -412,9 +677,10 @@ func newTestBytes(rev revision) []byte {
 func newFakeStore() (*store, *fakeBackend, *fakeIndex) {
 func newFakeStore() (*store, *fakeBackend, *fakeIndex) {
 	b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}}
 	b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}}
 	index := &fakeIndex{
 	index := &fakeIndex{
-		indexGetRespc:     make(chan indexGetResp, 1),
-		indexRangeRespc:   make(chan indexRangeResp, 1),
-		indexCompactRespc: make(chan map[revision]struct{}, 1),
+		indexGetRespc:         make(chan indexGetResp, 1),
+		indexRangeRespc:       make(chan indexRangeResp, 1),
+		indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
+		indexCompactRespc:     make(chan map[revision]struct{}, 1),
 	}
 	}
 	return &store{
 	return &store{
 		b:              b,
 		b:              b,
@@ -472,11 +738,16 @@ type indexRangeResp struct {
 	revs []revision
 	revs []revision
 }
 }
 
 
+type indexRangeEventsResp struct {
+	revs []revision
+}
+
 type fakeIndex struct {
 type fakeIndex struct {
 	testutil.Recorder
 	testutil.Recorder
-	indexGetRespc     chan indexGetResp
-	indexRangeRespc   chan indexRangeResp
-	indexCompactRespc chan map[revision]struct{}
+	indexGetRespc         chan indexGetResp
+	indexRangeRespc       chan indexRangeResp
+	indexRangeEventsRespc chan indexRangeEventsResp
+	indexCompactRespc     chan map[revision]struct{}
 }
 }
 
 
 func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
 func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
@@ -500,7 +771,9 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
 	return nil
 	return nil
 }
 }
 func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision {
 func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision {
-	return nil
+	i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}})
+	r := <-i.indexRangeEventsRespc
+	return r.revs
 }
 }
 func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
 func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
 	i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
 	i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})