Browse Source

Merge pull request #3558 from yichengq/watch

storage: add tests for RangeEvents and its underlying functions
Yicheng Qin 10 years ago
parent
commit
f3d2b5831c
5 changed files with 524 additions and 54 deletions
  1. 55 0
      storage/index_test.go
  2. 6 1
      storage/key_index.go
  3. 164 41
      storage/key_index_test.go
  4. 19 5
      storage/kvstore.go
  5. 280 7
      storage/kvstore_test.go

+ 55 - 0
storage/index_test.go

@@ -136,6 +136,61 @@ func TestIndexTombstone(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestIndexRangeEvents(t *testing.T) {
+	allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")}
+	allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}}
+
+	index := newTreeIndex()
+	for i := range allKeys {
+		index.Put(allKeys[i], allRevs[i])
+	}
+
+	atRev := int64(1)
+	tests := []struct {
+		key, end []byte
+		wrevs    []revision
+	}{
+		// single key that not found
+		{
+			[]byte("bar"), nil, nil,
+		},
+		// single key that found
+		{
+			[]byte("foo"), nil, []revision{{main: 1}, {main: 6}},
+		},
+		// range keys, return first member
+		{
+			[]byte("foo"), []byte("foo1"), []revision{{main: 1}, {main: 6}},
+		},
+		// range keys, return first two members
+		{
+			[]byte("foo"), []byte("foo2"), []revision{{main: 1}, {main: 2}, {main: 5}, {main: 6}},
+		},
+		// range keys, return all members
+		{
+			[]byte("foo"), []byte("fop"), allRevs,
+		},
+		// range keys, return last two members
+		{
+			[]byte("foo1"), []byte("fop"), []revision{{main: 2}, {main: 3}, {main: 4}, {main: 5}},
+		},
+		// range keys, return last member
+		{
+			[]byte("foo2"), []byte("fop"), []revision{{main: 3}, {main: 4}},
+		},
+		// range keys, return nothing
+		{
+			[]byte("foo3"), []byte("fop"), nil,
+		},
+	}
+	for i, tt := range tests {
+		revs := index.RangeEvents(tt.key, tt.end, atRev)
+		if !reflect.DeepEqual(revs, tt.wrevs) {
+			t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs)
+		}
+	}
+}
+
 func TestIndexCompact(t *testing.T) {
 func TestIndexCompact(t *testing.T) {
 	maxRev := int64(20)
 	maxRev := int64(20)
 	tests := []struct {
 	tests := []struct {

+ 6 - 1
storage/key_index.go

@@ -157,7 +157,11 @@ func (ki *keyIndex) since(rev int64) []revision {
 	var gi int
 	var gi int
 	// find the generations to start checking
 	// find the generations to start checking
 	for gi = len(ki.generations) - 1; gi > 0; gi-- {
 	for gi = len(ki.generations) - 1; gi > 0; gi-- {
-		if since.GreaterThan(ki.generations[gi].created) {
+		g := ki.generations[gi]
+		if g.isEmpty() {
+			continue
+		}
+		if since.GreaterThan(g.created) {
 			break
 			break
 		}
 		}
 	}
 	}
@@ -173,6 +177,7 @@ func (ki *keyIndex) since(rev int64) []revision {
 				// replace the revision with a new one that has higher sub value,
 				// replace the revision with a new one that has higher sub value,
 				// because the original one should not be seen by external
 				// because the original one should not be seen by external
 				revs[len(revs)-1] = r
 				revs[len(revs)-1] = r
+				continue
 			}
 			}
 			revs = append(revs, r)
 			revs = append(revs, r)
 			last = r.main
 			last = r.main

+ 164 - 41
storage/key_index_test.go

@@ -20,50 +20,104 @@ import (
 )
 )
 
 
 func TestKeyIndexGet(t *testing.T) {
 func TestKeyIndexGet(t *testing.T) {
-	// key:     "foo"
-	// rev: 12
+	// key: "foo"
+	// rev: 16
 	// generations:
 	// generations:
 	//    {empty}
 	//    {empty}
-	//    {8[1], 10[2], 12(t)[3]}
-	//    {4[2], 6(t)[3]}
+	//    {{14, 0}[1], {14, 1}[2], {16, 0}(t)[3]}
+	//    {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]}
+	//    {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]}
 	ki := newTestKeyIndex()
 	ki := newTestKeyIndex()
 	ki.compact(4, make(map[revision]struct{}))
 	ki.compact(4, make(map[revision]struct{}))
 
 
 	tests := []struct {
 	tests := []struct {
 		rev int64
 		rev int64
 
 
-		wrev int64
-		werr error
+		wmod   revision
+		wcreat revision
+		wver   int64
+		werr   error
 	}{
 	}{
-		{13, 0, ErrRevisionNotFound},
-		{12, 0, ErrRevisionNotFound},
+		{17, revision{}, revision{}, 0, ErrRevisionNotFound},
+		{16, revision{}, revision{}, 0, ErrRevisionNotFound},
+
+		// get on generation 3
+		{15, revision{14, 1}, revision{14, 0}, 2, nil},
+		{14, revision{14, 1}, revision{14, 0}, 2, nil},
+
+		{13, revision{}, revision{}, 0, ErrRevisionNotFound},
+		{12, revision{}, revision{}, 0, ErrRevisionNotFound},
 
 
 		// get on generation 2
 		// get on generation 2
-		{11, 10, nil},
-		{10, 10, nil},
-		{9, 8, nil},
-		{8, 8, nil},
+		{11, revision{10, 0}, revision{8, 0}, 2, nil},
+		{10, revision{10, 0}, revision{8, 0}, 2, nil},
+		{9, revision{8, 0}, revision{8, 0}, 1, nil},
+		{8, revision{8, 0}, revision{8, 0}, 1, nil},
 
 
-		{7, 0, ErrRevisionNotFound},
-		{6, 0, ErrRevisionNotFound},
+		{7, revision{}, revision{}, 0, ErrRevisionNotFound},
+		{6, revision{}, revision{}, 0, ErrRevisionNotFound},
 
 
 		// get on generation 1
 		// get on generation 1
-		{5, 4, nil},
-		{4, 4, nil},
+		{5, revision{4, 0}, revision{2, 0}, 2, nil},
+		{4, revision{4, 0}, revision{2, 0}, 2, nil},
 
 
-		{3, 0, ErrRevisionNotFound},
-		{2, 0, ErrRevisionNotFound},
-		{1, 0, ErrRevisionNotFound},
-		{0, 0, ErrRevisionNotFound},
+		{3, revision{}, revision{}, 0, ErrRevisionNotFound},
+		{2, revision{}, revision{}, 0, ErrRevisionNotFound},
+		{1, revision{}, revision{}, 0, ErrRevisionNotFound},
+		{0, revision{}, revision{}, 0, ErrRevisionNotFound},
 	}
 	}
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
-		rev, _, _, err := ki.get(tt.rev)
+		mod, creat, ver, err := ki.get(tt.rev)
 		if err != tt.werr {
 		if err != tt.werr {
 			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
 			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
 		}
 		}
-		if rev.main != tt.wrev {
-			t.Errorf("#%d: rev = %d, want %d", i, rev.main, tt.rev)
+		if mod != tt.wmod {
+			t.Errorf("#%d: modified = %+v, want %+v", i, mod, tt.wmod)
+		}
+		if creat != tt.wcreat {
+			t.Errorf("#%d: created = %+v, want %+v", i, creat, tt.wcreat)
+		}
+		if ver != tt.wver {
+			t.Errorf("#%d: version = %d, want %d", i, ver, tt.wver)
+		}
+	}
+}
+
+func TestKeyIndexSince(t *testing.T) {
+	ki := newTestKeyIndex()
+	ki.compact(4, make(map[revision]struct{}))
+
+	allRevs := []revision{{4, 0}, {6, 0}, {8, 0}, {10, 0}, {12, 0}, {14, 1}, {16, 0}}
+	tests := []struct {
+		rev int64
+
+		wrevs []revision
+	}{
+		{17, nil},
+		{16, allRevs[6:]},
+		{15, allRevs[6:]},
+		{14, allRevs[5:]},
+		{13, allRevs[5:]},
+		{12, allRevs[4:]},
+		{11, allRevs[4:]},
+		{10, allRevs[3:]},
+		{9, allRevs[3:]},
+		{8, allRevs[2:]},
+		{7, allRevs[2:]},
+		{6, allRevs[1:]},
+		{5, allRevs[1:]},
+		{4, allRevs},
+		{3, allRevs},
+		{2, allRevs},
+		{1, allRevs},
+		{0, allRevs},
+	}
+
+	for i, tt := range tests {
+		revs := ki.since(tt.rev)
+		if !reflect.DeepEqual(revs, tt.wrevs) {
+			t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs)
 		}
 		}
 	}
 	}
 }
 }
@@ -162,10 +216,11 @@ func TestKeyIndexCompact(t *testing.T) {
 			1,
 			1,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}},
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -175,10 +230,11 @@ func TestKeyIndexCompact(t *testing.T) {
 			2,
 			2,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}},
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -190,10 +246,11 @@ func TestKeyIndexCompact(t *testing.T) {
 			3,
 			3,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}},
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -205,10 +262,11 @@ func TestKeyIndexCompact(t *testing.T) {
 			4,
 			4,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}},
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -220,10 +278,11 @@ func TestKeyIndexCompact(t *testing.T) {
 			5,
 			5,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}},
 					{created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -235,9 +294,10 @@ func TestKeyIndexCompact(t *testing.T) {
 			6,
 			6,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -247,9 +307,10 @@ func TestKeyIndexCompact(t *testing.T) {
 			7,
 			7,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -259,9 +320,10 @@ func TestKeyIndexCompact(t *testing.T) {
 			8,
 			8,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -273,9 +335,10 @@ func TestKeyIndexCompact(t *testing.T) {
 			9,
 			9,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -287,9 +350,10 @@ func TestKeyIndexCompact(t *testing.T) {
 			10,
 			10,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -301,9 +365,10 @@ func TestKeyIndexCompact(t *testing.T) {
 			11,
 			11,
 			&keyIndex{
 			&keyIndex{
 				key:      []byte("foo"),
 				key:      []byte("foo"),
-				modified: revision{12, 0},
+				modified: revision{16, 0},
 				generations: []generation{
 				generations: []generation{
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}},
 					{created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}},
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
 					{},
 					{},
 				},
 				},
 			},
 			},
@@ -314,9 +379,63 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 		{
 			12,
 			12,
 			&keyIndex{
 			&keyIndex{
-				key:         []byte("foo"),
-				modified:    revision{12, 0},
-				generations: []generation{{}},
+				key:      []byte("foo"),
+				modified: revision{16, 0},
+				generations: []generation{
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
+					{},
+				},
+			},
+			map[revision]struct{}{},
+		},
+		{
+			13,
+			&keyIndex{
+				key:      []byte("foo"),
+				modified: revision{16, 0},
+				generations: []generation{
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}},
+					{},
+				},
+			},
+			map[revision]struct{}{},
+		},
+		{
+			14,
+			&keyIndex{
+				key:      []byte("foo"),
+				modified: revision{16, 0},
+				generations: []generation{
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}},
+					{},
+				},
+			},
+			map[revision]struct{}{
+				revision{main: 14, sub: 1}: {},
+			},
+		},
+		{
+			15,
+			&keyIndex{
+				key:      []byte("foo"),
+				modified: revision{16, 0},
+				generations: []generation{
+					{created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}},
+					{},
+				},
+			},
+			map[revision]struct{}{
+				revision{main: 14, sub: 1}: {},
+			},
+		},
+		{
+			16,
+			&keyIndex{
+				key:      []byte("foo"),
+				modified: revision{16, 0},
+				generations: []generation{
+					{},
+				},
 			},
 			},
 			map[revision]struct{}{},
 			map[revision]struct{}{},
 		},
 		},
@@ -513,12 +632,13 @@ func TestGenerationWalk(t *testing.T) {
 }
 }
 
 
 func newTestKeyIndex() *keyIndex {
 func newTestKeyIndex() *keyIndex {
-	// key:     "foo"
-	// rev: 12
+	// key: "foo"
+	// rev: 16
 	// generations:
 	// generations:
 	//    {empty}
 	//    {empty}
-	//    {8[1], 10[2], 12(t)[3]}
-	//    {2[1], 4[2], 6(t)[3]}
+	//    {{14, 0}[1], {14, 1}[2], {16, 0}(t)[3]}
+	//    {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]}
+	//    {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]}
 
 
 	ki := &keyIndex{key: []byte("foo")}
 	ki := &keyIndex{key: []byte("foo")}
 	ki.put(2, 0)
 	ki.put(2, 0)
@@ -527,5 +647,8 @@ func newTestKeyIndex() *keyIndex {
 	ki.put(8, 0)
 	ki.put(8, 0)
 	ki.put(10, 0)
 	ki.put(10, 0)
 	ki.tombstone(12, 0)
 	ki.tombstone(12, 0)
+	ki.put(14, 0)
+	ki.put(14, 1)
+	ki.tombstone(16, 0)
 	return ki
 	return ki
 }
 }

+ 19 - 5
storage/kvstore.go

@@ -193,20 +193,34 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
 	return n, rev, nil
 	return n, rev, nil
 }
 }
 
 
-// RangeEvents gets the events from key to end at or after rangeRev.
-// If rangeRev <=0, rangeEvents returns events from the beginning of the history.
+// RangeEvents gets the events from key to end in [startRev, endRev).
 // If `end` is nil, the request only observes the events on key.
 // If `end` is nil, the request only observes the events on key.
 // If `end` is not nil, it observes the events on key range [key, range_end).
 // If `end` is not nil, it observes the events on key range [key, range_end).
 // Limit limits the number of events returned.
 // Limit limits the number of events returned.
-// If the required rev is compacted, ErrCompacted will be returned.
+// If startRev <=0, rangeEvents returns events from the beginning of uncompacted history.
+// If endRev <=0, it indicates there is no end revision.
+//
+// If the required start rev is compacted, ErrCompacted will be returned.
+// If the required start rev has not happened, ErrFutureRev will be returned.
+//
+// RangeEvents returns events that satisfy the requirement (0 <= n <= limit).
+// If events in the revision range have not all happened, it returns immeidately
+// what is available.
+// It also returns nextRev which indicates the start revision used for the following
+// RangeEvents call. The nextRev could be smaller than the given endRev if the store
+// has not progressed so far or it hits the event limit.
+//
 // TODO: return byte slices instead of events to avoid meaningless encode and decode.
 // TODO: return byte slices instead of events to avoid meaningless encode and decode.
 func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) {
 func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 
 
-	if startRev <= s.compactMainRev {
+	if startRev > 0 && startRev <= s.compactMainRev {
 		return nil, 0, ErrCompacted
 		return nil, 0, ErrCompacted
 	}
 	}
+	if startRev > s.currentRev.main {
+		return nil, 0, ErrFutureRev
+	}
 
 
 	revs := s.kvindex.RangeEvents(key, end, startRev)
 	revs := s.kvindex.RangeEvents(key, end, startRev)
 	if len(revs) == 0 {
 	if len(revs) == 0 {
@@ -218,7 +232,7 @@ func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs
 	defer tx.Unlock()
 	defer tx.Unlock()
 	// fetch events from the backend using revisions
 	// fetch events from the backend using revisions
 	for _, rev := range revs {
 	for _, rev := range revs {
-		if rev.main >= endRev {
+		if endRev > 0 && rev.main >= endRev {
 			return evs, rev.main, nil
 			return evs, rev.main, nil
 		}
 		}
 		revbytes := newRevBytes()
 		revbytes := newRevBytes()

+ 280 - 7
storage/kvstore_test.go

@@ -258,6 +258,68 @@ func TestStoreDeleteRange(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestStoreRangeEvents(t *testing.T) {
+	ev := storagepb.Event{
+		Type: storagepb.PUT,
+		Kv: &storagepb.KeyValue{
+			Key:            []byte("foo"),
+			Value:          []byte("bar"),
+			CreateRevision: 1,
+			ModRevision:    2,
+			Version:        1,
+		},
+	}
+	evb, err := ev.Marshal()
+	if err != nil {
+		t.Fatal(err)
+	}
+	currev := revision{2, 0}
+
+	tests := []struct {
+		idxr indexRangeEventsResp
+		r    rangeResp
+	}{
+		{
+			indexRangeEventsResp{[]revision{{2, 0}}},
+			rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
+		},
+		{
+			indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}},
+			rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
+		},
+	}
+	for i, tt := range tests {
+		s, b, index := newFakeStore()
+		s.currentRev = currev
+		index.indexRangeEventsRespc <- tt.idxr
+		b.tx.rangeRespc <- tt.r
+
+		evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1, 4)
+		if err != nil {
+			t.Errorf("#%d: err = %v, want nil", i, err)
+		}
+		if w := []storagepb.Event{ev}; !reflect.DeepEqual(evs, w) {
+			t.Errorf("#%d: evs = %+v, want %+v", i, evs, w)
+		}
+
+		wact := []testutil.Action{
+			{"rangeEvents", []interface{}{[]byte("foo"), []byte("goo"), int64(1)}},
+		}
+		if g := index.Action(); !reflect.DeepEqual(g, wact) {
+			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
+		}
+		wact = []testutil.Action{
+			{"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}},
+		}
+		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
+			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
+		}
+		if s.currentRev != currev {
+			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
+		}
+	}
+}
+
 func TestStoreCompact(t *testing.T) {
 func TestStoreCompact(t *testing.T) {
 	s, b, index := newFakeStore()
 	s, b, index := newFakeStore()
 	s.currentRev = revision{3, 0}
 	s.currentRev = revision{3, 0}
@@ -346,6 +408,209 @@ func TestStoreRestore(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// tests end parameter works well
+func TestStoreRangeEventsEnd(t *testing.T) {
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo1"), []byte("bar1"))
+	s.Put([]byte("foo2"), []byte("bar2"))
+	evs := []storagepb.Event{
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		},
+	}
+
+	tests := []struct {
+		key, end []byte
+		wevs     []storagepb.Event
+	}{
+		// get no keys
+		{
+			[]byte("doo"), []byte("foo"),
+			nil,
+		},
+		// get no keys when key == end
+		{
+			[]byte("foo"), []byte("foo"),
+			nil,
+		},
+		// get no keys when ranging single key
+		{
+			[]byte("doo"), nil,
+			nil,
+		},
+		// get all keys
+		{
+			[]byte("foo"), []byte("foo3"),
+			evs,
+		},
+		// get partial keys
+		{
+			[]byte("foo"), []byte("foo1"),
+			evs[:1],
+		},
+		// get single key
+		{
+			[]byte("foo"), nil,
+			evs[:1],
+		},
+	}
+
+	for i, tt := range tests {
+		evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1, 100)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if rev != 4 {
+			t.Errorf("#%d: rev = %d, want %d", i, rev, 4)
+		}
+		if !reflect.DeepEqual(evs, tt.wevs) {
+			t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs)
+		}
+	}
+}
+
+func TestStoreRangeEventsRev(t *testing.T) {
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.DeleteRange([]byte("foo"), nil)
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("unrelated"), []byte("unrelated"))
+	evs := []storagepb.Event{
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
+		},
+		{
+			Type: storagepb.DELETE,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo")},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		},
+	}
+
+	tests := []struct {
+		start int64
+		end   int64
+		wevs  []storagepb.Event
+		wnext int64
+	}{
+		{1, 1, nil, 1},
+		{1, 2, evs[:1], 2},
+		{1, 3, evs[:2], 3},
+		{1, 4, evs, 5},
+		{1, 5, evs, 5},
+		{1, 10, evs, 5},
+		{3, 4, evs[2:], 5},
+		{0, 10, evs, 5},
+		{1, 0, evs, 5},
+		{0, 0, evs, 5},
+	}
+
+	for i, tt := range tests {
+		evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start, tt.end)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !reflect.DeepEqual(evs, tt.wevs) {
+			t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs)
+		}
+		if next != tt.wnext {
+			t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext)
+		}
+	}
+}
+
+func TestStoreRangeEventsBad(t *testing.T) {
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo"), []byte("bar1"))
+	s.Put([]byte("foo"), []byte("bar2"))
+	if err := s.Compact(3); err != nil {
+		t.Fatalf("compact error (%v)", err)
+	}
+
+	tests := []struct {
+		rev  int64
+		werr error
+	}{
+		{1, ErrCompacted},
+		{2, ErrCompacted},
+		{3, ErrCompacted},
+		{4, ErrFutureRev},
+		{10, ErrFutureRev},
+	}
+	for i, tt := range tests {
+		_, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev, 100)
+		if err != tt.werr {
+			t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
+		}
+	}
+}
+
+func TestStoreRangeEventsLimit(t *testing.T) {
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.DeleteRange([]byte("foo"), nil)
+	s.Put([]byte("foo"), []byte("bar"))
+	evs := []storagepb.Event{
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
+		},
+		{
+			Type: storagepb.DELETE,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo")},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		},
+	}
+
+	tests := []struct {
+		limit int64
+		wevs  []storagepb.Event
+	}{
+		// no limit
+		{-1, evs},
+		// no limit
+		{0, evs},
+		{1, evs[:1]},
+		{2, evs[:2]},
+		{3, evs},
+		{100, evs},
+	}
+	for i, tt := range tests {
+		evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1, 100)
+		if err != nil {
+			t.Fatalf("#%d: range error (%v)", i, err)
+		}
+		if !reflect.DeepEqual(evs, tt.wevs) {
+			t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs)
+		}
+	}
+}
+
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	s0 := newStore(tmpPath)
 	s0 := newStore(tmpPath)
 	defer os.Remove(tmpPath)
 	defer os.Remove(tmpPath)
@@ -412,9 +677,10 @@ func newTestBytes(rev revision) []byte {
 func newFakeStore() (*store, *fakeBackend, *fakeIndex) {
 func newFakeStore() (*store, *fakeBackend, *fakeIndex) {
 	b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}}
 	b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}}
 	index := &fakeIndex{
 	index := &fakeIndex{
-		indexGetRespc:     make(chan indexGetResp, 1),
-		indexRangeRespc:   make(chan indexRangeResp, 1),
-		indexCompactRespc: make(chan map[revision]struct{}, 1),
+		indexGetRespc:         make(chan indexGetResp, 1),
+		indexRangeRespc:       make(chan indexRangeResp, 1),
+		indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
+		indexCompactRespc:     make(chan map[revision]struct{}, 1),
 	}
 	}
 	return &store{
 	return &store{
 		b:              b,
 		b:              b,
@@ -472,11 +738,16 @@ type indexRangeResp struct {
 	revs []revision
 	revs []revision
 }
 }
 
 
+type indexRangeEventsResp struct {
+	revs []revision
+}
+
 type fakeIndex struct {
 type fakeIndex struct {
 	testutil.Recorder
 	testutil.Recorder
-	indexGetRespc     chan indexGetResp
-	indexRangeRespc   chan indexRangeResp
-	indexCompactRespc chan map[revision]struct{}
+	indexGetRespc         chan indexGetResp
+	indexRangeRespc       chan indexRangeResp
+	indexRangeEventsRespc chan indexRangeEventsResp
+	indexCompactRespc     chan map[revision]struct{}
 }
 }
 
 
 func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
 func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
@@ -500,7 +771,9 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
 	return nil
 	return nil
 }
 }
 func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision {
 func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision {
-	return nil
+	i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}})
+	r := <-i.indexRangeEventsRespc
+	return r.revs
 }
 }
 func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
 func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
 	i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
 	i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})