Browse Source

Merge pull request #3851 from yichengq/storage-kv-data

storage: save the KeyValue instead of Event in backend
Xiang Li 10 years ago
parent
commit
cf2d20c5c9
6 changed files with 270 additions and 225 deletions
  1. 3 3
      storage/index.go
  2. 2 2
      storage/index_test.go
  3. 77 51
      storage/kvstore.go
  4. 168 165
      storage/kvstore_test.go
  5. 6 1
      storage/revision.go
  6. 14 3
      storage/watchable_store.go

+ 3 - 3
storage/index.go

@@ -28,7 +28,7 @@ type index interface {
 	Put(key []byte, rev revision)
 	Restore(key []byte, created, modified revision, ver int64)
 	Tombstone(key []byte, rev revision) error
-	RangeEvents(key, end []byte, rev int64) []revision
+	RangeSince(key, end []byte, rev int64) []revision
 	Compact(rev int64) map[revision]struct{}
 	Equal(b index) bool
 }
@@ -134,10 +134,10 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
 	return ki.tombstone(rev.main, rev.sub)
 }
 
-// RangeEvents returns all revisions from key(including) to end(excluding)
+// RangeSince returns all revisions from key(including) to end(excluding)
 // at or after the given rev. The returned slice is sorted in the order
 // of revision.
-func (ti *treeIndex) RangeEvents(key, end []byte, rev int64) []revision {
+func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
 	ti.RLock()
 	defer ti.RUnlock()
 

+ 2 - 2
storage/index_test.go

@@ -136,7 +136,7 @@ func TestIndexTombstone(t *testing.T) {
 	}
 }
 
-func TestIndexRangeEvents(t *testing.T) {
+func TestIndexRangeSince(t *testing.T) {
 	allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")}
 	allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}}
 
@@ -184,7 +184,7 @@ func TestIndexRangeEvents(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		revs := index.RangeEvents(tt.key, tt.end, atRev)
+		revs := index.RangeSince(tt.key, tt.end, atRev)
 		if !reflect.DeepEqual(revs, tt.wrevs) {
 			t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs)
 		}

+ 77 - 51
storage/kvstore.go

@@ -32,6 +32,13 @@ var (
 	keyBucketName  = []byte("key")
 	metaBucketName = []byte("meta")
 
+	// markedRevBytesLen is the byte length of marked revision.
+	// The first `revBytesLen` bytes represents a normal revision. The last
+	// one byte is the mark.
+	markedRevBytesLen      = revBytesLen + 1
+	markBytePosition       = markedRevBytesLen - 1
+	markTombstone     byte = 't'
+
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
 
@@ -193,7 +200,7 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
 	return n, rev, nil
 }
 
-// RangeEvents gets the events from key to end starting from startRev.
+// RangeHistory ranges the history from key to end starting from startRev.
 // If `end` is nil, the request only observes the events on key.
 // If `end` is not nil, it observes the events on key range [key, range_end).
 // Limit limits the number of events returned.
@@ -202,28 +209,29 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
 // If the required start rev is compacted, ErrCompacted will be returned.
 // If the required start rev has not happened, ErrFutureRev will be returned.
 //
-// RangeEvents returns events that satisfy the requirement (0 <= n <= limit).
-// If events in the revision range have not all happened, it returns immeidately
+// RangeHistory returns revision bytes slice and key-values that satisfy the requirement (0 <= n <= limit).
+// If history in the revision range has not all happened, it returns immeidately
 // what is available.
 // It also returns nextRev which indicates the start revision used for the following
 // RangeEvents call. The nextRev could be smaller than the given endRev if the store
 // has not progressed so far or it hits the event limit.
 //
-// TODO: return byte slices instead of events to avoid meaningless encode and decode.
-func (s *store) RangeEvents(key, end []byte, limit, startRev int64) (evs []storagepb.Event, nextRev int64, err error) {
+// TODO: return byte slices instead of keyValues to avoid meaningless encode and decode.
+// This also helps to return raw (key, val) pair directly to make API consistent.
+func (s *store) RangeHistory(key, end []byte, limit, startRev int64) (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
 	if startRev > 0 && startRev <= s.compactMainRev {
-		return nil, 0, ErrCompacted
+		return nil, nil, 0, ErrCompacted
 	}
 	if startRev > s.currentRev.main {
-		return nil, 0, ErrFutureRev
+		return nil, nil, 0, ErrFutureRev
 	}
 
-	revs := s.kvindex.RangeEvents(key, end, startRev)
+	revs := s.kvindex.RangeSince(key, end, startRev)
 	if len(revs) == 0 {
-		return nil, s.currentRev.main + 1, nil
+		return nil, nil, s.currentRev.main + 1, nil
 	}
 
 	tx := s.b.BatchTx()
@@ -231,24 +239,24 @@ func (s *store) RangeEvents(key, end []byte, limit, startRev int64) (evs []stora
 	defer tx.Unlock()
 	// fetch events from the backend using revisions
 	for _, rev := range revs {
-		revbytes := newRevBytes()
-		revToBytes(rev, revbytes)
+		start, end := revBytesRange(rev)
 
-		_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+		ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0)
 		if len(vs) != 1 {
 			log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub)
 		}
 
-		e := storagepb.Event{}
-		if err := e.Unmarshal(vs[0]); err != nil {
+		var kv storagepb.KeyValue
+		if err := kv.Unmarshal(vs[0]); err != nil {
 			log.Fatalf("storage: cannot unmarshal event: %v", err)
 		}
-		evs = append(evs, e)
-		if limit > 0 && len(evs) >= int(limit) {
-			return evs, rev.main + 1, nil
+		revbs = append(revbs, ks[0])
+		kvs = append(kvs, kv)
+		if limit > 0 && len(kvs) >= int(limit) {
+			return revbs, kvs, rev.main + 1, nil
 		}
 	}
-	return evs, s.currentRev.main + 1, nil
+	return revbs, kvs, s.currentRev.main + 1, nil
 }
 
 func (s *store) Compact(rev int64) error {
@@ -316,21 +324,19 @@ func (s *store) Restore() error {
 	// TODO: limit N to reduce max memory usage
 	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
 	for i, key := range keys {
-		e := &storagepb.Event{}
-		if err := e.Unmarshal(vals[i]); err != nil {
+		var kv storagepb.KeyValue
+		if err := kv.Unmarshal(vals[i]); err != nil {
 			log.Fatalf("storage: cannot unmarshal event: %v", err)
 		}
 
-		rev := bytesToRev(key)
+		rev := bytesToRev(key[:revBytesLen])
 
 		// restore index
-		switch e.Type {
-		case storagepb.PUT:
-			s.kvindex.Restore(e.Kv.Key, revision{e.Kv.CreateRevision, 0}, rev, e.Kv.Version)
-		case storagepb.DELETE:
-			s.kvindex.Tombstone(e.Kv.Key, rev)
+		switch {
+		case isTombstone(key):
+			s.kvindex.Tombstone(kv.Key, rev)
 		default:
-			log.Panicf("storage: unexpected event type %s", e.Type)
+			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
 		}
 
 		// update revision
@@ -392,19 +398,18 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 	}
 
 	for _, revpair := range revpairs {
-		revbytes := newRevBytes()
-		revToBytes(revpair, revbytes)
+		start, end := revBytesRange(revpair)
 
-		_, vs := s.tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+		_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
 		if len(vs) != 1 {
 			log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
 		}
 
-		e := &storagepb.Event{}
-		if err := e.Unmarshal(vs[0]); err != nil {
+		var kv storagepb.KeyValue
+		if err := kv.Unmarshal(vs[0]); err != nil {
 			log.Fatalf("storage: cannot unmarshal event: %v", err)
 		}
-		kvs = append(kvs, *e.Kv)
+		kvs = append(kvs, kv)
 		if limit > 0 && len(kvs) >= int(limit) {
 			break
 		}
@@ -426,18 +431,15 @@ func (s *store) put(key, value []byte) {
 	revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)
 
 	ver = ver + 1
-	event := storagepb.Event{
-		Type: storagepb.PUT,
-		Kv: &storagepb.KeyValue{
-			Key:            key,
-			Value:          value,
-			CreateRevision: c,
-			ModRevision:    rev,
-			Version:        ver,
-		},
-	}
-
-	d, err := event.Marshal()
+	kv := storagepb.KeyValue{
+		Key:            key,
+		Value:          value,
+		CreateRevision: c,
+		ModRevision:    rev,
+		Version:        ver,
+	}
+
+	d, err := kv.Marshal()
 	if err != nil {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
@@ -469,15 +471,13 @@ func (s *store) delete(key []byte) {
 
 	ibytes := newRevBytes()
 	revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
+	ibytes = appendMarkTombstone(ibytes)
 
-	event := storagepb.Event{
-		Type: storagepb.DELETE,
-		Kv: &storagepb.KeyValue{
-			Key: key,
-		},
+	kv := storagepb.KeyValue{
+		Key: key,
 	}
 
-	d, err := event.Marshal()
+	d, err := kv.Marshal()
 	if err != nil {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
@@ -489,3 +489,29 @@ func (s *store) delete(key []byte) {
 	}
 	s.currentRev.sub += 1
 }
+
+// appendMarkTombstone appends tombstone mark to normal revision bytes.
+func appendMarkTombstone(b []byte) []byte {
+	if len(b) != revBytesLen {
+		log.Panicf("cannot append mark to non normal revision bytes")
+	}
+	return append(b, markTombstone)
+}
+
+// isTombstone checks whether the revision bytes is a tombstone.
+func isTombstone(b []byte) bool {
+	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
+}
+
+// revBytesRange returns the range of revision bytes at
+// the given revision.
+func revBytesRange(rev revision) (start, end []byte) {
+	start = newRevBytes()
+	revToBytes(rev, start)
+
+	end = newRevBytes()
+	endRev := revision{main: rev.main, sub: rev.sub + 1}
+	revToBytes(endRev, end)
+
+	return start, end
+}

+ 168 - 165
storage/kvstore_test.go

@@ -46,22 +46,21 @@ func TestStorePut(t *testing.T) {
 		r   indexGetResp
 
 		wrev    revision
-		wev     storagepb.Event
+		wkey    []byte
+		wkv     storagepb.KeyValue
 		wputrev revision
 	}{
 		{
 			revision{1, 0},
 			indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
 			revision{1, 1},
-			storagepb.Event{
-				Type: storagepb.PUT,
-				Kv: &storagepb.KeyValue{
-					Key:            []byte("foo"),
-					Value:          []byte("bar"),
-					CreateRevision: 2,
-					ModRevision:    2,
-					Version:        1,
-				},
+			newTestKeyBytes(revision{2, 0}, false),
+			storagepb.KeyValue{
+				Key:            []byte("foo"),
+				Value:          []byte("bar"),
+				CreateRevision: 2,
+				ModRevision:    2,
+				Version:        1,
 			},
 			revision{2, 0},
 		},
@@ -69,15 +68,13 @@ func TestStorePut(t *testing.T) {
 			revision{1, 1},
 			indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
 			revision{1, 2},
-			storagepb.Event{
-				Type: storagepb.PUT,
-				Kv: &storagepb.KeyValue{
-					Key:            []byte("foo"),
-					Value:          []byte("bar"),
-					CreateRevision: 2,
-					ModRevision:    2,
-					Version:        2,
-				},
+			newTestKeyBytes(revision{2, 1}, false),
+			storagepb.KeyValue{
+				Key:            []byte("foo"),
+				Value:          []byte("bar"),
+				CreateRevision: 2,
+				ModRevision:    2,
+				Version:        2,
 			},
 			revision{2, 1},
 		},
@@ -85,15 +82,13 @@ func TestStorePut(t *testing.T) {
 			revision{2, 0},
 			indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
 			revision{2, 1},
-			storagepb.Event{
-				Type: storagepb.PUT,
-				Kv: &storagepb.KeyValue{
-					Key:            []byte("foo"),
-					Value:          []byte("bar"),
-					CreateRevision: 2,
-					ModRevision:    3,
-					Version:        3,
-				},
+			newTestKeyBytes(revision{3, 0}, false),
+			storagepb.KeyValue{
+				Key:            []byte("foo"),
+				Value:          []byte("bar"),
+				CreateRevision: 2,
+				ModRevision:    3,
+				Version:        3,
 			},
 			revision{3, 0},
 		},
@@ -106,12 +101,12 @@ func TestStorePut(t *testing.T) {
 
 		s.put([]byte("foo"), []byte("bar"))
 
-		data, err := tt.wev.Marshal()
+		data, err := tt.wkv.Marshal()
 		if err != nil {
 			t.Errorf("#%d: marshal err = %v, want nil", i, err)
 		}
 		wact := []testutil.Action{
-			{"put", []interface{}{keyBucketName, newTestBytes(tt.wputrev), data}},
+			{"put", []interface{}{keyBucketName, tt.wkey, data}},
 		}
 		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
@@ -130,17 +125,15 @@ func TestStorePut(t *testing.T) {
 }
 
 func TestStoreRange(t *testing.T) {
-	ev := storagepb.Event{
-		Type: storagepb.PUT,
-		Kv: &storagepb.KeyValue{
-			Key:            []byte("foo"),
-			Value:          []byte("bar"),
-			CreateRevision: 1,
-			ModRevision:    2,
-			Version:        1,
-		},
-	}
-	evb, err := ev.Marshal()
+	key := newTestKeyBytes(revision{2, 0}, false)
+	kv := storagepb.KeyValue{
+		Key:            []byte("foo"),
+		Value:          []byte("bar"),
+		CreateRevision: 1,
+		ModRevision:    2,
+		Version:        1,
+	}
+	kvb, err := kv.Marshal()
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -153,11 +146,11 @@ func TestStoreRange(t *testing.T) {
 	}{
 		{
 			indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
-			rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
+			rangeResp{[][]byte{key}, [][]byte{kvb}},
 		},
 		{
 			indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}},
-			rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
+			rangeResp{[][]byte{key}, [][]byte{kvb}},
 		},
 	}
 	for i, tt := range tests {
@@ -171,15 +164,16 @@ func TestStoreRange(t *testing.T) {
 		if err != nil {
 			t.Errorf("#%d: err = %v, want nil", i, err)
 		}
-		if w := []storagepb.KeyValue{*ev.Kv}; !reflect.DeepEqual(kvs, w) {
+		if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) {
 			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w)
 		}
 		if rev != wrev {
 			t.Errorf("#%d: rev = %d, want %d", i, rev, wrev)
 		}
 
+		wstart, wend := revBytesRange(tt.idxr.revs[0])
 		wact := []testutil.Action{
-			{"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}},
+			{"range", []interface{}{keyBucketName, wstart, wend, int64(0)}},
 		}
 		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
@@ -201,6 +195,7 @@ func TestStoreDeleteRange(t *testing.T) {
 		rev revision
 		r   indexRangeResp
 
+		wkey    []byte
 		wrev    revision
 		wrrev   int64
 		wdelrev revision
@@ -208,6 +203,7 @@ func TestStoreDeleteRange(t *testing.T) {
 		{
 			revision{2, 0},
 			indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
+			newTestKeyBytes(revision{3, 0}, true),
 			revision{2, 1},
 			2,
 			revision{3, 0},
@@ -215,6 +211,7 @@ func TestStoreDeleteRange(t *testing.T) {
 		{
 			revision{2, 1},
 			indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
+			newTestKeyBytes(revision{3, 1}, true),
 			revision{2, 2},
 			3,
 			revision{3, 1},
@@ -231,17 +228,14 @@ func TestStoreDeleteRange(t *testing.T) {
 			t.Errorf("#%d: n = %d, want 1", i, n)
 		}
 
-		data, err := (&storagepb.Event{
-			Type: storagepb.DELETE,
-			Kv: &storagepb.KeyValue{
-				Key: []byte("foo"),
-			},
+		data, err := (&storagepb.KeyValue{
+			Key: []byte("foo"),
 		}).Marshal()
 		if err != nil {
 			t.Errorf("#%d: marshal err = %v, want nil", i, err)
 		}
 		wact := []testutil.Action{
-			{"put", []interface{}{keyBucketName, newTestBytes(tt.wdelrev), data}},
+			{"put", []interface{}{keyBucketName, tt.wkey, data}},
 		}
 		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
@@ -259,18 +253,16 @@ func TestStoreDeleteRange(t *testing.T) {
 	}
 }
 
-func TestStoreRangeEvents(t *testing.T) {
-	ev := storagepb.Event{
-		Type: storagepb.PUT,
-		Kv: &storagepb.KeyValue{
-			Key:            []byte("foo"),
-			Value:          []byte("bar"),
-			CreateRevision: 1,
-			ModRevision:    2,
-			Version:        1,
-		},
+func TestStoreRangeHistory(t *testing.T) {
+	key := newTestKeyBytes(revision{2, 0}, false)
+	kv := storagepb.KeyValue{
+		Key:            []byte("foo"),
+		Value:          []byte("bar"),
+		CreateRevision: 1,
+		ModRevision:    2,
+		Version:        1,
 	}
-	evb, err := ev.Marshal()
+	kvb, err := kv.Marshal()
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -282,11 +274,11 @@ func TestStoreRangeEvents(t *testing.T) {
 	}{
 		{
 			indexRangeEventsResp{[]revision{{2, 0}}},
-			rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
+			rangeResp{[][]byte{key}, [][]byte{kvb}},
 		},
 		{
 			indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}},
-			rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
+			rangeResp{[][]byte{key}, [][]byte{kvb}},
 		},
 	}
 	for i, tt := range tests {
@@ -295,12 +287,15 @@ func TestStoreRangeEvents(t *testing.T) {
 		index.indexRangeEventsRespc <- tt.idxr
 		b.tx.rangeRespc <- tt.r
 
-		evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1)
+		keys, kvs, _, err := s.RangeHistory([]byte("foo"), []byte("goo"), 1, 1)
 		if err != nil {
 			t.Errorf("#%d: err = %v, want nil", i, err)
 		}
-		if w := []storagepb.Event{ev}; !reflect.DeepEqual(evs, w) {
-			t.Errorf("#%d: evs = %+v, want %+v", i, evs, w)
+		if w := [][]byte{key}; !reflect.DeepEqual(keys, w) {
+			t.Errorf("#%d: keys = %+v, want %+v", i, keys, w)
+		}
+		if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) {
+			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w)
 		}
 
 		wact := []testutil.Action{
@@ -309,8 +304,9 @@ func TestStoreRangeEvents(t *testing.T) {
 		if g := index.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
 		}
+		wstart, wend := revBytesRange(tt.idxr.revs[0])
 		wact = []testutil.Action{
-			{"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}},
+			{"range", []interface{}{keyBucketName, wstart, wend, int64(0)}},
 		}
 		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
@@ -325,7 +321,9 @@ func TestStoreCompact(t *testing.T) {
 	s, b, index := newFakeStore()
 	s.currentRev = revision{3, 0}
 	index.indexCompactRespc <- map[revision]struct{}{revision{1, 0}: {}}
-	b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{1, 0}), newTestBytes(revision{2, 0})}, nil}
+	key1 := newTestKeyBytes(revision{1, 0}, false)
+	key2 := newTestKeyBytes(revision{2, 0}, false)
+	b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil}
 
 	s.Compact(3)
 	s.wg.Wait()
@@ -336,10 +334,10 @@ func TestStoreCompact(t *testing.T) {
 	end := make([]byte, 8)
 	binary.BigEndian.PutUint64(end, uint64(4))
 	wact := []testutil.Action{
-		{"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestBytes(revision{3, 0})}},
+		{"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
 		{"range", []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}},
-		{"delete", []interface{}{keyBucketName, newTestBytes(revision{2, 0})}},
-		{"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestBytes(revision{3, 0})}},
+		{"delete", []interface{}{keyBucketName, key2}},
+		{"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
 	}
 	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 		t.Errorf("tx actions = %+v, want %+v", g, wact)
@@ -355,33 +353,29 @@ func TestStoreCompact(t *testing.T) {
 func TestStoreRestore(t *testing.T) {
 	s, b, index := newFakeStore()
 
-	putev := storagepb.Event{
-		Type: storagepb.PUT,
-		Kv: &storagepb.KeyValue{
-			Key:            []byte("foo"),
-			Value:          []byte("bar"),
-			CreateRevision: 3,
-			ModRevision:    3,
-			Version:        1,
-		},
+	putkey := newTestKeyBytes(revision{3, 0}, false)
+	putkv := storagepb.KeyValue{
+		Key:            []byte("foo"),
+		Value:          []byte("bar"),
+		CreateRevision: 3,
+		ModRevision:    3,
+		Version:        1,
 	}
-	putevb, err := putev.Marshal()
+	putkvb, err := putkv.Marshal()
 	if err != nil {
 		t.Fatal(err)
 	}
-	delev := storagepb.Event{
-		Type: storagepb.DELETE,
-		Kv: &storagepb.KeyValue{
-			Key: []byte("foo"),
-		},
+	delkey := newTestKeyBytes(revision{4, 0}, true)
+	delkv := storagepb.KeyValue{
+		Key: []byte("foo"),
 	}
-	delevb, err := delev.Marshal()
+	delkvb, err := delkv.Marshal()
 	if err != nil {
 		t.Fatal(err)
 	}
-	b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}}
-	b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{3, 0}), newTestBytes(revision{4, 0})}, [][]byte{putevb, delevb}}
-	b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}}
+	b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{2, 0})}}
+	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
+	b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{2, 0})}}
 
 	s.Restore()
 
@@ -394,7 +388,7 @@ func TestStoreRestore(t *testing.T) {
 	}
 	wact := []testutil.Action{
 		{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
-		{"range", []interface{}{keyBucketName, newTestBytes(revision{}), newTestBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}},
+		{"range", []interface{}{keyBucketName, newTestRevBytes(revision{}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}},
 		{"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
 	}
 	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
@@ -410,79 +404,79 @@ func TestStoreRestore(t *testing.T) {
 }
 
 // tests end parameter works well
-func TestStoreRangeEventsEnd(t *testing.T) {
+func TestStoreRangeHistoryEnd(t *testing.T) {
 	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
 	s.Put([]byte("foo1"), []byte("bar1"))
 	s.Put([]byte("foo2"), []byte("bar2"))
-	evs := []storagepb.Event{
-		{
-			Type: storagepb.PUT,
-			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		},
-		{
-			Type: storagepb.PUT,
-			Kv:   &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
-		},
-		{
-			Type: storagepb.PUT,
-			Kv:   &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
-		},
+	keys := [][]byte{
+		newTestKeyBytes(revision{1, 0}, false),
+		newTestKeyBytes(revision{2, 0}, false),
+		newTestKeyBytes(revision{3, 0}, false),
+	}
+	kvs := []storagepb.KeyValue{
+		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
+		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
+		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
 	}
 
 	tests := []struct {
 		key, end []byte
-		wevs     []storagepb.Event
+		wkeys    [][]byte
+		wkvs     []storagepb.KeyValue
 	}{
 		// get no keys
 		{
 			[]byte("doo"), []byte("foo"),
-			nil,
+			nil, nil,
 		},
 		// get no keys when key == end
 		{
 			[]byte("foo"), []byte("foo"),
-			nil,
+			nil, nil,
 		},
 		// get no keys when ranging single key
 		{
 			[]byte("doo"), nil,
-			nil,
+			nil, nil,
 		},
 		// get all keys
 		{
 			[]byte("foo"), []byte("foo3"),
-			evs,
+			keys, kvs,
 		},
 		// get partial keys
 		{
 			[]byte("foo"), []byte("foo1"),
-			evs[:1],
+			keys[:1], kvs[:1],
 		},
 		// get single key
 		{
 			[]byte("foo"), nil,
-			evs[:1],
+			keys[:1], kvs[:1],
 		},
 	}
 
 	for i, tt := range tests {
-		evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1)
+		keys, kvs, rev, err := s.RangeHistory(tt.key, tt.end, 0, 1)
 		if err != nil {
 			t.Fatal(err)
 		}
 		if rev != 4 {
 			t.Errorf("#%d: rev = %d, want %d", i, rev, 4)
 		}
-		if !reflect.DeepEqual(evs, tt.wevs) {
-			t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs)
+		if !reflect.DeepEqual(keys, tt.wkeys) {
+			t.Errorf("#%d: actions = %+v, want %+v", i, keys, tt.wkeys)
+		}
+		if !reflect.DeepEqual(kvs, tt.wkvs) {
+			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
 		}
 	}
 }
 
-func TestStoreRangeEventsRev(t *testing.T) {
+func TestStoreRangeHistoryRev(t *testing.T) {
 	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
@@ -490,39 +484,39 @@ func TestStoreRangeEventsRev(t *testing.T) {
 	s.DeleteRange([]byte("foo"), nil)
 	s.Put([]byte("foo"), []byte("bar"))
 	s.Put([]byte("unrelated"), []byte("unrelated"))
-	evs := []storagepb.Event{
-		{
-			Type: storagepb.PUT,
-			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		},
-		{
-			Type: storagepb.DELETE,
-			Kv:   &storagepb.KeyValue{Key: []byte("foo")},
-		},
-		{
-			Type: storagepb.PUT,
-			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
-		},
+	keys := [][]byte{
+		newTestKeyBytes(revision{1, 0}, false),
+		newTestKeyBytes(revision{2, 0}, true),
+		newTestKeyBytes(revision{3, 0}, false),
+	}
+	kvs := []storagepb.KeyValue{
+		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
+		{Key: []byte("foo")},
+		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
 	}
 
 	tests := []struct {
 		start int64
 
-		wevs  []storagepb.Event
+		wkeys [][]byte
+		wkvs  []storagepb.KeyValue
 		wnext int64
 	}{
-		{0, evs, 5},
-		{1, evs, 5},
-		{3, evs[2:], 5},
+		{0, keys, kvs, 5},
+		{1, keys, kvs, 5},
+		{3, keys[2:], kvs[2:], 5},
 	}
 
 	for i, tt := range tests {
-		evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start)
+		keys, kvs, next, err := s.RangeHistory([]byte("foo"), nil, 0, tt.start)
 		if err != nil {
 			t.Fatal(err)
 		}
-		if !reflect.DeepEqual(evs, tt.wevs) {
-			t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs)
+		if !reflect.DeepEqual(keys, tt.wkeys) {
+			t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys)
+		}
+		if !reflect.DeepEqual(kvs, tt.wkvs) {
+			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
 		}
 		if next != tt.wnext {
 			t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext)
@@ -530,7 +524,7 @@ func TestStoreRangeEventsRev(t *testing.T) {
 	}
 }
 
-func TestStoreRangeEventsBad(t *testing.T) {
+func TestStoreRangeHistoryBad(t *testing.T) {
 	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
@@ -552,55 +546,55 @@ func TestStoreRangeEventsBad(t *testing.T) {
 		{10, ErrFutureRev},
 	}
 	for i, tt := range tests {
-		_, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev)
+		_, _, _, err := s.RangeHistory([]byte("foo"), nil, 0, tt.rev)
 		if err != tt.werr {
 			t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
 		}
 	}
 }
 
-func TestStoreRangeEventsLimit(t *testing.T) {
+func TestStoreRangeHistoryLimit(t *testing.T) {
 	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
 	s.DeleteRange([]byte("foo"), nil)
 	s.Put([]byte("foo"), []byte("bar"))
-	evs := []storagepb.Event{
-		{
-			Type: storagepb.PUT,
-			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		},
-		{
-			Type: storagepb.DELETE,
-			Kv:   &storagepb.KeyValue{Key: []byte("foo")},
-		},
-		{
-			Type: storagepb.PUT,
-			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
-		},
+	keys := [][]byte{
+		newTestKeyBytes(revision{1, 0}, false),
+		newTestKeyBytes(revision{2, 0}, true),
+		newTestKeyBytes(revision{3, 0}, false),
+	}
+	kvs := []storagepb.KeyValue{
+		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
+		{Key: []byte("foo")},
+		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
 	}
 
 	tests := []struct {
 		limit int64
-		wevs  []storagepb.Event
+		wkeys [][]byte
+		wkvs  []storagepb.KeyValue
 	}{
 		// no limit
-		{-1, evs},
+		{-1, keys, kvs},
 		// no limit
-		{0, evs},
-		{1, evs[:1]},
-		{2, evs[:2]},
-		{3, evs},
-		{100, evs},
+		{0, keys, kvs},
+		{1, keys[:1], kvs[:1]},
+		{2, keys[:2], kvs[:2]},
+		{3, keys, kvs},
+		{100, keys, kvs},
 	}
 	for i, tt := range tests {
-		evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1)
+		keys, kvs, _, err := s.RangeHistory([]byte("foo"), nil, tt.limit, 1)
 		if err != nil {
 			t.Fatalf("#%d: range error (%v)", i, err)
 		}
-		if !reflect.DeepEqual(evs, tt.wevs) {
-			t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs)
+		if !reflect.DeepEqual(keys, tt.wkeys) {
+			t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys)
+		}
+		if !reflect.DeepEqual(kvs, tt.wkvs) {
+			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
 		}
 	}
 }
@@ -688,9 +682,18 @@ func BenchmarkStorePut(b *testing.B) {
 	}
 }
 
-func newTestBytes(rev revision) []byte {
+func newTestRevBytes(rev revision) []byte {
+	bytes := newRevBytes()
+	revToBytes(rev, bytes)
+	return bytes
+}
+
+func newTestKeyBytes(rev revision, tombstone bool) []byte {
 	bytes := newRevBytes()
 	revToBytes(rev, bytes)
+	if tombstone {
+		bytes = appendMarkTombstone(bytes)
+	}
 	return bytes
 }
 
@@ -792,7 +795,7 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
 	i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}})
 	return nil
 }
-func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision {
+func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision {
 	i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}})
 	r := <-i.indexRangeEventsRespc
 	return r.revs

+ 6 - 1
storage/revision.go

@@ -16,6 +16,11 @@ package storage
 
 import "encoding/binary"
 
+// revBytesLen is the byte length of a normal revision.
+// First 8 bytes is the revision.main in big-endian format. The 9th byte
+// is a '_'. The last 8 bytes is the revision.sub in big-endian format.
+const revBytesLen = 8 + 1 + 8
+
 type revision struct {
 	main int64
 	sub  int64
@@ -32,7 +37,7 @@ func (a revision) GreaterThan(b revision) bool {
 }
 
 func newRevBytes() []byte {
-	return make([]byte, 8+1+8)
+	return make([]byte, revBytesLen, markedRevBytesLen)
 }
 
 func revToBytes(rev revision, bytes []byte) {

+ 14 - 3
storage/watchable_store.go

@@ -247,7 +247,7 @@ func (s *watchableStore) syncWatchings() {
 		if limit == 0 {
 			continue
 		}
-		evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur)
+		revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur)
 		if err != nil {
 			// TODO: send error event to watching
 			delete(s.unsynced, w)
@@ -255,8 +255,19 @@ func (s *watchableStore) syncWatchings() {
 		}
 
 		// push events to the channel
-		for _, ev := range evs {
-			w.ch <- ev
+		for i, kv := range kvs {
+			var evt storagepb.Event_EventType
+			switch {
+			case isTombstone(revbs[i]):
+				evt = storagepb.DELETE
+			default:
+				evt = storagepb.PUT
+			}
+
+			w.ch <- storagepb.Event{
+				Type: evt,
+				Kv:   &kv,
+			}
 			pendingEventsGauge.Inc()
 		}
 		// switch to tracking future events if needed