
Merge pull request #4151 from xiang90/s

storage: support recovering from backend
Xiang Li 10 years ago
commit b42a0e4283
6 changed files with 51 additions and 37 deletions
  1. + 22 - 30  etcdserver/server.go
  2. + 1 - 1    etcdserver/server_test.go
  3. + 2 - 1    storage/kv.go
  4. + 1 - 2    storage/kv_test.go
  5. + 24 - 1   storage/kvstore.go
  6. + 1 - 2    storage/kvstore_test.go

+ 22 - 30
etcdserver/server.go

@@ -24,7 +24,6 @@ import (
 	"os"
 	"path"
 	"regexp"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -167,8 +166,8 @@ type EtcdServer struct {
 
 	store store.Store
 
-	kvMu sync.RWMutex
-	kv   dstorage.ConsistentWatchableKV
+	kv dstorage.ConsistentWatchableKV
+	be backend.Backend
 
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
@@ -359,11 +358,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	}
 
 	if cfg.V3demo {
-		be := backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
-		srv.kv = dstorage.New(be, &srv.consistIndex)
-		if err := srv.kv.Restore(); err != nil {
-			plog.Fatalf("v3 storage restore error: %v", err)
-		}
+		srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
+		srv.kv = dstorage.New(srv.be, &srv.consistIndex)
 	}
 
 	// TODO: move transport initialization near the definition of remote
@@ -542,6 +538,14 @@ func (s *EtcdServer) run() {
 
 	defer func() {
 		s.r.stop()
+		// kv and backend can be nil if running without v3 enabled
+		// or running unit tests.
+		if s.kv != nil {
+			s.kv.Close()
+		}
+		if s.be != nil {
+			s.be.Close()
+		}
 		close(s.done)
 		<-appdonec
 	}()
@@ -586,21 +590,21 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		}
 
 		newbe := backend.NewDefaultBackend(fn)
-		newKV := dstorage.New(newbe, &s.consistIndex)
-		if err := newKV.Restore(); err != nil {
+		if err := s.kv.Restore(newbe); err != nil {
 			plog.Panicf("restore KV error: %v", err)
 		}
 
-		oldKV := s.swapKV(newKV)
-
-		// Closing oldKV might block until all the txns
-		// on the kv are finished.
-		// We do not want to wait on closing the old kv.
+		// Closing old backend might block until all the txns
+		// on the backend are finished.
+		// We do not want to wait on closing the old backend.
+		oldbe := s.be
 		go func() {
-			if err := oldKV.Close(); err != nil {
-				plog.Panicf("close KV error: %v", err)
+			if err := oldbe.Close(); err != nil {
+				plog.Panicf("close backend error: %v", err)
 			}
 		}()
+
+		s.be = newbe
 	}
 	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
 		plog.Panicf("recovery store error: %v", err)
@@ -1277,16 +1281,4 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 	}
 }
 
-func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV {
-	s.kvMu.RLock()
-	defer s.kvMu.RUnlock()
-	return s.kv
-}
-
-func (s *EtcdServer) swapKV(kv dstorage.ConsistentWatchableKV) dstorage.ConsistentWatchableKV {
-	s.kvMu.Lock()
-	defer s.kvMu.Unlock()
-	old := s.kv
-	s.kv = kv
-	return old
-}
+func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { return s.kv }
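
In short, the server-side flow after this change: NewServer opens the backend once and hands it to dstorage.New (which now recovers state during construction), and applySnapshot restores the existing KV into a freshly opened backend while the old backend is closed in the background, since closing can block until in-flight txns finish. The following is a minimal, compilable sketch of that swap pattern; Backend, KV, fakeBackend, fakeKV and server are illustrative stand-ins, not etcd's real backend.Backend or dstorage.ConsistentWatchableKV.

// Sketch only: stand-in types, not etcd's real interfaces.
package main

import (
	"log"
	"time"
)

type Backend interface{ Close() error }

type KV interface {
	Restore(b Backend) error
	Close() error
}

type fakeBackend struct{ name string }

func (b *fakeBackend) Close() error {
	log.Printf("closed backend %q", b.name)
	return nil
}

type fakeKV struct{ be Backend }

func (kv *fakeKV) Restore(b Backend) error { kv.be = b; return nil }
func (kv *fakeKV) Close() error            { return nil }

type server struct {
	kv KV
	be Backend
}

// applySnapshot mirrors the new flow: restore the existing KV into a freshly
// opened backend, then close the old backend asynchronously, because Close
// can block until in-flight txns on it have finished.
func (s *server) applySnapshot(newbe Backend) {
	if err := s.kv.Restore(newbe); err != nil {
		log.Panicf("restore KV error: %v", err)
	}
	oldbe := s.be
	go func() {
		if err := oldbe.Close(); err != nil {
			log.Panicf("close backend error: %v", err)
		}
	}()
	s.be = newbe
}

func main() {
	s := &server{kv: &fakeKV{}, be: &fakeBackend{name: "old"}}
	s.applySnapshot(&fakeBackend{name: "new"})
	time.Sleep(10 * time.Millisecond) // let the async close of the old backend run
}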

+ 1 - 1
etcdserver/server_test.go

@@ -867,10 +867,10 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	defer func() {
-		be.Close()
 		os.RemoveAll(tmpPath)
 	}()
 	s.kv = dstorage.New(be, &s.consistIndex)
+	s.be = be
 
 	s.start()
 	defer s.Stop()

+ 2 - 1
storage/kv.go

@@ -71,7 +71,8 @@ type KV interface {
 	// Commit commits txns into the underlying backend.
 	Commit()
 
-	Restore() error
+	// Restore restores the KV store from a backend.
+	Restore(b backend.Backend) error
 	Close() error
 }
 

+ 1 - 2
storage/kv_test.go

@@ -676,8 +676,8 @@ func TestKVRestore(t *testing.T) {
 		}
 		s.Close()
 
+		// ns should recover the previous state from the backend.
 		ns := NewStore(b)
-		ns.Restore()
 		// wait for possible compaction to finish
 		testutil.WaitSchedule()
 		var nkvss [][]storagepb.KeyValue
@@ -724,7 +724,6 @@ func TestKVSnapshot(t *testing.T) {
 
 	ns := NewStore(b)
 	defer ns.Close()
-	ns.Restore()
 	kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)
 	if err != nil {
 		t.Errorf("unexpect range error (%v)", err)

+ 24 - 1
storage/kvstore.go

@@ -84,6 +84,11 @@ func NewStore(b backend.Backend) *store {
 	tx.Unlock()
 	s.b.ForceCommit()
 
+	if err := s.restore(); err != nil {
+		// TODO: return the error instead of panic here?
+		panic("failed to recover store from backend")
+	}
+
 	return s
 }
 
@@ -237,10 +242,28 @@ func (s *store) Snapshot() Snapshot {
 
 func (s *store) Commit() { s.b.ForceCommit() }
 
-func (s *store) Restore() error {
+func (s *store) Restore(b backend.Backend) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
+	close(s.stopc)
+	// TODO: restore without waiting for compaction routine to finish.
+	// We need a way to notify that the store is finished using the old
+	// backend though.
+	s.wg.Wait()
+
+	s.b = b
+	s.kvindex = newTreeIndex()
+	s.currentRev = revision{}
+	s.compactMainRev = -1
+	s.tx = b.BatchTx()
+	s.txnID = -1
+	s.stopc = make(chan struct{})
+
+	return s.restore()
+}
+
+func (s *store) restore() error {
 	min, max := newRevBytes(), newRevBytes()
 	revToBytes(revision{}, min)
 	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
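
On the storage side, two things change: NewStore runs restore() as part of construction, and the exported Restore(b) stops the store's background work (such as scheduled compaction), waits for it, resets the in-memory state, swaps in the new backend, and re-runs the same restore() path. Below is a rough, self-contained sketch of that shape; Backend, memBackend and the simplified store are illustrative stand-ins, not etcd's actual storage types.

// Sketch only: simplified stand-ins for the storage package's types.
package main

import (
	"fmt"
	"sync"
)

type Backend interface{ Keys() [][]byte }

type memBackend struct{ keys [][]byte }

func (b *memBackend) Keys() [][]byte { return b.keys }

type store struct {
	mu    sync.Mutex
	wg    sync.WaitGroup
	stopc chan struct{}

	b     Backend
	index map[string]struct{} // stand-in for the in-memory key index
}

// newStore mirrors the new behavior: recovery from the backend happens at
// construction time, so callers no longer call Restore after NewStore.
func newStore(b Backend) *store {
	s := &store{
		stopc: make(chan struct{}),
		b:     b,
		index: make(map[string]struct{}),
	}
	if err := s.restore(); err != nil {
		panic("failed to recover store from backend")
	}
	return s
}

// Restore points the store at a new backend: stop background routines,
// wait for them, reset in-memory state, then re-run restore.
func (s *store) Restore(b Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc) // signal background routines (e.g. scheduled compaction) to stop
	s.wg.Wait()    // wait until they are done using the old backend

	s.b = b
	s.index = make(map[string]struct{})
	s.stopc = make(chan struct{})

	return s.restore()
}

// restore rebuilds the in-memory index by scanning the current backend.
func (s *store) restore() error {
	for _, k := range s.b.Keys() {
		s.index[string(k)] = struct{}{}
	}
	return nil
}

func main() {
	s := newStore(&memBackend{keys: [][]byte{[]byte("a")}})
	if err := s.Restore(&memBackend{keys: [][]byte{[]byte("b"), []byte("c")}}); err != nil {
		panic(err)
	}
	fmt.Println(len(s.index)) // 2: state now comes from the new backend
}

One consequence visible in the test changes below: callers that previously did NewStore(b) followed by an explicit Restore() can now drop the Restore call.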

+ 1 - 2
storage/kvstore_test.go

@@ -332,7 +332,7 @@ func TestStoreRestore(t *testing.T) {
 	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
 	b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{2, 0})}}
 
-	s.Restore()
+	s.restore()
 
 	if s.compactMainRev != 2 {
 	t.Errorf("compact rev = %d, want 2", s.compactMainRev)
@@ -378,7 +378,6 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	s0.Close()
 
 	s1 := NewStore(b)
-	s1.Restore()
 
 	// wait for scheduled compaction to be finished
 	time.Sleep(100 * time.Millisecond)