Browse Source

*: rewrite snapshot sending

Xiang Li 10 years ago
parent
commit
23bd60ccce

+ 0 - 2
etcdserver/config.go

@@ -121,8 +121,6 @@ func (c *ServerConfig) WALDir() string {
 
 func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") }
 
-func (c *ServerConfig) StorageDir() string { return path.Join(c.MemberDir(), "storage") }
-
 func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
 
 // ReqTimeout returns timeout for request to finish.

+ 7 - 8
etcdserver/raft.go

@@ -109,7 +109,7 @@ type raftNode struct {
 
 	// utility
 	ticker      <-chan time.Time
-	raftStorage *raftStorage
+	raftStorage *raft.MemoryStorage
 	storage     Storage
 	// transport specifies the transport to send and receive msgs to members.
 	// Sending messages MUST NOT block. It is okay to drop messages, since
@@ -126,7 +126,6 @@ type raftNode struct {
 // TODO: Ideally raftNode should get rid of the passed in server structure.
 func (r *raftNode) start(s *EtcdServer) {
 	r.s = s
-	r.raftStorage.raftStarted = true
 	r.applyc = make(chan apply)
 	r.stopped = make(chan struct{})
 	r.done = make(chan struct{})
@@ -245,7 +244,7 @@ func advanceTicksForElection(n raft.Node, electionTicks int) {
 	}
 }
 
-func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raftStorage, w *wal.WAL) {
+func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
 	var err error
 	member := cl.MemberByName(cfg.Name)
 	metadata := pbutil.MustMarshal(
@@ -270,7 +269,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
 	}
 	id = member.ID
 	plog.Infof("starting member %s in cluster %s", id, cl.ID())
-	s = newRaftStorage()
+	s = raft.NewMemoryStorage()
 	c := &raft.Config{
 		ID:              uint64(id),
 		ElectionTick:    cfg.ElectionTicks,
@@ -287,7 +286,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
 	return
 }
 
-func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) {
+func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
 	var walsnap walpb.Snapshot
 	if snapshot != nil {
 		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@@ -297,7 +296,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
 	plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
 	cl := newCluster("")
 	cl.SetID(cid)
-	s := newRaftStorage()
+	s := raft.NewMemoryStorage()
 	if snapshot != nil {
 		s.ApplySnapshot(*snapshot)
 	}
@@ -319,7 +318,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
 	return id, cl, n, s, w
 }
 
-func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) {
+func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
 	var walsnap walpb.Snapshot
 	if snapshot != nil {
 		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@@ -351,7 +350,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
 	plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
 	cl := newCluster("")
 	cl.SetID(cid)
-	s := newRaftStorage()
+	s := raft.NewMemoryStorage()
 	if snapshot != nil {
 		s.ApplySnapshot(*snapshot)
 	}

+ 0 - 64
etcdserver/raft_storage.go

@@ -1,64 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package etcdserver
-
-import (
-	"github.com/coreos/etcd/raft"
-	"github.com/coreos/etcd/raft/raftpb"
-)
-
-type raftStorage struct {
-	*raft.MemoryStorage
-	// snapStore is the place to request snapshot when v3demo is enabled.
-	// If snapStore is nil, it uses the snapshot saved in MemoryStorage.
-	snapStore *snapshotStore
-	// raftStarted indicates whether raft starts to function. If not, it cannot
-	// request snapshot, and should get snapshot from MemoryStorage.
-	raftStarted bool
-}
-
-func newRaftStorage() *raftStorage {
-	return &raftStorage{
-		MemoryStorage: raft.NewMemoryStorage(),
-	}
-}
-
-func (rs *raftStorage) reqsnap() <-chan struct{} {
-	if rs.snapStore == nil {
-		return nil
-	}
-	return rs.snapStore.reqsnapc
-}
-
-func (rs *raftStorage) raftsnap() chan<- raftpb.Snapshot {
-	if rs.snapStore == nil {
-		return nil
-	}
-	return rs.snapStore.raftsnapc
-}
-
-// Snapshot returns raft snapshot. If snapStore is nil or raft is not started, this method
-// returns snapshot saved in MemoryStorage. Otherwise, this method
-// returns snapshot from snapStore.
-func (rs *raftStorage) Snapshot() (raftpb.Snapshot, error) {
-	if rs.snapStore == nil || !rs.raftStarted {
-		return rs.MemoryStorage.Snapshot()
-	}
-	snap, err := rs.snapStore.getSnap()
-	if err != nil {
-		return raftpb.Snapshot{}, err
-	}
-	return snap.raft(), nil
-}

+ 1 - 1
etcdserver/raft_test.go

@@ -153,7 +153,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 	r := raftNode{
 		Node:        n,
 		storage:     &storageRecorder{},
-		raftStorage: newRaftStorage(),
+		raftStorage: raft.NewMemoryStorage(),
 		transport:   &nopTransporter{},
 	}
 	r.start(&EtcdServer{r: r})

+ 39 - 54
etcdserver/server.go

@@ -19,7 +19,6 @@ import (
 	"errors"
 	"expvar"
 	"fmt"
-	"log"
 	"math/rand"
 	"net/http"
 	"os"
@@ -65,6 +64,9 @@ const (
 	monitorVersionInterval = 5 * time.Second
 
 	databaseFilename = "db"
+	// maxInFlightMsgSnap is the max number of in-flight snapshot messages etcdserver
+	// allows to have. This number is more than enough for most clusters with 5 machines.
+	maxInFlightMsgSnap = 16
 )
 
 var (
@@ -177,19 +179,23 @@ type EtcdServer struct {
 	// forceVersionC is used to force the version monitor loop
 	// to detect the cluster version immediately.
 	forceVersionC chan struct{}
+
+	msgSnapC chan raftpb.Message
 }
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
 // configuration is considered static for the lifetime of the EtcdServer.
 func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	st := store.New(StoreClusterPrefix, StoreKeysPrefix)
-	var w *wal.WAL
-	var n raft.Node
-	var s *raftStorage
-	var id types.ID
-	var cl *cluster
+	var (
+		w  *wal.WAL
+		n  raft.Node
+		s  *raft.MemoryStorage
+		id types.ID
+		cl *cluster
+	)
 
-	if !cfg.V3demo && fileutil.Exist(path.Join(cfg.StorageDir(), databaseFilename)) {
+	if !cfg.V3demo && fileutil.Exist(path.Join(cfg.SnapDir(), databaseFilename)) {
 		return nil, errors.New("experimental-v3demo cannot be disabled once it is enabled")
 	}
 
@@ -340,18 +346,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		versionRt:     prt,
 		reqIDGen:      idutil.NewGenerator(uint8(id), time.Now()),
 		forceVersionC: make(chan struct{}),
+		msgSnapC:      make(chan raftpb.Message, maxInFlightMsgSnap),
 	}
 
 	if cfg.V3demo {
-		err = os.MkdirAll(cfg.StorageDir(), privateDirMode)
-		if err != nil && err != os.ErrExist {
-			return nil, err
-		}
-		srv.kv = dstorage.New(path.Join(cfg.StorageDir(), databaseFilename), &srv.consistIndex)
+		srv.kv = dstorage.New(path.Join(cfg.SnapDir(), databaseFilename), &srv.consistIndex)
 		if err := srv.kv.Restore(); err != nil {
 			plog.Fatalf("v3 storage restore error: %v", err)
 		}
-		s.snapStore = newSnapshotStore(cfg.StorageDir(), srv.kv)
 	}
 
 	// TODO: move transport initialization near the definition of remote
@@ -361,7 +363,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		ID:          id,
 		ClusterID:   cl.ID(),
 		Raft:        srv,
-		SnapSaver:   s.snapStore,
+		Snapshotter: ss,
 		ServerStats: sstats,
 		LeaderStats: lstats,
 		ErrorC:      srv.errorc,
@@ -383,10 +385,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	}
 	srv.r.transport = tr
 
-	if cfg.V3demo {
-		s.snapStore.tr = tr
-	}
-
 	return srv, nil
 }
 
@@ -465,9 +463,6 @@ func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
 // and clears the used snapshot from the snapshot store.
 func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 	s.r.ReportSnapshot(id, status)
-	if s.cfg.V3demo {
-		s.r.raftStorage.snapStore.clearUsedSnap()
-	}
 }
 
 func (s *EtcdServer) run() {
@@ -496,12 +491,12 @@ func (s *EtcdServer) run() {
 				}
 
 				if s.cfg.V3demo {
-					snapfn, err := s.r.raftStorage.snapStore.getSnapFilePath(apply.snapshot.Metadata.Index)
+					snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
 					if err != nil {
-						plog.Panicf("get snapshot file path error: %v", err)
+						plog.Panicf("get database snapshot file path error: %v", err)
 					}
 
-					fn := path.Join(s.cfg.StorageDir(), databaseFilename)
+					fn := path.Join(s.cfg.SnapDir(), databaseFilename)
 					if err := os.Rename(snapfn, fn); err != nil {
 						plog.Panicf("rename snapshot file error: %v", err)
 					}
@@ -514,7 +509,6 @@ func (s *EtcdServer) run() {
 					oldKV := s.kv
 					// TODO: swap the kv pointer atomically
 					s.kv = newKV
-					s.r.raftStorage.snapStore.kv = newKV
 
 					// Closing oldKV might block until all the txns
 					// on the kv are finished.
@@ -571,9 +565,9 @@ func (s *EtcdServer) run() {
 				s.snapshot(appliedi, confState)
 				snapi = appliedi
 			}
-		case <-s.r.raftStorage.reqsnap():
-			s.r.raftStorage.raftsnap() <- s.createRaftSnapshot(appliedi, confState)
-			plog.Infof("requested snapshot created at %d", appliedi)
+		case m := <-s.msgSnapC:
+			merged := s.createMergedSnapshotMessage(m, appliedi, confState)
+			s.r.transport.SendSnapshot(merged)
 		case err := <-s.errorc:
 			plog.Errorf("%s", err)
 			plog.Infof("the data-dir used by this member must be removed.")
@@ -828,7 +822,24 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
 		if s.cluster.IsIDRemoved(types.ID(ms[i].To)) {
 			ms[i].To = 0
 		}
+
+		if s.cfg.V3demo {
+			if ms[i].Type == raftpb.MsgSnap {
+				// There are two separate data stores when v3 demo is enabled: the store for v2,
+				// and the KV for v3.
+				// The msgSnap only contains the most recent snapshot of store without KV.
+				// So we need to redirect the msgSnap to etcd server main loop for merging in the
+				// current store snapshot and KV snapshot.
+				select {
+				case s.msgSnapC <- ms[i]:
+				default:
+					// drop msgSnap if the inflight chan is full.
+				}
+				ms[i].To = 0
+			}
+		}
 	}
+
 	s.r.transport.Send(ms)
 }
 
@@ -998,29 +1009,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 	return false, nil
 }
 
-// createRaftSnapshot creates a raft snapshot that includes the state of store for v2 api.
-func (s *EtcdServer) createRaftSnapshot(snapi uint64, confState raftpb.ConfState) raftpb.Snapshot {
-	snapt, err := s.r.raftStorage.Term(snapi)
-	if err != nil {
-		log.Panicf("get term should never fail: %v", err)
-	}
-
-	clone := s.store.Clone()
-	d, err := clone.SaveNoCopy()
-	if err != nil {
-		plog.Panicf("store save should never fail: %v", err)
-	}
-
-	return raftpb.Snapshot{
-		Metadata: raftpb.SnapshotMetadata{
-			Index:     snapi,
-			Term:      snapt,
-			ConfState: confState,
-		},
-		Data: d,
-	}
-}
-
 // TODO: non-blocking snapshot
 func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	clone := s.store.Clone()
@@ -1068,9 +1056,6 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 			plog.Panicf("unexpected compaction error %v", err)
 		}
 		plog.Infof("compacted raft log at %d", compacti)
-		if s.cfg.V3demo && s.r.raftStorage.snapStore.closeSnapBefore(compacti) {
-			plog.Infof("closed snapshot stored due to compaction at %d", compacti)
-		}
 	}()
 }
 

+ 30 - 65
etcdserver/server_test.go

@@ -17,7 +17,6 @@ package etcdserver
 import (
 	"encoding/json"
 	"fmt"
-	"io"
 	"net/http"
 	"path"
 	"reflect"
@@ -33,6 +32,7 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 )
 
@@ -523,7 +523,7 @@ func TestDoProposal(t *testing.T) {
 			r: raftNode{
 				Node:        newNodeCommitter(),
 				storage:     &storageRecorder{},
-				raftStorage: newRaftStorage(),
+				raftStorage: raft.NewMemoryStorage(),
 				transport:   &nopTransporter{},
 			},
 			store:    st,
@@ -675,7 +675,7 @@ func TestSyncTrigger(t *testing.T) {
 		cfg: &ServerConfig{TickMs: 1},
 		r: raftNode{
 			Node:        n,
-			raftStorage: newRaftStorage(),
+			raftStorage: raft.NewMemoryStorage(),
 			transport:   &nopTransporter{},
 			storage:     &storageRecorder{},
 		},
@@ -712,51 +712,9 @@ func TestSyncTrigger(t *testing.T) {
 	}
 }
 
-func TestCreateRaftSnapshot(t *testing.T) {
-	s := newRaftStorage()
-	s.Append([]raftpb.Entry{{Index: 1, Term: 1}})
-	st := &storeRecorder{}
-	srv := &EtcdServer{
-		r: raftNode{
-			raftStorage: s,
-		},
-		store: st,
-	}
-
-	snap := srv.createRaftSnapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
-	wdata, err := st.Save()
-	if err != nil {
-		t.Fatal(err)
-	}
-	wsnap := raftpb.Snapshot{
-		Metadata: raftpb.SnapshotMetadata{
-			Index:     1,
-			Term:      1,
-			ConfState: raftpb.ConfState{Nodes: []uint64{1}},
-		},
-		Data: wdata,
-	}
-	if !reflect.DeepEqual(snap, wsnap) {
-		t.Errorf("snap = %+v, want %+v", snap, wsnap)
-	}
-
-	gaction := st.Action()
-	// the third action is store.Save used in testing
-	if len(gaction) != 3 {
-		t.Fatalf("len(action) = %d, want 3", len(gaction))
-	}
-	if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
-		t.Errorf("action = %s, want Clone", gaction[0])
-	}
-	if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
-		t.Errorf("action = %s, want SaveNoCopy", gaction[1])
-	}
-
-}
-
 // snapshot should snapshot the store and cut the persistent
 func TestSnapshot(t *testing.T) {
-	s := newRaftStorage()
+	s := raft.NewMemoryStorage()
 	s.Append([]raftpb.Entry{{Index: 1}})
 	st := &storeRecorder{}
 	p := &storageRecorder{}
@@ -800,7 +758,7 @@ func TestTriggerSnap(t *testing.T) {
 		snapCount: uint64(snapc),
 		r: raftNode{
 			Node:        newNodeCommitter(),
-			raftStorage: newRaftStorage(),
+			raftStorage: raft.NewMemoryStorage(),
 			storage:     p,
 			transport:   &nopTransporter{},
 		},
@@ -841,7 +799,7 @@ func TestRecvSnapshot(t *testing.T) {
 			Node:        n,
 			transport:   &nopTransporter{},
 			storage:     p,
-			raftStorage: newRaftStorage(),
+			raftStorage: raft.NewMemoryStorage(),
 		},
 		store:   st,
 		cluster: cl,
@@ -874,7 +832,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 	st := &storeRecorder{}
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
-	storage := newRaftStorage()
+	storage := raft.NewMemoryStorage()
 	s := &EtcdServer{
 		cfg: &ServerConfig{},
 		r: raftNode{
@@ -923,7 +881,7 @@ func TestAddMember(t *testing.T) {
 	s := &EtcdServer{
 		r: raftNode{
 			Node:        n,
-			raftStorage: newRaftStorage(),
+			raftStorage: raft.NewMemoryStorage(),
 			storage:     &storageRecorder{},
 			transport:   &nopTransporter{},
 		},
@@ -963,7 +921,7 @@ func TestRemoveMember(t *testing.T) {
 	s := &EtcdServer{
 		r: raftNode{
 			Node:        n,
-			raftStorage: newRaftStorage(),
+			raftStorage: raft.NewMemoryStorage(),
 			storage:     &storageRecorder{},
 			transport:   &nopTransporter{},
 		},
@@ -1002,7 +960,7 @@ func TestUpdateMember(t *testing.T) {
 	s := &EtcdServer{
 		r: raftNode{
 			Node:        n,
-			raftStorage: newRaftStorage(),
+			raftStorage: raft.NewMemoryStorage(),
 			storage:     &storageRecorder{},
 			transport:   &nopTransporter{},
 		},
@@ -1355,12 +1313,19 @@ func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 	p.Record(testutil.Action{Name: "Save"})
 	return nil
 }
+
 func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
 	if !raft.IsEmptySnap(st) {
 		p.Record(testutil.Action{Name: "SaveSnap"})
 	}
 	return nil
 }
+
+func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
+	p.Record(testutil.Action{Name: "DBFilePath"})
+	return fmt.Sprintf("%016x.snap.db", id), nil
+}
+
 func (p *storageRecorder) Close() error { return nil }
 
 type nodeRecorder struct{ testutil.Recorder }
@@ -1477,16 +1442,16 @@ func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
 
 type nopTransporter struct{}
 
-func (s *nopTransporter) Start() error                                 { return nil }
-func (s *nopTransporter) Handler() http.Handler                        { return nil }
-func (s *nopTransporter) Send(m []raftpb.Message)                      {}
-func (s *nopTransporter) AddRemote(id types.ID, us []string)           {}
-func (s *nopTransporter) AddPeer(id types.ID, us []string)             {}
-func (s *nopTransporter) RemovePeer(id types.ID)                       {}
-func (s *nopTransporter) RemoveAllPeers()                              {}
-func (s *nopTransporter) UpdatePeer(id types.ID, us []string)          {}
-func (s *nopTransporter) ActiveSince(id types.ID) time.Time            { return time.Time{} }
-func (s *nopTransporter) SnapshotReady(rc io.ReadCloser, index uint64) {}
-func (s *nopTransporter) Stop()                                        {}
-func (s *nopTransporter) Pause()                                       {}
-func (s *nopTransporter) Resume()                                      {}
+func (s *nopTransporter) Start() error                        { return nil }
+func (s *nopTransporter) Handler() http.Handler               { return nil }
+func (s *nopTransporter) Send(m []raftpb.Message)             {}
+func (s *nopTransporter) SendSnapshot(m snap.Message)         {}
+func (s *nopTransporter) AddRemote(id types.ID, us []string)  {}
+func (s *nopTransporter) AddPeer(id types.ID, us []string)    {}
+func (s *nopTransporter) RemovePeer(id types.ID)              {}
+func (s *nopTransporter) RemoveAllPeers()                     {}
+func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
+func (s *nopTransporter) ActiveSince(id types.ID) time.Time   { return time.Time{} }
+func (s *nopTransporter) Stop()                               {}
+func (s *nopTransporter) Pause()                              {}
+func (s *nopTransporter) Resume()                             {}

+ 71 - 0
etcdserver/snapshot_merge.go

@@ -0,0 +1,71 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	"io"
+	"log"
+
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
+	dstorage "github.com/coreos/etcd/storage"
+)
+
+// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
+// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
+// as ReadCloser.
+func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
+	snapt, err := s.r.raftStorage.Term(snapi)
+	if err != nil {
+		log.Panicf("get term should never fail: %v", err)
+	}
+
+	// get a snapshot of v2 store as []byte
+	clone := s.store.Clone()
+	d, err := clone.SaveNoCopy()
+	if err != nil {
+		plog.Panicf("store save should never fail: %v", err)
+	}
+
+	// get a snapshot of v3 KV as readCloser
+	rc := newSnapshotReaderCloser(s.kv.Snapshot())
+
+	// put the []byte snapshot of store into raft snapshot and return the merged snapshot with
+	// KV readCloser snapshot.
+	snapshot := raftpb.Snapshot{
+		Metadata: raftpb.SnapshotMetadata{
+			Index:     snapi,
+			Term:      snapt,
+			ConfState: confState,
+		},
+		Data: d,
+	}
+	m.Snapshot = snapshot
+
+	return snap.Message{
+		Message:    m,
+		ReadCloser: rc,
+	}
+}
+
+func newSnapshotReaderCloser(snapshot dstorage.Snapshot) io.ReadCloser {
+	pr, pw := io.Pipe()
+	go func() {
+		_, err := snapshot.WriteTo(pw)
+		pw.CloseWithError(err)
+		snapshot.Close()
+	}()
+	return pr
+}

+ 0 - 260
etcdserver/snapshot_store.go

@@ -1,260 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package etcdserver
-
-import (
-	"fmt"
-	"io"
-	"io/ioutil"
-	"os"
-	"path"
-	"sync"
-	"time"
-
-	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
-	"github.com/coreos/etcd/pkg/fileutil"
-	"github.com/coreos/etcd/raft"
-	"github.com/coreos/etcd/raft/raftpb"
-	"github.com/coreos/etcd/rafthttp"
-	dstorage "github.com/coreos/etcd/storage"
-)
-
-// clearUnusedSnapshotInterval specifies the time interval to wait
-// before clearing unused snapshot.
-// The newly created snapshot should be retrieved within one heartbeat
-// interval because raft state machine retries to send snapshot
-// to slow follower when receiving MsgHeartbeatResp from the follower.
-// Set it as 5s to match the upper limit of heartbeat interval.
-const clearUnusedSnapshotInterval = 5 * time.Second
-
-type snapshot struct {
-	r raftpb.Snapshot
-
-	io.ReadCloser // used to read out v3 snapshot
-
-	done chan struct{}
-}
-
-func newSnapshot(r raftpb.Snapshot, kv dstorage.Snapshot) *snapshot {
-	done := make(chan struct{})
-	pr, pw := io.Pipe()
-	go func() {
-		_, err := kv.WriteTo(pw)
-		pw.CloseWithError(err)
-		kv.Close()
-		close(done)
-	}()
-	return &snapshot{
-		r:          r,
-		ReadCloser: pr,
-		done:       done,
-	}
-}
-
-func (s *snapshot) raft() raftpb.Snapshot { return s.r }
-
-func (s *snapshot) isClosed() bool {
-	select {
-	case <-s.done:
-		return true
-	default:
-		return false
-	}
-}
-
-// TODO: remove snapshotStore. getSnap part could be put into memoryStorage,
-// while SaveFrom could be put into another struct, or even put into dstorage package.
-type snapshotStore struct {
-	// dir to save snapshot data
-	dir string
-	kv  dstorage.KV
-	tr  rafthttp.Transporter
-
-	// send empty to reqsnapc to notify the channel receiver to send back latest
-	// snapshot to snapc
-	reqsnapc chan struct{}
-	// a chan to receive the requested raft snapshot
-	// snapshotStore will receive from the chan immediately after it sends empty to reqsnapc
-	raftsnapc chan raftpb.Snapshot
-
-	mu sync.Mutex // protect belowing vars
-	// snap is nil iff there is no snapshot stored
-	snap       *snapshot
-	inUse      bool
-	createOnce sync.Once // ensure at most one snapshot is created when no snapshot stored
-
-	clock clockwork.Clock
-}
-
-func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
-	return &snapshotStore{
-		dir:       dir,
-		kv:        kv,
-		reqsnapc:  make(chan struct{}),
-		raftsnapc: make(chan raftpb.Snapshot),
-		clock:     clockwork.NewRealClock(),
-	}
-}
-
-// getSnap returns a snapshot.
-// If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned.
-//
-// If the snapshot stored is in use, it returns ErrSnapshotTemporarilyUnavailable.
-// If there is no snapshot stored, it creates new snapshot
-// asynchronously and returns ErrSnapshotTemporarilyUnavailable, so
-// caller could get snapshot later when the snapshot is created.
-// Otherwise, it returns the snapshot stored.
-//
-// The created snapshot is cleared from the snapshot store if it is
-// either unused after clearUnusedSnapshotInterval, or explicitly cleared
-// through clearUsedSnap after using.
-// closeSnapBefore is used to close outdated snapshot,
-// so the snapshot will be cleared faster when in use.
-//
-// snapshot store stores at most one snapshot at a time.
-// If raft state machine wants to send two snapshot messages to two followers,
-// the second snapshot message will keep getting snapshot and succeed only after
-// the first message is sent. This increases the time used to send messages,
-// but it is acceptable because this should happen seldomly.
-func (ss *snapshotStore) getSnap() (*snapshot, error) {
-	ss.mu.Lock()
-	defer ss.mu.Unlock()
-
-	if ss.inUse {
-		return nil, raft.ErrSnapshotTemporarilyUnavailable
-	}
-
-	if ss.snap == nil {
-		// create snapshot asynchronously
-		ss.createOnce.Do(func() { go ss.createSnap() })
-		return nil, raft.ErrSnapshotTemporarilyUnavailable
-	}
-
-	ss.inUse = true
-	// give transporter the generated snapshot that is ready to send out
-	ss.tr.SnapshotReady(ss.snap, ss.snap.raft().Metadata.Index)
-	return ss.snap, nil
-}
-
-// clearUsedSnap clears the snapshot from the snapshot store after it
-// is used.
-// After clear, snapshotStore could create new snapshot when getSnap.
-func (ss *snapshotStore) clearUsedSnap() {
-	ss.mu.Lock()
-	defer ss.mu.Unlock()
-	if !ss.inUse {
-		plog.Panicf("unexpected clearUsedSnap when snapshot is not in use")
-	}
-	ss.clear()
-}
-
-// closeSnapBefore closes the stored snapshot if its index is not greater
-// than the given compact index.
-// If it closes the snapshot, it returns true.
-func (ss *snapshotStore) closeSnapBefore(index uint64) bool {
-	ss.mu.Lock()
-	defer ss.mu.Unlock()
-	if ss.snap != nil && ss.snap.raft().Metadata.Index <= index {
-		if err := ss.snap.Close(); err != nil {
-			plog.Errorf("snapshot close error (%v)", err)
-		}
-		return true
-	}
-	return false
-}
-
-// createSnap creates a new snapshot and stores it into the snapshot store.
-// It also sets a timer to clear the snapshot if it is not in use after
-// some time interval.
-// It should only be called in snapshotStore functions.
-func (ss *snapshotStore) createSnap() {
-	// ask to generate v2 snapshot
-	ss.reqsnapc <- struct{}{}
-	// generate KV snapshot
-	kvsnap := ss.kv.Snapshot()
-	raftsnap := <-ss.raftsnapc
-	snap := newSnapshot(raftsnap, kvsnap)
-
-	ss.mu.Lock()
-	ss.snap = snap
-	ss.mu.Unlock()
-
-	go func() {
-		<-ss.clock.After(clearUnusedSnapshotInterval)
-		ss.mu.Lock()
-		defer ss.mu.Unlock()
-		if snap == ss.snap && !ss.inUse {
-			ss.clear()
-		}
-	}()
-}
-
-// clear clears snapshot related variables in snapshotStore. It closes
-// the snapshot stored and sets the variables to initial values.
-// It should only be called in snapshotStore functions.
-func (ss *snapshotStore) clear() {
-	if err := ss.snap.Close(); err != nil {
-		plog.Errorf("snapshot close error (%v)", err)
-	}
-	ss.snap = nil
-	ss.inUse = false
-	ss.createOnce = sync.Once{}
-}
-
-// SaveFrom saves snapshot at the given index from the given reader.
-// If the snapshot with the given index has been saved successfully, it keeps
-// the original saved snapshot and returns error.
-// The function guarantees that SaveFrom always saves either complete
-// snapshot or no snapshot, even if the call is aborted because program
-// is hard killed.
-func (ss *snapshotStore) SaveFrom(r io.Reader, index uint64) error {
-	f, err := ioutil.TempFile(ss.dir, "tmp")
-	if err != nil {
-		return err
-	}
-	_, err = io.Copy(f, r)
-	f.Close()
-	if err != nil {
-		os.Remove(f.Name())
-		return err
-	}
-	fn := path.Join(ss.dir, fmt.Sprintf("%016x.db", index))
-	if fileutil.Exist(fn) {
-		os.Remove(f.Name())
-		return fmt.Errorf("snapshot to save has existed")
-	}
-	err = os.Rename(f.Name(), fn)
-	if err != nil {
-		os.Remove(f.Name())
-		return err
-	}
-	return nil
-}
-
-// getSnapFilePath returns the file path for the snapshot with given index.
-// If the snapshot does not exist, it returns error.
-func (ss *snapshotStore) getSnapFilePath(index uint64) (string, error) {
-	fns, err := fileutil.ReadDir(ss.dir)
-	if err != nil {
-		return "", err
-	}
-	wfn := fmt.Sprintf("%016x.db", index)
-	for _, fn := range fns {
-		if fn == wfn {
-			return path.Join(ss.dir, fn), nil
-		}
-	}
-	return "", fmt.Errorf("snapshot file doesn't exist")
-}

+ 0 - 205
etcdserver/snapshot_store_test.go

@@ -1,205 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package etcdserver
-
-import (
-	"io"
-	"reflect"
-	"sync"
-	"testing"
-
-	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
-	"github.com/coreos/etcd/pkg/testutil"
-	"github.com/coreos/etcd/raft"
-	"github.com/coreos/etcd/raft/raftpb"
-	dstorage "github.com/coreos/etcd/storage"
-	"github.com/coreos/etcd/storage/storagepb"
-)
-
-func TestSnapshotStoreCreateSnap(t *testing.T) {
-	snap := raftpb.Snapshot{
-		Metadata: raftpb.SnapshotMetadata{Index: 1},
-	}
-	ss := newSnapshotStore("", &nopKV{})
-	fakeClock := clockwork.NewFakeClock()
-	ss.clock = fakeClock
-	go func() {
-		<-ss.reqsnapc
-		ss.raftsnapc <- snap
-	}()
-
-	// create snapshot
-	ss.createSnap()
-	if !reflect.DeepEqual(ss.snap.raft(), snap) {
-		t.Errorf("raftsnap = %+v, want %+v", ss.snap.raft(), snap)
-	}
-
-	// unused snapshot is cleared after clearUnusedSnapshotInterval
-	fakeClock.BlockUntil(1)
-	fakeClock.Advance(clearUnusedSnapshotInterval)
-	testutil.WaitSchedule()
-	ss.mu.Lock()
-	if ss.snap != nil {
-		t.Errorf("snap = %+v, want %+v", ss.snap, nil)
-	}
-	ss.mu.Unlock()
-}
-
-func TestSnapshotStoreGetSnap(t *testing.T) {
-	snap := raftpb.Snapshot{
-		Metadata: raftpb.SnapshotMetadata{Index: 1},
-	}
-	ss := newSnapshotStore("", &nopKV{})
-	fakeClock := clockwork.NewFakeClock()
-	ss.clock = fakeClock
-	ss.tr = &nopTransporter{}
-	go func() {
-		<-ss.reqsnapc
-		ss.raftsnapc <- snap
-	}()
-
-	// get snap when no snapshot stored
-	_, err := ss.getSnap()
-	if err != raft.ErrSnapshotTemporarilyUnavailable {
-		t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
-	}
-
-	// wait for asynchronous snapshot creation to finish
-	testutil.WaitSchedule()
-	// get the created snapshot
-	s, err := ss.getSnap()
-	if err != nil {
-		t.Fatalf("getSnap error = %v, want nil", err)
-	}
-	if !reflect.DeepEqual(s.raft(), snap) {
-		t.Errorf("raftsnap = %+v, want %+v", s.raft(), snap)
-	}
-	if !ss.inUse {
-		t.Errorf("inUse = %v, want true", ss.inUse)
-	}
-
-	// get snap when snapshot stored has been in use
-	_, err = ss.getSnap()
-	if err != raft.ErrSnapshotTemporarilyUnavailable {
-		t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
-	}
-
-	// clean up
-	fakeClock.Advance(clearUnusedSnapshotInterval)
-}
-
-func TestSnapshotStoreClearUsedSnap(t *testing.T) {
-	s := &fakeSnapshot{}
-	var once sync.Once
-	once.Do(func() {})
-	ss := &snapshotStore{
-		snap:       newSnapshot(raftpb.Snapshot{}, s),
-		inUse:      true,
-		createOnce: once,
-	}
-
-	ss.clearUsedSnap()
-	// wait for underlying KV snapshot closed
-	testutil.WaitSchedule()
-	s.mu.Lock()
-	if !s.closed {
-		t.Errorf("snapshot closed = %v, want true", s.closed)
-	}
-	s.mu.Unlock()
-	if ss.snap != nil {
-		t.Errorf("snapshot = %v, want nil", ss.snap)
-	}
-	if ss.inUse {
-		t.Errorf("isUse = %v, want false", ss.inUse)
-	}
-	// test createOnce is reset
-	if ss.createOnce == once {
-		t.Errorf("createOnce fails to reset")
-	}
-}
-
-func TestSnapshotStoreCloseSnapBefore(t *testing.T) {
-	snapIndex := uint64(5)
-
-	tests := []struct {
-		index uint64
-		wok   bool
-	}{
-		{snapIndex - 2, false},
-		{snapIndex - 1, false},
-		{snapIndex, true},
-	}
-	for i, tt := range tests {
-		rs := raftpb.Snapshot{
-			Metadata: raftpb.SnapshotMetadata{Index: 5},
-		}
-		s := &fakeSnapshot{}
-		ss := &snapshotStore{
-			snap: newSnapshot(rs, s),
-		}
-
-		ok := ss.closeSnapBefore(tt.index)
-		if ok != tt.wok {
-			t.Errorf("#%d: closeSnapBefore = %v, want %v", i, ok, tt.wok)
-		}
-		if ok {
-			// wait for underlying KV snapshot closed
-			testutil.WaitSchedule()
-			s.mu.Lock()
-			if !s.closed {
-				t.Errorf("#%d: snapshot closed = %v, want true", i, s.closed)
-			}
-			s.mu.Unlock()
-		}
-	}
-}
-
-type nopKV struct{}
-
-func (kv *nopKV) Rev() int64 { return 0 }
-func (kv *nopKV) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
-	return nil, 0, nil
-}
-func (kv *nopKV) Put(key, value []byte) (rev int64)          { return 0 }
-func (kv *nopKV) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 }
-func (kv *nopKV) TxnBegin() int64                            { return 0 }
-func (kv *nopKV) TxnEnd(txnID int64) error                   { return nil }
-func (kv *nopKV) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
-	return nil, 0, nil
-}
-func (kv *nopKV) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { return 0, nil }
-func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
-	return 0, 0, nil
-}
-func (kv *nopKV) Compact(rev int64) error     { return nil }
-func (kv *nopKV) Hash() (uint32, error)       { return 0, nil }
-func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} }
-func (kv *nopKV) Commit()                     {}
-func (kv *nopKV) Restore() error              { return nil }
-func (kv *nopKV) Close() error                { return nil }
-
-type fakeSnapshot struct {
-	mu     sync.Mutex
-	closed bool
-}
-
-func (s *fakeSnapshot) Size() int64                        { return 0 }
-func (s *fakeSnapshot) WriteTo(w io.Writer) (int64, error) { return 0, nil }
-func (s *fakeSnapshot) Close() error {
-	s.mu.Lock()
-	s.closed = true
-	s.mu.Unlock()
-	return nil
-}

+ 3 - 0
etcdserver/storage.go

@@ -35,6 +35,9 @@ type Storage interface {
 	Save(st raftpb.HardState, ents []raftpb.Entry) error
 	// SaveSnap function saves snapshot to the underlying stable storage.
 	SaveSnap(snap raftpb.Snapshot) error
+	// DBFilePath returns the file path of the database snapshot saved with the
+	// given id.
+	DBFilePath(id uint64) (string, error)
 	// Close closes the Storage and performs finalization.
 	Close() error
 }

+ 11 - 10
rafthttp/http.go

@@ -25,6 +25,7 @@ import (
 	pioutil "github.com/coreos/etcd/pkg/ioutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/version"
 )
 
@@ -118,16 +119,16 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 type snapshotHandler struct {
-	r         Raft
-	snapSaver SnapshotSaver
-	cid       types.ID
+	r           Raft
+	snapshotter *snap.Snapshotter
+	cid         types.ID
 }
 
-func newSnapshotHandler(r Raft, snapSaver SnapshotSaver, cid types.ID) http.Handler {
+func newSnapshotHandler(r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
 	return &snapshotHandler{
-		r:         r,
-		snapSaver: snapSaver,
-		cid:       cid,
+		r:           r,
+		snapshotter: snapshotter,
+		cid:         cid,
 	}
 }
 
@@ -168,14 +169,14 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	// save snapshot
-	if err := h.snapSaver.SaveFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
+	// save incoming database snapshot.
+	if err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
 		msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
 		plog.Error(msg)
 		http.Error(w, msg, http.StatusInternalServerError)
 		return
 	}
-	plog.Infof("received and saved snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
+	plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
 
 	if err := h.r.Process(context.TODO(), m); err != nil {
 		switch v := err.(type) {

+ 6 - 3
rafthttp/http_test.go

@@ -28,6 +28,7 @@ import (
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/version"
 )
 
@@ -340,9 +341,10 @@ type fakePeerGetter struct {
 func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
 
 type fakePeer struct {
-	msgs  []raftpb.Message
-	urls  types.URLs
-	connc chan *outgoingConn
+	msgs     []raftpb.Message
+	snapMsgs []snap.Message
+	urls     types.URLs
+	connc    chan *outgoingConn
 }
 
 func newFakePeer() *fakePeer {
@@ -352,6 +354,7 @@ func newFakePeer() *fakePeer {
 }
 
 func (pr *fakePeer) send(m raftpb.Message)                 { pr.msgs = append(pr.msgs, m) }
+func (pr *fakePeer) sendSnap(m snap.Message)               { pr.snapMsgs = append(pr.snapMsgs, m) }
 func (pr *fakePeer) update(urls types.URLs)                { pr.urls = urls }
 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
 func (pr *fakePeer) activeSince() time.Time                { return time.Time{} }

+ 12 - 6
rafthttp/peer.go

@@ -23,6 +23,7 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
 )
 
 const (
@@ -57,6 +58,11 @@ type Peer interface {
 	// When it fails to send message out, it will report the status to underlying
 	// raft.
 	send(m raftpb.Message)
+
+	// sendSnap sends the merged snapshot message to the remote peer. Its behavior
+	// is similar to send.
+	sendSnap(m snap.Message)
+
 	// update updates the urls of remote peer.
 	update(urls types.URLs)
 	// attachOutgoingConn attachs the outgoing connection to the peer for
@@ -110,7 +116,7 @@ type peer struct {
 	done  chan struct{}
 }
 
-func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
+func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
 	picker := newURLPicker(urls)
 	status := newPeerStatus(to)
 	p := &peer{
@@ -121,7 +127,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
 		msgAppV2Writer: startStreamWriter(to, status, fs, r),
 		writer:         startStreamWriter(to, status, fs, r),
 		pipeline:       newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc),
-		snapSender:     newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc),
+		snapSender:     newSnapshotSender(pipelineRt, picker, local, to, cid, status, r, errorc),
 		sendc:          make(chan raftpb.Message),
 		recvc:          make(chan raftpb.Message, recvBufSize),
 		propc:          make(chan raftpb.Message, maxPendingProposals),
@@ -158,10 +164,6 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
 				if paused {
 					continue
 				}
-				if p.v3demo && isMsgSnap(m) {
-					go p.snapSender.send(m)
-					continue
-				}
 				writec, name := p.pick(m)
 				select {
 				case writec <- m:
@@ -209,6 +211,10 @@ func (p *peer) send(m raftpb.Message) {
 	}
 }
 
+func (p *peer) sendSnap(m snap.Message) {
+	go p.snapSender.send(m)
+}
+
 func (p *peer) update(urls types.URLs) {
 	select {
 	case p.newURLsC <- urls:

+ 10 - 14
rafthttp/snapshot_sender.go

@@ -24,7 +24,7 @@ import (
 	"github.com/coreos/etcd/pkg/httputil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
-	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
 )
 
 type snapshotSender struct {
@@ -34,14 +34,13 @@ type snapshotSender struct {
 	tr     http.RoundTripper
 	picker *urlPicker
 	status *peerStatus
-	snapst *snapshotStore
 	r      Raft
 	errorc chan error
 
 	stopc chan struct{}
 }
 
-func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, snapst *snapshotStore, r Raft, errorc chan error) *snapshotSender {
+func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, r Raft, errorc chan error) *snapshotSender {
 	return &snapshotSender{
 		from:   from,
 		to:     to,
@@ -49,7 +48,6 @@ func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid ty
 		tr:     tr,
 		picker: picker,
 		status: status,
-		snapst: snapst,
 		r:      r,
 		errorc: errorc,
 		stopc:  make(chan struct{}),
@@ -58,10 +56,12 @@ func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid ty
 
 func (s *snapshotSender) stop() { close(s.stopc) }
 
-func (s *snapshotSender) send(m raftpb.Message) {
+func (s *snapshotSender) send(merged snap.Message) {
+	m := merged.Message
+
 	start := time.Now()
 
-	body := createSnapBody(m, s.snapst)
+	body := createSnapBody(merged)
 	defer body.Close()
 
 	u := s.picker.pick()
@@ -142,20 +142,16 @@ type readCloser struct {
 	io.Closer
 }
 
-// createSnapBody creates the request body for the given raft snapshot message.
-// Callers should close body when done reading from it.
-func createSnapBody(m raftpb.Message, snapst *snapshotStore) io.ReadCloser {
+func createSnapBody(merged snap.Message) io.ReadCloser {
 	buf := new(bytes.Buffer)
 	enc := &messageEncoder{w: buf}
 	// encode raft message
-	if err := enc.encode(m); err != nil {
+	if err := enc.encode(merged.Message); err != nil {
 		plog.Panicf("encode message error (%v)", err)
 	}
-	// get snapshot
-	rc := snapst.get(m.Snapshot.Metadata.Index)
 
 	return &readCloser{
-		Reader: io.MultiReader(buf, rc),
-		Closer: rc,
+		Reader: io.MultiReader(buf, merged.ReadCloser),
+		Closer: merged.ReadCloser,
 	}
 }

+ 0 - 45
rafthttp/snapshot_store.go

@@ -1,45 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package rafthttp
-
-import (
-	"io"
-)
-
-// snapshotStore is the store of snapshot. Caller could put one
-// snapshot into the store, and get it later.
-// snapshotStore stores at most one snapshot at a time, or it panics.
-type snapshotStore struct {
-	rc io.ReadCloser
-	// index of the stored snapshot
-	// index is 0 if and only if there is no snapshot stored.
-	index uint64
-}
-
-func (s *snapshotStore) put(rc io.ReadCloser, index uint64) {
-	if s.index != 0 {
-		plog.Panicf("unexpected put when there is one snapshot stored")
-	}
-	s.rc, s.index = rc, index
-}
-
-func (s *snapshotStore) get(index uint64) io.ReadCloser {
-	if s.index == index {
-		// set index to 0 to indicate no snapshot stored
-		s.index = 0
-		return s.rc
-	}
-	return nil
-}

+ 17 - 26
rafthttp/transport.go

@@ -15,7 +15,6 @@
 package rafthttp
 
 import (
-	"io"
 	"net/http"
 	"sync"
 	"time"
@@ -29,6 +28,7 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
 )
 
 var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
@@ -40,12 +40,6 @@ type Raft interface {
 	ReportSnapshot(id uint64, status raft.SnapshotStatus)
 }
 
-// SnapshotSaver is the interface that wraps the SaveFrom method.
-type SnapshotSaver interface {
-	// SaveFrom saves the snapshot data at the given index from the given reader.
-	SaveFrom(r io.Reader, index uint64) error
-}
-
 type Transporter interface {
 	// Start starts the given Transporter.
 	// Start MUST be called before calling other functions in the interface.
@@ -62,6 +56,9 @@ type Transporter interface {
 	// If the id cannot be found in the transport, the message
 	// will be ignored.
 	Send(m []raftpb.Message)
+	// SendSnapshot sends out the given snapshot message to a remote peer.
+	// The behavior of SendSnapshot is similar to Send.
+	SendSnapshot(m snap.Message)
 	// AddRemote adds a remote with given peer urls into the transport.
 	// A remote helps newly joined member to catch up the progress of cluster,
 	// and will not be used after that.
@@ -86,14 +83,6 @@ type Transporter interface {
 	// If the connection is active since peer was added, it returns the adding time.
 	// If the connection is currently inactive, it returns zero time.
 	ActiveSince(id types.ID) time.Time
-	// SnapshotReady accepts a snapshot at the given index that is ready to send out.
-	// It is expected that caller sends a raft snapshot message with
-	// the given index soon, and the accepted snapshot will be sent out
-	// together. After sending, snapshot sent status is reported
-	// through Raft.SnapshotStatus.
-	// SnapshotReady MUST not be called when the snapshot sent status of previous
-	// accepted one has not been reported.
-	SnapshotReady(rc io.ReadCloser, index uint64)
 	// Stop closes the connections and stops the transporter.
 	Stop()
 }
@@ -108,10 +97,10 @@ type Transport struct {
 	DialTimeout time.Duration     // maximum duration before timing out dial of the request
 	TLSInfo     transport.TLSInfo // TLS information used when creating connection
 
-	ID          types.ID           // local member ID
-	ClusterID   types.ID           // raft cluster ID for request validation
-	Raft        Raft               // raft state machine, to which the Transport forwards received messages and reports status
-	SnapSaver   SnapshotSaver      // used to save snapshot in v3 snapshot messages
+	ID          types.ID // local member ID
+	ClusterID   types.ID // raft cluster ID for request validation
+	Raft        Raft     // raft state machine, to which the Transport forwards received messages and reports status
+	Snapshotter *snap.Snapshotter
 	ServerStats *stats.ServerStats // used to record general transportation statistics
 	// used to record transportation statistics with followers when
 	// performing as leader in raft protocol
@@ -130,8 +119,6 @@ type Transport struct {
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	peers   map[types.ID]Peer    // peers map
 
-	snapst *snapshotStore
-
 	prober probing.Prober
 }
 
@@ -147,7 +134,6 @@ func (t *Transport) Start() error {
 	}
 	t.remotes = make(map[types.ID]*remote)
 	t.peers = make(map[types.ID]Peer)
-	t.snapst = &snapshotStore{}
 	t.prober = probing.NewProber(t.pipelineRt)
 	return nil
 }
@@ -155,7 +141,7 @@ func (t *Transport) Start() error {
 func (t *Transport) Handler() http.Handler {
 	pipelineHandler := newPipelineHandler(t.Raft, t.ClusterID)
 	streamHandler := newStreamHandler(t, t.Raft, t.ID, t.ClusterID)
-	snapHandler := newSnapshotHandler(t.Raft, t.SnapSaver, t.ClusterID)
+	snapHandler := newSnapshotHandler(t.Raft, t.Snapshotter, t.ClusterID)
 	mux := http.NewServeMux()
 	mux.Handle(RaftPrefix, pipelineHandler)
 	mux.Handle(RaftStreamPrefix+"/", streamHandler)
@@ -240,7 +226,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
 	fs := t.LeaderStats.Follower(id.String())
-	t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.V3demo)
+	t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo)
 	addPeerToProber(t.prober, id.String(), us)
 }
 
@@ -296,8 +282,13 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
 	return time.Time{}
 }
 
-func (t *Transport) SnapshotReady(rc io.ReadCloser, index uint64) {
-	t.snapst.put(rc, index)
+func (t *Transport) SendSnapshot(m snap.Message) {
+	p := t.peers[types.ID(m.To)]
+	if p == nil {
+		m.ReadCloser.Close()
+		return
+	}
+	p.sendSnap(m)
 }
 
 type Pausable interface {

+ 67 - 0
snap/db.go

@@ -0,0 +1,67 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snap
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+
+	"github.com/coreos/etcd/pkg/fileutil"
+)
+
+// SaveDBFrom saves a snapshot of the database from the given reader. It
+// guarantees the save operation is atomic.
+func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) error {
+	f, err := ioutil.TempFile(s.dir, "tmp")
+	if err != nil {
+		return err
+	}
+	_, err = io.Copy(f, r)
+	f.Close()
+	if err != nil {
+		os.Remove(f.Name())
+		return err
+	}
+	fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
+	if fileutil.Exist(fn) {
+		os.Remove(f.Name())
+		return nil
+	}
+	err = os.Rename(f.Name(), fn)
+	if err != nil {
+		os.Remove(f.Name())
+		return err
+	}
+	return nil
+}
+
+// DBFilePath returns the file path for the snapshot of the database with the
+// given id. If the snapshot does not exist, it returns an error.
+func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
+	fns, err := fileutil.ReadDir(s.dir)
+	if err != nil {
+		return "", err
+	}
+	wfn := fmt.Sprintf("%016x.snap.db", id)
+	for _, fn := range fns {
+		if fn == wfn {
+			return path.Join(s.dir, fn), nil
+		}
+	}
+	return "", fmt.Errorf("snap: snapshot file doesn't exist")
+}

+ 34 - 0
snap/message.go

@@ -0,0 +1,34 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snap
+
+import (
+	"io"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+// Message is a struct that contains a raft Message and a ReadCloser. The type
+// of raft message MUST be MsgSnap, which contains the raft meta-data and an
+// additional data []byte field that contains the snapshot of the actual state
+// machine.
+// Message contains the ReadCloser field for handling large snapshots. This avoids
+// copying the entire snapshot into a byte array, which consumes a lot of memory.
+//
+// Users of Message should close the ReadCloser after sending it.
+type Message struct {
+	raftpb.Message
+	ReadCloser io.ReadCloser
+}