Преглед изворни кода

Merge pull request #2930 from xiang90/storage_restore

storage: initial snapshot and restore
Xiang Li пре 10 година
родитељ
комит
97709b202d
7 измењених фајлова са 192 додато и 7 уклоњено
  1. 11 0
      storage/backend/backend.go
  2. 23 0
      storage/index.go
  3. 36 0
      storage/key_index.go
  4. 11 1
      storage/kv.go
  5. 82 6
      storage/kvstore.go
  6. 25 0
      storage/kvstore_test.go
  7. 4 0
      storage/reversion.go

+ 11 - 0
storage/backend/backend.go

@@ -1,6 +1,7 @@
 package backend
 
 import (
+	"io"
 	"log"
 	"time"
 
@@ -9,6 +10,7 @@ import (
 
 type Backend interface {
 	BatchTx() BatchTx
+	Snapshot(w io.Writer) (n int64, err error)
 	ForceCommit()
 	Close() error
 }
@@ -60,6 +62,14 @@ func (b *backend) ForceCommit() {
 	b.batchTx.Commit()
 }
 
+func (b *backend) Snapshot(w io.Writer) (n int64, err error) {
+	b.db.View(func(tx *bolt.Tx) error {
+		n, err = tx.WriteTo(w)
+		return nil
+	})
+	return n, err
+}
+
 func (b *backend) run() {
 	defer close(b.donec)
 
@@ -70,6 +80,7 @@ func (b *backend) run() {
 		select {
 		case <-time.After(b.batchInterval):
 		case <-b.stopc:
+			b.batchTx.Commit()
 			return
 		}
 		b.batchTx.Commit()

+ 23 - 0
storage/index.go

@@ -13,6 +13,7 @@ type index interface {
 	Put(key []byte, rev reversion)
 	Tombstone(key []byte, rev reversion) error
 	Compact(rev int64) map[reversion]struct{}
+	Equal(b index) bool
 }
 
 type treeIndex struct {
@@ -130,3 +131,25 @@ func compactIndex(rev int64, available map[reversion]struct{}, emptyki *[]*keyIn
 		return true
 	}
 }
+
+func (a *treeIndex) Equal(bi index) bool {
+	b := bi.(*treeIndex)
+
+	if a.tree.Len() != b.tree.Len() {
+		return false
+	}
+
+	equal := true
+
+	a.tree.Ascend(func(item btree.Item) bool {
+		aki := item.(*keyIndex)
+		bki := b.tree.Get(item).(*keyIndex)
+		if !aki.equal(bki) {
+			equal = false
+			return false
+		}
+		return true
+	})
+
+	return equal
+}

+ 36 - 0
storage/key_index.go

@@ -187,6 +187,25 @@ func (a *keyIndex) Less(b btree.Item) bool {
 	return bytes.Compare(a.key, b.(*keyIndex).key) == -1
 }
 
+func (a *keyIndex) equal(b *keyIndex) bool {
+	if !bytes.Equal(a.key, b.key) {
+		return false
+	}
+	if a.rev != b.rev {
+		return false
+	}
+	if len(a.generations) != len(b.generations) {
+		return false
+	}
+	for i := range a.generations {
+		ag, bg := a.generations[i], b.generations[i]
+		if !ag.equal(bg) {
+			return false
+		}
+	}
+	return true
+}
+
 func (ki *keyIndex) String() string {
 	var s string
 	for _, g := range ki.generations {
@@ -221,3 +240,20 @@ func (g *generation) walk(f func(rev reversion) bool) int {
 func (g *generation) String() string {
 	return fmt.Sprintf("g: ver[%d], revs %#v\n", g.ver, g.revs)
 }
+
+func (a generation) equal(b generation) bool {
+	if a.ver != b.ver {
+		return false
+	}
+	if len(a.revs) != len(b.revs) {
+		return false
+	}
+
+	for i := range a.revs {
+		ar, br := a.revs[i], b.revs[i]
+		if ar != br {
+			return false
+		}
+	}
+	return true
+}

+ 11 - 1
storage/kv.go

@@ -1,6 +1,10 @@
 package storage
 
-import "github.com/coreos/etcd/storage/storagepb"
+import (
+	"io"
+
+	"github.com/coreos/etcd/storage/storagepb"
+)
 
 type KV interface {
 	// Range gets the keys in the range at rangeRev.
@@ -35,4 +39,10 @@ type KV interface {
 	TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error)
 
 	Compact(rev int64) error
+
+	// Write a snapshot to the given io writer
+	Snapshot(w io.Writer) (int64, error)
+
+	Restore() error
+	Close() error
 }

+ 82 - 6
storage/kvstore.go

@@ -2,7 +2,9 @@ package storage
 
 import (
 	"errors"
+	"io"
 	"log"
+	"math"
 	"math/rand"
 	"sync"
 	"time"
@@ -37,7 +39,7 @@ type store struct {
 	tnxID int64      // tracks the current tnxID to verify tnx operations
 }
 
-func newStore(path string) KV {
+func newStore(path string) *store {
 	s := &store{
 		b:              backend.New(path, batchInterval, batchLimit),
 		kvindex:        newTreeIndex(),
@@ -146,7 +148,7 @@ func (s *store) Compact(rev int64) error {
 
 	s.compactMainRev = rev
 
-	rbytes := make([]byte, 8+1+8)
+	rbytes := newRevBytes()
 	revToBytes(reversion{main: rev}, rbytes)
 
 	tx := s.b.BatchTx()
@@ -160,6 +162,80 @@ func (s *store) Compact(rev int64) error {
 	return nil
 }
 
+func (s *store) Snapshot(w io.Writer) (int64, error) {
+	s.b.ForceCommit()
+	return s.b.Snapshot(w)
+}
+
+func (s *store) Restore() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	min, max := newRevBytes(), newRevBytes()
+	revToBytes(reversion{}, min)
+	revToBytes(reversion{main: math.MaxInt64, sub: math.MaxInt64}, max)
+
+	// restore index
+	tx := s.b.BatchTx()
+	tx.Lock()
+	_, finishedCompactBytes := tx.UnsafeRange(keyBucketName, finishedCompactKeyName, nil, 0)
+	if len(finishedCompactBytes) != 0 {
+		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
+		log.Printf("storage: restore compact to %d", s.compactMainRev)
+	}
+
+	// TODO: limit N to reduce max memory usage
+	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
+	for i, key := range keys {
+		e := &storagepb.Event{}
+		if err := e.Unmarshal(vals[i]); err != nil {
+			log.Fatalf("storage: cannot unmarshal event: %v", err)
+		}
+
+		rev := bytesToRev(key)
+
+		// restore index
+		switch e.Type {
+		case storagepb.PUT:
+			s.kvindex.Put(e.Kv.Key, rev)
+		case storagepb.DELETE:
+			s.kvindex.Tombstone(e.Kv.Key, rev)
+		default:
+			log.Panicf("storage: unexpected event type %s", e.Type)
+		}
+
+		// update reversion
+		s.currentRev = rev
+	}
+
+	_, scheduledCompactBytes := tx.UnsafeRange(keyBucketName, scheduledCompactKeyName, nil, 0)
+	if len(scheduledCompactBytes) != 0 {
+		scheduledCompact := bytesToRev(finishedCompactBytes[0]).main
+		if scheduledCompact > s.compactMainRev {
+			log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
+			go s.Compact(scheduledCompact)
+		}
+	}
+
+	tx.Unlock()
+
+	return nil
+}
+
+func (s *store) Close() error {
+	return s.b.Close()
+}
+
+func (a *store) Equal(b *store) bool {
+	if a.currentRev != b.currentRev {
+		return false
+	}
+	if a.compactMainRev != b.compactMainRev {
+		return false
+	}
+	return a.kvindex.Equal(b.kvindex)
+}
+
 // range is a keyword in Go, add Keys suffix.
 func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
 	if rangeRev <= 0 {
@@ -186,7 +262,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 	tx.Lock()
 	defer tx.Unlock()
 	for _, revpair := range revpairs {
-		revbytes := make([]byte, 8+1+8)
+		revbytes := newRevBytes()
 		revToBytes(revpair, revbytes)
 
 		_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
@@ -206,7 +282,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 }
 
 func (s *store) put(key, value []byte, rev int64) {
-	ibytes := make([]byte, 8+1+8)
+	ibytes := newRevBytes()
 	revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes)
 
 	event := storagepb.Event{
@@ -266,7 +342,7 @@ func (s *store) delete(key []byte, mainrev int64) bool {
 	tx.Lock()
 	defer tx.Unlock()
 
-	revbytes := make([]byte, 8+1+8)
+	revbytes := newRevBytes()
 	revToBytes(rev, revbytes)
 
 	_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
@@ -282,7 +358,7 @@ func (s *store) delete(key []byte, mainrev int64) bool {
 		return false
 	}
 
-	ibytes := make([]byte, 8+1+8)
+	ibytes := newRevBytes()
 	revToBytes(reversion{main: mainrev, sub: s.currentRev.sub}, ibytes)
 
 	event := storagepb.Event{

+ 25 - 0
storage/kvstore_test.go

@@ -320,6 +320,31 @@ func TestCompaction(t *testing.T) {
 	}
 }
 
+// TODO: test more complicated cases:
+// with unfinished compaction
+// with removed keys
+func TestRestore(t *testing.T) {
+	s0 := newStore("test")
+	defer os.Remove("test")
+
+	s0.Put([]byte("foo"), []byte("bar"))
+	s0.Put([]byte("foo1"), []byte("bar1"))
+	s0.Put([]byte("foo2"), []byte("bar2"))
+	s0.Put([]byte("foo"), []byte("bar11"))
+	s0.Put([]byte("foo1"), []byte("bar12"))
+	s0.Put([]byte("foo2"), []byte("bar13"))
+	s0.Put([]byte("foo1"), []byte("bar14"))
+
+	s0.Close()
+
+	s1 := newStore("test")
+	s1.Restore()
+
+	if !s0.Equal(s1) {
+		t.Errorf("not equal!")
+	}
+}
+
 func BenchmarkStorePut(b *testing.B) {
 	s := newStore("test")
 	defer os.Remove("test")

+ 4 - 0
storage/reversion.go

@@ -7,6 +7,10 @@ type reversion struct {
 	sub  int64
 }
 
+func newRevBytes() []byte {
+	return make([]byte, 8+1+8)
+}
+
 func revToBytes(rev reversion, bytes []byte) {
 	binary.BigEndian.PutUint64(bytes, uint64(rev.main))
 	bytes[8] = '_'