Browse Source

Merge pull request #3067 from xiang90/storage_created_mod

storage: save created index and modified index
Xiang Li 10 years ago
parent
commit
c069119abe
6 changed files with 102 additions and 76 deletions
  1. 5 5
      storage/index.go
  2. 1 1
      storage/index_test.go
  3. 23 17
      storage/key_index.go
  4. 50 50
      storage/key_index_test.go
  5. 13 3
      storage/kvstore.go
  6. 10 0
      storage/reversion.go

+ 5 - 5
storage/index.go

@@ -8,7 +8,7 @@ import (
 )
 
 type index interface {
-	Get(key []byte, atRev int64) (rev reversion, err error)
+	Get(key []byte, atRev int64) (rev, created reversion, err error)
 	Range(key, end []byte, atRev int64) ([][]byte, []reversion)
 	Put(key []byte, rev reversion)
 	Tombstone(key []byte, rev reversion) error
@@ -42,14 +42,14 @@ func (ti *treeIndex) Put(key []byte, rev reversion) {
 	okeyi.put(rev.main, rev.sub)
 }
 
-func (ti *treeIndex) Get(key []byte, atRev int64) (rev reversion, err error) {
+func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created reversion, err error) {
 	keyi := &keyIndex{key: key}
 
 	ti.RLock()
 	defer ti.RUnlock()
 	item := ti.tree.Get(keyi)
 	if item == nil {
-		return reversion{}, ErrReversionNotFound
+		return reversion{}, reversion{}, ErrReversionNotFound
 	}
 
 	keyi = item.(*keyIndex)
@@ -58,7 +58,7 @@ func (ti *treeIndex) Get(key []byte, atRev int64) (rev reversion, err error) {
 
 func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []reversion) {
 	if end == nil {
-		rev, err := ti.Get(key, atRev)
+		rev, _, err := ti.Get(key, atRev)
 		if err != nil {
 			return nil, nil
 		}
@@ -76,7 +76,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
 			return false
 		}
 		curKeyi := item.(*keyIndex)
-		rev, err := curKeyi.get(atRev)
+		rev, _, err := curKeyi.get(atRev)
 		if err != nil {
 			return true
 		}

+ 1 - 1
storage/index_test.go

@@ -197,7 +197,7 @@ func TestContinuousCompact(t *testing.T) {
 
 func verify(t *testing.T, index index, tests []T) {
 	for i, tt := range tests {
-		h, err := index.Get(tt.key, tt.rev)
+		h, _, err := index.Get(tt.key, tt.rev)
 		if err != tt.werr {
 			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
 		}

+ 23 - 17
storage/key_index.go

@@ -55,43 +55,48 @@ var (
 //    {empty} -> key SHOULD be removed.
 type keyIndex struct {
 	key         []byte
-	rev         int64
+	modified    reversion // the main rev of the last modification
 	generations []generation
 }
 
 // put puts a reversion to the keyIndex.
-func (ki *keyIndex) put(rev int64, subrev int64) {
-	if rev < ki.rev {
-		log.Panicf("store.keyindex: put with unexpected smaller reversion [%d / %d]", rev, ki.rev)
+func (ki *keyIndex) put(main int64, sub int64) {
+	rev := reversion{main: main, sub: sub}
+
+	if !rev.GreaterThan(ki.modified) {
+		log.Panicf("store.keyindex: put with unexpected smaller reversion [%v / %v]", rev, ki.modified)
 	}
 	if len(ki.generations) == 0 {
 		ki.generations = append(ki.generations, generation{})
 	}
 	g := &ki.generations[len(ki.generations)-1]
-	g.revs = append(g.revs, reversion{rev, subrev})
+	if len(g.revs) == 0 {
+		g.created = rev
+	}
+	g.revs = append(g.revs, rev)
 	g.ver++
-	ki.rev = rev
+	ki.modified = rev
 }
 
 // tombstone puts a reversion, pointing to a tombstone, to the keyIndex.
 // It also creates a new empty generation in the keyIndex.
-func (ki *keyIndex) tombstone(rev int64, subrev int64) {
+func (ki *keyIndex) tombstone(main int64, sub int64) {
 	if ki.isEmpty() {
 		log.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
 	}
-	ki.put(rev, subrev)
+	ki.put(main, sub)
 	ki.generations = append(ki.generations, generation{})
 }
 
-// get gets the reversion of the key that satisfies the given atRev.
+// get gets the modified and created reversion of the key that satisfies the given atRev.
 // Rev must be higher than or equal to the given atRev.
-func (ki *keyIndex) get(atRev int64) (rev reversion, err error) {
+func (ki *keyIndex) get(atRev int64) (modified, created reversion, err error) {
 	if ki.isEmpty() {
 		log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
 	}
 	g := ki.findGeneration(atRev)
 	if g.isEmpty() {
-		return reversion{}, ErrReversionNotFound
+		return reversion{}, reversion{}, ErrReversionNotFound
 	}
 
 	f := func(rev reversion) bool {
@@ -103,10 +108,10 @@ func (ki *keyIndex) get(atRev int64) (rev reversion, err error) {
 
 	n := g.walk(f)
 	if n != -1 {
-		return g.revs[n], nil
+		return g.revs[n], g.created, nil
 	}
 
-	return reversion{}, ErrReversionNotFound
+	return reversion{}, reversion{}, ErrReversionNotFound
 }
 
 // compact compacts a keyIndex by removing the versions with smaller or equal
@@ -191,7 +196,7 @@ func (a *keyIndex) equal(b *keyIndex) bool {
 	if !bytes.Equal(a.key, b.key) {
 		return false
 	}
-	if a.rev != b.rev {
+	if a.modified != b.modified {
 		return false
 	}
 	if len(a.generations) != len(b.generations) {
@@ -215,8 +220,9 @@ func (ki *keyIndex) String() string {
 }
 
 type generation struct {
-	ver  int64
-	revs []reversion
+	ver     int64
+	created reversion // when the generation is created (put in first reversion).
+	revs    []reversion
 }
 
 func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
@@ -238,7 +244,7 @@ func (g *generation) walk(f func(rev reversion) bool) int {
 }
 
 func (g *generation) String() string {
-	return fmt.Sprintf("g: ver[%d], revs %#v\n", g.ver, g.revs)
+	return fmt.Sprintf("g: created[%d] ver[%d], revs %#v\n", g.created, g.ver, g.revs)
 }
 
 func (a generation) equal(b generation) bool {

+ 50 - 50
storage/key_index_test.go

@@ -39,7 +39,7 @@ func TestKeyIndexGet(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		rev, err := ki.get(tt.rev)
+		rev, _, err := ki.get(tt.rev)
 		if err != tt.werr {
 			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
 		}
@@ -55,8 +55,8 @@ func TestKeyIndexPut(t *testing.T) {
 
 	wki := &keyIndex{
 		key:         []byte("foo"),
-		rev:         5,
-		generations: []generation{{ver: 1, revs: []reversion{{main: 5}}}},
+		modified:    reversion{5, 0},
+		generations: []generation{{created: reversion{5, 0}, ver: 1, revs: []reversion{{main: 5}}}},
 	}
 	if !reflect.DeepEqual(ki, wki) {
 		t.Errorf("ki = %+v, want %+v", ki, wki)
@@ -66,8 +66,8 @@ func TestKeyIndexPut(t *testing.T) {
 
 	wki = &keyIndex{
 		key:         []byte("foo"),
-		rev:         7,
-		generations: []generation{{ver: 2, revs: []reversion{{main: 5}, {main: 7}}}},
+		modified:    reversion{7, 0},
+		generations: []generation{{created: reversion{5, 0}, ver: 2, revs: []reversion{{main: 5}, {main: 7}}}},
 	}
 	if !reflect.DeepEqual(ki, wki) {
 		t.Errorf("ki = %+v, want %+v", ki, wki)
@@ -82,8 +82,8 @@ func TestKeyIndexTombstone(t *testing.T) {
 
 	wki := &keyIndex{
 		key:         []byte("foo"),
-		rev:         7,
-		generations: []generation{{ver: 2, revs: []reversion{{main: 5}, {main: 7}}}, {}},
+		modified:    reversion{7, 0},
+		generations: []generation{{created: reversion{5, 0}, ver: 2, revs: []reversion{{main: 5}, {main: 7}}}, {}},
 	}
 	if !reflect.DeepEqual(ki, wki) {
 		t.Errorf("ki = %+v, want %+v", ki, wki)
@@ -94,11 +94,11 @@ func TestKeyIndexTombstone(t *testing.T) {
 	ki.tombstone(15, 0)
 
 	wki = &keyIndex{
-		key: []byte("foo"),
-		rev: 15,
+		key:      []byte("foo"),
+		modified: reversion{15, 0},
 		generations: []generation{
-			{ver: 2, revs: []reversion{{main: 5}, {main: 7}}},
-			{ver: 3, revs: []reversion{{main: 8}, {main: 9}, {main: 15}}},
+			{created: reversion{5, 0}, ver: 2, revs: []reversion{{main: 5}, {main: 7}}},
+			{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 9}, {main: 15}}},
 			{},
 		},
 	}
@@ -117,11 +117,11 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			1,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
-					{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
+					{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -130,11 +130,11 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			2,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
-					{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
+					{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -145,11 +145,11 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			3,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
-					{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
+					{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -160,11 +160,11 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			4,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 4}, {main: 6}}},
-					{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
+					{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 4}, {main: 6}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -175,11 +175,11 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			5,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 4}, {main: 6}}},
-					{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
+					{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 4}, {main: 6}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -190,10 +190,10 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			6,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -202,10 +202,10 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			7,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -214,10 +214,10 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			8,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -228,10 +228,10 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			9,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -242,10 +242,10 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			10,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 10}, {main: 12}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -256,10 +256,10 @@ func TestKeyIndexCompact(t *testing.T) {
 		{
 			11,
 			&keyIndex{
-				key: []byte("foo"),
-				rev: 12,
+				key:      []byte("foo"),
+				modified: reversion{12, 0},
 				generations: []generation{
-					{ver: 3, revs: []reversion{{main: 10}, {main: 12}}},
+					{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 10}, {main: 12}}},
 					{},
 				},
 			},
@@ -271,7 +271,7 @@ func TestKeyIndexCompact(t *testing.T) {
 			12,
 			&keyIndex{
 				key:         []byte("foo"),
-				rev:         12,
+				modified:    reversion{12, 0},
 				generations: []generation{{}},
 			},
 			map[reversion]struct{}{},

+ 13 - 3
storage/kvstore.go

@@ -295,14 +295,24 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 }
 
 func (s *store) put(key, value []byte, rev int64) {
+	c := rev
+
+	// if the key exists before, use its previous created
+	_, created, err := s.kvindex.Get(key, rev)
+	if err == nil {
+		c = created.main
+	}
+
 	ibytes := newRevBytes()
 	revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes)
 
 	event := storagepb.Event{
 		Type: storagepb.PUT,
 		Kv: storagepb.KeyValue{
-			Key:   key,
-			Value: value,
+			Key:         key,
+			Value:       value,
+			CreateIndex: c,
+			ModIndex:    rev,
 		},
 	}
 
@@ -345,7 +355,7 @@ func (s *store) delete(key []byte, mainrev int64) bool {
 	if s.currentRev.sub > 0 {
 		grev += 1
 	}
-	rev, err := s.kvindex.Get(key, grev)
+	rev, _, err := s.kvindex.Get(key, grev)
 	if err != nil {
 		// key not exist
 		return false

+ 10 - 0
storage/reversion.go

@@ -7,6 +7,16 @@ type reversion struct {
 	sub  int64
 }
 
+func (a reversion) GreaterThan(b reversion) bool {
+	if a.main > b.main {
+		return true
+	}
+	if a.main < b.main {
+		return false
+	}
+	return a.sub > b.sub
+}
+
 func newRevBytes() []byte {
 	return make([]byte, 8+1+8)
 }