Browse Source

Merge pull request #7917 from heyitsanthony/refactor-backend-paths

snap, etcdserver: tighten up snapshot path handling
Anthony Romano 8 years ago
parent
commit
90893735cf

+ 81 - 0
etcdserver/backend.go

@@ -0,0 +1,81 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/mvcc"
+	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
+)
+
+func newBackend(cfg *ServerConfig) backend.Backend {
+	bcfg := backend.DefaultBackendConfig()
+	bcfg.Path = cfg.backendPath()
+	if cfg.QuotaBackendBytes > 0 && cfg.QuotaBackendBytes != DefaultQuotaBytes {
+		// permit 10% excess over quota for disarm
+		bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
+	}
+	return backend.New(bcfg)
+}
+
+// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
+func openSnapshotBackend(cfg *ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
+	snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
+	if err != nil {
+		return nil, fmt.Errorf("database snapshot file path error: %v", err)
+	}
+	if err := os.Rename(snapPath, cfg.backendPath()); err != nil {
+		return nil, fmt.Errorf("rename snapshot file error: %v", err)
+	}
+	return openBackend(cfg), nil
+}
+
+// openBackend returns a backend using the current etcd db.
+func openBackend(cfg *ServerConfig) backend.Backend {
+	fn := cfg.backendPath()
+	beOpened := make(chan backend.Backend)
+	go func() {
+		beOpened <- newBackend(cfg)
+	}()
+	select {
+	case be := <-beOpened:
+		return be
+	case <-time.After(time.Second):
+		plog.Warningf("another etcd process is using %q and holds the file lock.", fn)
+		plog.Warningf("waiting for it to exit before starting...")
+	}
+	return <-beOpened
+}
+
+// recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes
+// before updating the backend db after persisting raft snapshot to disk,
+// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
+// case, replace the db with the snapshot db sent by the leader.
+func recoverSnapshotBackend(cfg *ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
+	var cIndex consistentIndex
+	kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
+	defer kv.Close()
+	if snapshot.Metadata.Index <= kv.ConsistentIndex() {
+		return oldbe, nil
+	}
+	oldbe.Close()
+	return openSnapshotBackend(cfg, snap.New(cfg.SnapDir()), snapshot)
+}

+ 2 - 0
etcdserver/config.go

@@ -200,3 +200,5 @@ func (c *ServerConfig) bootstrapTimeout() time.Duration {
 	}
 	return time.Second
 }
+
+func (c *ServerConfig) backendPath() string { return filepath.Join(c.SnapDir(), "db") }

+ 14 - 34
etcdserver/server.go

@@ -23,7 +23,6 @@ import (
 	"net/http"
 	"os"
 	"path"
-	"path/filepath"
 	"regexp"
 	"sync"
 	"sync/atomic"
@@ -76,7 +75,6 @@ const (
 	// (since it will timeout).
 	monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
 
-	databaseFilename = "db"
 	// max number of in-flight snapshot messages etcdserver allows to have
 	// This number is more than enough for most clusters with 5 machines.
 	maxInFlightMsgSnap = 16
@@ -200,7 +198,8 @@ type EtcdServer struct {
 
 	cluster *membership.RaftCluster
 
-	store store.Store
+	store       store.Store
+	snapshotter *snap.Snapshotter
 
 	applyV2 ApplierV2
 
@@ -271,10 +270,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	}
 	ss := snap.New(cfg.SnapDir())
 
-	bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
+	bepath := cfg.backendPath()
 	beExist := fileutil.Exist(bepath)
-
-	be := openBackend(bepath, cfg.QuotaBackendBytes)
+	be := openBackend(cfg)
 
 	defer func() {
 		if err != nil {
@@ -372,9 +370,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 				plog.Panicf("recovered store from snapshot error: %v", err)
 			}
 			plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
-
-			be, err = checkAndRecoverDB(snapshot, be, cfg.QuotaBackendBytes, cfg.SnapDir())
-			if err != nil {
+			if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
 				plog.Panicf("recovering backend from snapshot error: %v", err)
 			}
 		}
@@ -408,11 +404,12 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 
 	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
 	srv = &EtcdServer{
-		readych:   make(chan struct{}),
-		Cfg:       cfg,
-		snapCount: cfg.SnapCount,
-		errorc:    make(chan error, 1),
-		store:     st,
+		readych:     make(chan struct{}),
+		Cfg:         cfg,
+		snapCount:   cfg.SnapCount,
+		errorc:      make(chan error, 1),
+		store:       st,
+		snapshotter: ss,
 		r: *newRaftNode(
 			raftNodeConfig{
 				isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
@@ -795,21 +792,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			apply.snapshot.Metadata.Index, ep.appliedi)
 	}
 
-	// wait for raftNode to persist snashot onto the disk
+	// wait for raftNode to persist snapshot onto the disk
 	<-apply.notifyc
 
-	snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
+	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
 	if err != nil {
-		plog.Panicf("get database snapshot file path error: %v", err)
-	}
-
-	fn := filepath.Join(s.Cfg.SnapDir(), databaseFilename)
-	if err := os.Rename(snapfn, fn); err != nil {
-		plog.Panicf("rename snapshot file error: %v", err)
+		plog.Panic(err)
 	}
 
-	newbe := newBackend(fn, s.Cfg.QuotaBackendBytes)
-
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
 	if s.lessor != nil {
@@ -1662,13 +1652,3 @@ func (s *EtcdServer) goAttach(f func()) {
 		f()
 	}()
 }
-
-func newBackend(path string, quotaBytes int64) backend.Backend {
-	bcfg := backend.DefaultBackendConfig()
-	bcfg.Path = path
-	if quotaBytes > 0 && quotaBytes != DefaultQuotaBytes {
-		// permit 10% excess over quota for disarm
-		bcfg.MmapSize = uint64(quotaBytes + quotaBytes/10)
-	}
-	return backend.New(bcfg)
-}

+ 40 - 43
etcdserver/server_test.go

@@ -20,6 +20,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path"
+	"path/filepath"
 	"reflect"
 	"testing"
 	"time"
@@ -29,6 +30,7 @@ import (
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/idutil"
 	"github.com/coreos/etcd/pkg/mock/mockstorage"
 	"github.com/coreos/etcd/pkg/mock/mockstore"
@@ -40,6 +42,7 @@ import (
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"golang.org/x/net/context"
 )
@@ -964,13 +967,15 @@ func TestSnapshotOrdering(t *testing.T) {
 		t.Fatalf("couldn't open tempdir (%v)", err)
 	}
 	defer os.RemoveAll(testdir)
-	if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
+
+	snapdir := filepath.Join(testdir, "member", "snap")
+	if err := os.MkdirAll(snapdir, 0755); err != nil {
 		t.Fatalf("couldn't make snap dir (%v)", err)
 	}
 
 	rs := raft.NewMemoryStorage()
 	p := mockstorage.NewStorageRecorderStream(testdir)
-	tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
+	tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir)
 	r := newRaftNode(raftNodeConfig{
 		isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
 		Node:        n,
@@ -982,10 +987,11 @@ func TestSnapshotOrdering(t *testing.T) {
 		Cfg: &ServerConfig{
 			DataDir: testdir,
 		},
-		r:          *r,
-		store:      st,
-		cluster:    cl,
-		SyncTicker: &time.Ticker{},
+		r:           *r,
+		store:       st,
+		snapshotter: snap.New(snapdir),
+		cluster:     cl,
+		SyncTicker:  &time.Ticker{},
 	}
 	s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
 
@@ -997,40 +1003,30 @@ func TestSnapshotOrdering(t *testing.T) {
 	s.start()
 	defer s.Stop()
 
-	actionc := p.Chan()
 	n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
-	if ac := <-actionc; ac.Name != "Save" {
-		// MsgSnap triggers raftNode to call Save()
-		t.Fatalf("expect save() is called, but got %v", ac.Name)
-	}
-
-	// get the snapshot sent by the transport
-	snapMsg := <-snapDoneC
-
-	// Snapshot first triggers raftnode to persists the snapshot onto disk
-	// before renaming db snapshot file to db
-	snapMsg.Snapshot.Metadata.Index = 1
-	n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
-	var seenSaveSnap bool
-	timer := time.After(5 * time.Second)
-	for {
-		select {
-		case ac := <-actionc:
-			switch ac.Name {
-			// DBFilePath() is called immediately before snapshot renaming.
-			case "DBFilePath":
-				if !seenSaveSnap {
-					t.Fatalf("DBFilePath called before SaveSnap")
-				}
-				return
-			case "SaveSnap":
-				seenSaveSnap = true
-			default:
-				continue
-			}
-		case <-timer:
-			t.Fatalf("timeout waiting on actions")
-		}
+	go func() {
+		// get the snapshot sent by the transport
+		snapMsg := <-snapDoneC
+		// Snapshot first triggers raftnode to persists the snapshot onto disk
+		// before renaming db snapshot file to db
+		snapMsg.Snapshot.Metadata.Index = 1
+		n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
+	}()
+
+	if ac := <-p.Chan(); ac.Name != "Save" {
+		t.Fatalf("expected Save, got %+v", ac)
+	}
+	if ac := <-p.Chan(); ac.Name != "Save" {
+		t.Fatalf("expected Save, got %+v", ac)
+	}
+	// confirm snapshot file still present before calling SaveSnap
+	snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
+	if !fileutil.Exist(snapPath) {
+		t.Fatalf("expected file %q, got missing", snapPath)
+	}
+	// unblock SaveSnapshot, etcdserver now permitted to move snapshot file
+	if ac := <-p.Chan(); ac.Name != "SaveSnap" {
+		t.Fatalf("expected SaveSnap, got %+v", ac)
 	}
 }
 
@@ -1119,10 +1115,11 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 		Cfg: &ServerConfig{
 			DataDir: testdir,
 		},
-		r:          *r,
-		store:      st,
-		cluster:    cl,
-		SyncTicker: &time.Ticker{},
+		r:           *r,
+		store:       st,
+		snapshotter: snap.New(testdir),
+		cluster:     cl,
+		SyncTicker:  &time.Ticker{},
 	}
 	s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
 

+ 0 - 3
etcdserver/storage.go

@@ -32,9 +32,6 @@ type Storage interface {
 	Save(st raftpb.HardState, ents []raftpb.Entry) error
 	// SaveSnap function saves snapshot to the underlying stable storage.
 	SaveSnap(snap raftpb.Snapshot) error
-	// DBFilePath returns the file path of database snapshot saved with given
-	// id.
-	DBFilePath(id uint64) (string, error)
 	// Close closes the Storage and performs finalization.
 	Close() error
 }

+ 0 - 59
etcdserver/util.go

@@ -15,18 +15,11 @@
 package etcdserver
 
 import (
-	"fmt"
-	"os"
 	"time"
 
 	"github.com/coreos/etcd/etcdserver/membership"
-	"github.com/coreos/etcd/lease"
-	"github.com/coreos/etcd/mvcc"
-	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/pkg/types"
-	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/rafthttp"
-	"github.com/coreos/etcd/snap"
 )
 
 // isConnectedToQuorumSince checks whether the local member is connected to the
@@ -102,55 +95,3 @@ func (nc *notifier) notify(err error) {
 	nc.err = err
 	close(nc.c)
 }
-
-// checkAndRecoverDB attempts to recover db in the scenario when
-// etcd server crashes before updating its in-state db
-// and after persisting snapshot to disk from syncing with leader,
-// snapshot can be newer than db where
-// (snapshot.Metadata.Index > db.consistentIndex ).
-//
-// when that happen:
-// 1. find xxx.snap.db that matches snap index.
-// 2. rename xxx.snap.db to db.
-// 3. open the new db as the backend.
-func checkAndRecoverDB(snapshot *raftpb.Snapshot, oldbe backend.Backend, quotaBackendBytes int64, snapdir string) (be backend.Backend, err error) {
-	var cIndex consistentIndex
-	kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
-	defer kv.Close()
-	kvindex := kv.ConsistentIndex()
-	if snapshot.Metadata.Index <= kvindex {
-		return oldbe, nil
-	}
-
-	id := snapshot.Metadata.Index
-	snapfn, err := snap.DBFilePathFromID(snapdir, id)
-	if err != nil {
-		return nil, fmt.Errorf("finding %v error: %v", snapdir+fmt.Sprintf("%016x.snap.db", id), err)
-	}
-
-	bepath := snapdir + databaseFilename
-	if err := os.Rename(snapfn, bepath); err != nil {
-		return nil, fmt.Errorf("rename snapshot file error: %v", err)
-	}
-
-	oldbe.Close()
-	be = openBackend(bepath, quotaBackendBytes)
-	return be, nil
-}
-
-func openBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) {
-	beOpened := make(chan struct{})
-	go func() {
-		be = newBackend(bepath, quotaBackendBytes)
-		beOpened <- struct{}{}
-	}()
-
-	select {
-	case <-beOpened:
-	case <-time.After(time.Second):
-		plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
-		plog.Warningf("waiting for it to exit before starting...")
-		<-beOpened
-	}
-	return be
-}

+ 0 - 11
pkg/mock/mockstorage/storage_recorder.go

@@ -15,8 +15,6 @@
 package mockstorage
 
 import (
-	"fmt"
-
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
@@ -47,13 +45,4 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
 	return nil
 }
 
-func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
-	p.Record(testutil.Action{Name: "DBFilePath"})
-	path := p.dbPath
-	if path != "" {
-		path = path + "/"
-	}
-	return fmt.Sprintf("%s%016x.snap.db", path, id), nil
-}
-
 func (p *storageRecorder) Close() error { return nil }

+ 8 - 12
snap/db.go

@@ -44,7 +44,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 		os.Remove(f.Name())
 		return n, err
 	}
-	fn := filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
+	fn := s.dbFilePath(id)
 	if fileutil.Exist(fn) {
 		os.Remove(f.Name())
 		return n, nil
@@ -63,19 +63,15 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 // DBFilePath returns the file path for the snapshot of the database with
 // given id. If the snapshot does not exist, it returns error.
 func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
-	return DBFilePathFromID(s.dir, id)
-}
-
-func DBFilePathFromID(dbPath string, id uint64) (string, error) {
-	fns, err := fileutil.ReadDir(dbPath)
-	if err != nil {
+	if _, err := fileutil.ReadDir(s.dir); err != nil {
 		return "", err
 	}
-	wfn := fmt.Sprintf("%016x.snap.db", id)
-	for _, fn := range fns {
-		if fn == wfn {
-			return filepath.Join(dbPath, fn), nil
-		}
+	if fn := s.dbFilePath(id); fileutil.Exist(fn) {
+		return fn, nil
 	}
 	return "", ErrNoDBSnapshot
 }
+
+func (s *Snapshotter) dbFilePath(id uint64) string {
+	return filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
+}