Browse Source

Merge pull request #7876 from fanminshi/fix_7628

etcdserver: renaming db happens after snapshot persists to wal and snap files
fanmin shi 8 years ago
parent
commit
47f5b7c3ad
6 changed files with 174 additions and 26 deletions
  1. 11 7
      etcdserver/raft.go
  2. 1 1
      etcdserver/raft_test.go
  3. 10 15
      etcdserver/server.go
  4. 83 0
      etcdserver/server_test.go
  5. 59 0
      etcdserver/util.go
  6. 10 3
      snap/db.go

+ 11 - 7
etcdserver/raft.go

@@ -83,7 +83,8 @@ type RaftTimer interface {
 type apply struct {
 	entries  []raftpb.Entry
 	snapshot raftpb.Snapshot
-	raftDone <-chan struct{} // rx {} after raft has persisted messages
+	// notifyc synchronizes etcd server applies with the raft node
+	notifyc chan struct{}
 }
 
 type raftNode struct {
@@ -190,11 +191,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					}
 				}
 
-				raftDone := make(chan struct{}, 1)
+				notifyc := make(chan struct{}, 1)
 				ap := apply{
 					entries:  rd.CommittedEntries,
 					snapshot: rd.Snapshot,
-					raftDone: raftDone,
+					notifyc:  notifyc,
 				}
 
 				updateCommittedIndex(&ap, rh)
@@ -227,6 +228,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
 						plog.Fatalf("raft save snapshot error: %v", err)
 					}
+					// etcdserver now claim the snapshot has been persisted onto the disk
+					notifyc <- struct{}{}
+
 					// gofail: var raftAfterSaveSnap struct{}
 					r.raftStorage.ApplySnapshot(rd.Snapshot)
 					plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
@@ -240,7 +244,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					msgs := r.processMessages(rd.Messages)
 
 					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
-					raftDone <- struct{}{}
+					notifyc <- struct{}{}
 
 					// Candidate or follower needs to wait for all pending configuration
 					// changes to be applied before sending messages.
@@ -259,9 +263,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					if waitApply {
 						// blocks until 'applyAll' calls 'applyWait.Trigger'
 						// to be in sync with scheduled config-change job
-						// (assume raftDone has cap of 1)
+						// (assume notifyc has cap of 1)
 						select {
-						case raftDone <- struct{}{}:
+						case notifyc <- struct{}{}:
 						case <-r.stopped:
 							return
 						}
@@ -271,7 +275,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					r.transport.Send(msgs)
 				} else {
 					// leader already processed 'MsgSnap' and signaled
-					raftDone <- struct{}{}
+					notifyc <- struct{}{}
 				}
 
 				r.Advance()

+ 1 - 1
etcdserver/raft_test.go

@@ -211,7 +211,7 @@ func TestConfgChangeBlocksApply(t *testing.T) {
 	}
 
 	// finish apply, unblock raft routine
-	<-ap.raftDone
+	<-ap.notifyc
 
 	select {
 	case <-continueC:

+ 10 - 15
etcdserver/server.go

@@ -274,20 +274,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
 	beExist := fileutil.Exist(bepath)
 
-	var be backend.Backend
-	beOpened := make(chan struct{})
-	go func() {
-		be = newBackend(bepath, cfg.QuotaBackendBytes)
-		beOpened <- struct{}{}
-	}()
-
-	select {
-	case <-beOpened:
-	case <-time.After(time.Second):
-		plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
-		plog.Warningf("waiting for it to exit before starting...")
-		<-beOpened
-	}
+	be := openBackend(bepath, cfg.QuotaBackendBytes)
 
 	defer func() {
 		if err != nil {
@@ -385,6 +372,11 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 				plog.Panicf("recovered store from snapshot error: %v", err)
 			}
 			plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
+
+			be, err = checkAndRecoverDB(snapshot, be, cfg.QuotaBackendBytes, cfg.SnapDir())
+			if err != nil {
+				plog.Panicf("recovering backend from snapshot error: %v", err)
+			}
 		}
 		cfg.Print()
 		if !cfg.ForceNewCluster {
@@ -778,7 +770,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	// wait for the raft routine to finish the disk writes before triggering a
 	// snapshot. or applied index might be greater than the last index in raft
 	// storage, since the raft routine might be slower than apply routine.
-	<-apply.raftDone
+	<-apply.notifyc
 
 	s.triggerSnapshot(ep)
 	select {
@@ -803,6 +795,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			apply.snapshot.Metadata.Index, ep.appliedi)
 	}
 
+	// wait for raftNode to persist snashot onto the disk
+	<-apply.notifyc
+
 	snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
 	if err != nil {
 		plog.Panicf("get database snapshot file path error: %v", err)

+ 83 - 0
etcdserver/server_test.go

@@ -951,6 +951,89 @@ func TestSnapshot(t *testing.T) {
 	<-ch
 }
 
+// TestSnapshotOrdering ensures raft persists snapshot onto disk before
+// snapshot db is applied.
+func TestSnapshotOrdering(t *testing.T) {
+	n := newNopReadyNode()
+	st := store.New()
+	cl := membership.NewCluster("abc")
+	cl.SetStore(st)
+
+	testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
+	if err != nil {
+		t.Fatalf("couldn't open tempdir (%v)", err)
+	}
+	defer os.RemoveAll(testdir)
+	if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
+		t.Fatalf("couldn't make snap dir (%v)", err)
+	}
+
+	rs := raft.NewMemoryStorage()
+	p := mockstorage.NewStorageRecorderStream(testdir)
+	tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
+	r := newRaftNode(raftNodeConfig{
+		isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
+		Node:        n,
+		transport:   tr,
+		storage:     p,
+		raftStorage: rs,
+	})
+	s := &EtcdServer{
+		Cfg: &ServerConfig{
+			DataDir: testdir,
+		},
+		r:          *r,
+		store:      st,
+		cluster:    cl,
+		SyncTicker: &time.Ticker{},
+	}
+	s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
+
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	defer os.RemoveAll(tmpPath)
+	s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
+	s.be = be
+
+	s.start()
+	defer s.Stop()
+
+	actionc := p.Chan()
+	n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
+	if ac := <-actionc; ac.Name != "Save" {
+		// MsgSnap triggers raftNode to call Save()
+		t.Fatalf("expect save() is called, but got %v", ac.Name)
+	}
+
+	// get the snapshot sent by the transport
+	snapMsg := <-snapDoneC
+
+	// Snapshot first triggers raftnode to persists the snapshot onto disk
+	// before renaming db snapshot file to db
+	snapMsg.Snapshot.Metadata.Index = 1
+	n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
+	var seenSaveSnap bool
+	timer := time.After(5 * time.Second)
+	for {
+		select {
+		case ac := <-actionc:
+			switch ac.Name {
+			// DBFilePath() is called immediately before snapshot renaming.
+			case "DBFilePath":
+				if !seenSaveSnap {
+					t.Fatalf("DBFilePath called before SaveSnap")
+				}
+				return
+			case "SaveSnap":
+				seenSaveSnap = true
+			default:
+				continue
+			}
+		case <-timer:
+			t.Fatalf("timeout waiting on actions")
+		}
+	}
+}
+
 // Applied > SnapCount should trigger a SaveSnap event
 func TestTriggerSnap(t *testing.T) {
 	be, tmpPath := backend.NewDefaultTmpBackend()

+ 59 - 0
etcdserver/util.go

@@ -15,11 +15,18 @@
 package etcdserver
 
 import (
+	"fmt"
+	"os"
 	"time"
 
 	"github.com/coreos/etcd/etcdserver/membership"
+	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/mvcc"
+	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/etcd/snap"
 )
 
 // isConnectedToQuorumSince checks whether the local member is connected to the
@@ -95,3 +102,55 @@ func (nc *notifier) notify(err error) {
 	nc.err = err
 	close(nc.c)
 }
+
+// checkAndRecoverDB attempts to recover db in the scenario when
+// etcd server crashes before updating its in-state db
+// and after persisting snapshot to disk from syncing with leader,
+// snapshot can be newer than db where
+// (snapshot.Metadata.Index > db.consistentIndex ).
+//
+// when that happen:
+// 1. find xxx.snap.db that matches snap index.
+// 2. rename xxx.snap.db to db.
+// 3. open the new db as the backend.
+func checkAndRecoverDB(snapshot *raftpb.Snapshot, oldbe backend.Backend, quotaBackendBytes int64, snapdir string) (be backend.Backend, err error) {
+	var cIndex consistentIndex
+	kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
+	defer kv.Close()
+	kvindex := kv.ConsistentIndex()
+	if snapshot.Metadata.Index <= kvindex {
+		return oldbe, nil
+	}
+
+	id := snapshot.Metadata.Index
+	snapfn, err := snap.DBFilePathFromID(snapdir, id)
+	if err != nil {
+		return nil, fmt.Errorf("finding %v error: %v", snapdir+fmt.Sprintf("%016x.snap.db", id), err)
+	}
+
+	bepath := snapdir + databaseFilename
+	if err := os.Rename(snapfn, bepath); err != nil {
+		return nil, fmt.Errorf("rename snapshot file error: %v", err)
+	}
+
+	oldbe.Close()
+	be = openBackend(bepath, quotaBackendBytes)
+	return be, nil
+}
+
+func openBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) {
+	beOpened := make(chan struct{})
+	go func() {
+		be = newBackend(bepath, quotaBackendBytes)
+		beOpened <- struct{}{}
+	}()
+
+	select {
+	case <-beOpened:
+	case <-time.After(time.Second):
+		plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
+		plog.Warningf("waiting for it to exit before starting...")
+		<-beOpened
+	}
+	return be
+}

+ 10 - 3
snap/db.go

@@ -15,6 +15,7 @@
 package snap
 
 import (
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -24,6 +25,8 @@ import (
 	"github.com/coreos/etcd/pkg/fileutil"
 )
 
+var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
+
 // SaveDBFrom saves snapshot of the database from the given reader. It
 // guarantees the save operation is atomic.
 func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
@@ -60,15 +63,19 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 // DBFilePath returns the file path for the snapshot of the database with
 // given id. If the snapshot does not exist, it returns error.
 func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
-	fns, err := fileutil.ReadDir(s.dir)
+	return DBFilePathFromID(s.dir, id)
+}
+
+func DBFilePathFromID(dbPath string, id uint64) (string, error) {
+	fns, err := fileutil.ReadDir(dbPath)
 	if err != nil {
 		return "", err
 	}
 	wfn := fmt.Sprintf("%016x.snap.db", id)
 	for _, fn := range fns {
 		if fn == wfn {
-			return filepath.Join(s.dir, fn), nil
+			return filepath.Join(dbPath, fn), nil
 		}
 	}
-	return "", fmt.Errorf("snap: snapshot file doesn't exist")
+	return "", ErrNoDBSnapshot
 }