Browse Source

Merge pull request #3666 from yichengq/transport-snap

rafthttp: support sending v3 snapshot message
Yicheng Qin 10 years ago
parent
commit
d2b40c1c98

+ 7 - 0
etcdserver/server.go

@@ -359,9 +359,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		ID:          id,
 		ClusterID:   cl.ID(),
 		Raft:        srv,
+		SnapSaver:   s.snapStore,
 		ServerStats: sstats,
 		LeaderStats: lstats,
 		ErrorC:      srv.errorc,
+		V3demo:      cfg.V3demo,
 	}
 	if err := tr.Start(); err != nil {
 		return nil, err
@@ -378,6 +380,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		}
 	}
 	srv.r.transport = tr
+
+	if cfg.V3demo {
+		s.snapStore.tr = tr
+	}
+
 	return srv, nil
 }
 

+ 14 - 12
etcdserver/server_test.go

@@ -17,6 +17,7 @@ package etcdserver
 import (
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
 	"path"
 	"reflect"
@@ -1468,15 +1469,16 @@ func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
 
 type nopTransporter struct{}
 
-func (s *nopTransporter) Start() error                        { return nil }
-func (s *nopTransporter) Handler() http.Handler               { return nil }
-func (s *nopTransporter) Send(m []raftpb.Message)             {}
-func (s *nopTransporter) AddRemote(id types.ID, us []string)  {}
-func (s *nopTransporter) AddPeer(id types.ID, us []string)    {}
-func (s *nopTransporter) RemovePeer(id types.ID)              {}
-func (s *nopTransporter) RemoveAllPeers()                     {}
-func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
-func (s *nopTransporter) ActiveSince(id types.ID) time.Time   { return time.Time{} }
-func (s *nopTransporter) Stop()                               {}
-func (s *nopTransporter) Pause()                              {}
-func (s *nopTransporter) Resume()                             {}
+func (s *nopTransporter) Start() error                                 { return nil }
+func (s *nopTransporter) Handler() http.Handler                        { return nil }
+func (s *nopTransporter) Send(m []raftpb.Message)                      {}
+func (s *nopTransporter) AddRemote(id types.ID, us []string)           {}
+func (s *nopTransporter) AddPeer(id types.ID, us []string)             {}
+func (s *nopTransporter) RemovePeer(id types.ID)                       {}
+func (s *nopTransporter) RemoveAllPeers()                              {}
+func (s *nopTransporter) UpdatePeer(id types.ID, us []string)          {}
+func (s *nopTransporter) ActiveSince(id types.ID) time.Time            { return time.Time{} }
+func (s *nopTransporter) SnapshotReady(rc io.ReadCloser, index uint64) {}
+func (s *nopTransporter) Stop()                                        {}
+func (s *nopTransporter) Pause()                                       {}
+func (s *nopTransporter) Resume()                                      {}

+ 56 - 20
etcdserver/snapshot_store.go

@@ -24,26 +24,52 @@ import (
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 	dstorage "github.com/coreos/etcd/storage"
 )
 
 type snapshot struct {
-	r  raftpb.Snapshot
-	kv dstorage.Snapshot
-}
+	r raftpb.Snapshot
 
-func (s *snapshot) raft() raftpb.Snapshot { return s.r }
+	io.ReadCloser // used to read out v3 snapshot
+
+	done chan struct{}
+}
 
-func (s *snapshot) size() int64 { return s.kv.Size() }
+func newSnapshot(r raftpb.Snapshot, kv dstorage.Snapshot) *snapshot {
+	done := make(chan struct{})
+	pr, pw := io.Pipe()
+	go func() {
+		_, err := kv.WriteTo(pw)
+		pw.CloseWithError(err)
+		kv.Close()
+		close(done)
+	}()
+	return &snapshot{
+		r:          r,
+		ReadCloser: pr,
+		done:       done,
+	}
+}
 
-func (s *snapshot) writeTo(w io.Writer) (n int64, err error) { return s.kv.WriteTo(w) }
+func (s *snapshot) raft() raftpb.Snapshot { return s.r }
 
-func (s *snapshot) close() error { return s.kv.Close() }
+func (s *snapshot) isClosed() bool {
+	select {
+	case <-s.done:
+		return true
+	default:
+		return false
+	}
+}
 
+// TODO: remove snapshotStore. getSnap part could be put into memoryStorage,
+// while SaveFrom could be put into another struct, or even put into dstorage package.
 type snapshotStore struct {
 	// dir to save snapshot data
 	dir string
 	kv  dstorage.KV
+	tr  rafthttp.Transporter
 
 	// send empty to reqsnapc to notify the channel receiver to send back latest
 	// snapshot to snapc
@@ -66,8 +92,18 @@ func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
 
 // getSnap returns a snapshot.
 // If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned.
+//
+// Internally it creates new snapshot and returns the snapshot. Unless the
+// returned snapshot is closed, it rejects creating new one and returns
+// ErrSnapshotTemporarilyUnavailable.
+// If raft state machine wants to send two snapshot messages to two followers,
+// the second snapshot message will keep getting snapshot and succeed only after
+// the first message is sent. This increases the time used to send messages,
+// but it is acceptable because this should happen seldomly.
 func (ss *snapshotStore) getSnap() (*snapshot, error) {
-	if ss.snap != nil {
+	// If snapshotStore has some snapshot that has not been closed, it cannot
+	// request new snapshot. So it returns ErrSnapshotTemporarilyUnavailable.
+	if ss.snap != nil && !ss.snap.isClosed() {
 		return nil, raft.ErrSnapshotTemporarilyUnavailable
 	}
 
@@ -76,30 +112,30 @@ func (ss *snapshotStore) getSnap() (*snapshot, error) {
 	// generate KV snapshot
 	kvsnap := ss.kv.Snapshot()
 	raftsnap := <-ss.raftsnapc
-	ss.snap = &snapshot{
-		r:  raftsnap,
-		kv: kvsnap,
-	}
+	ss.snap = newSnapshot(raftsnap, kvsnap)
+	// give transporter the generated snapshot that is ready to send out
+	ss.tr.SnapshotReady(ss.snap, raftsnap.Metadata.Index)
 	return ss.snap, nil
 }
 
-// saveSnap saves snapshot into disk.
-//
-// If snapshot has existed in disk, it keeps the original snapshot and returns error.
-// The function guarantees that it always saves either complete snapshot or no snapshot,
-// even if the call is aborted because program is hard killed.
-func (ss *snapshotStore) saveSnap(s *snapshot) error {
+// SaveFrom saves snapshot at the given index from the given reader.
+// If the snapshot with the given index has been saved successfully, it keeps
+// the original saved snapshot and returns error.
+// The function guarantees that SaveFrom always saves either complete
+// snapshot or no snapshot, even if the call is aborted because program
+// is hard killed.
+func (ss *snapshotStore) SaveFrom(r io.Reader, index uint64) error {
 	f, err := ioutil.TempFile(ss.dir, "tmp")
 	if err != nil {
 		return err
 	}
-	_, err = s.writeTo(f)
+	_, err = io.Copy(f, r)
 	f.Close()
 	if err != nil {
 		os.Remove(f.Name())
 		return err
 	}
-	fn := path.Join(ss.dir, fmt.Sprintf("%016x.db", s.raft().Metadata.Index))
+	fn := path.Join(ss.dir, fmt.Sprintf("%016x.db", index))
 	if fileutil.Exist(fn) {
 		os.Remove(f.Name())
 		return fmt.Errorf("snapshot to save has existed")

+ 107 - 28
rafthttp/http.go

@@ -16,6 +16,7 @@ package rafthttp
 
 import (
 	"errors"
+	"fmt"
 	"io/ioutil"
 	"net/http"
 	"path"
@@ -32,9 +33,10 @@ const (
 )
 
 var (
-	RaftPrefix       = "/raft"
-	ProbingPrefix    = path.Join(RaftPrefix, "probing")
-	RaftStreamPrefix = path.Join(RaftPrefix, "stream")
+	RaftPrefix         = "/raft"
+	ProbingPrefix      = path.Join(RaftPrefix, "probing")
+	RaftStreamPrefix   = path.Join(RaftPrefix, "stream")
+	RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
 
 	errIncompatibleVersion = errors.New("incompatible version")
 	errClusterIDMismatch   = errors.New("cluster ID mismatch")
@@ -47,6 +49,14 @@ func NewHandler(r Raft, cid types.ID) http.Handler {
 	}
 }
 
+func newSnapshotHandler(r Raft, snapSaver SnapshotSaver, cid types.ID) http.Handler {
+	return &snapshotHandler{
+		r:         r,
+		snapSaver: snapSaver,
+		cid:       cid,
+	}
+}
+
 type peerGetter interface {
 	Get(id types.ID) Peer
 }
@@ -76,19 +86,10 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil {
-		plog.Errorf("request received was ignored (%v)", err)
-		http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
-		return
-	}
-
-	wcid := h.cid.String()
-	w.Header().Set("X-Etcd-Cluster-ID", wcid)
+	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
 
-	gcid := r.Header.Get("X-Etcd-Cluster-ID")
-	if gcid != wcid {
-		plog.Errorf("request received was ignored (cluster ID mismatch got %s want %s)", gcid, wcid)
-		http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
+	if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
+		http.Error(w, err.Error(), http.StatusPreconditionFailed)
 		return
 	}
 
@@ -122,6 +123,76 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusNoContent)
 }
 
+type snapshotHandler struct {
+	r         Raft
+	snapSaver SnapshotSaver
+	cid       types.ID
+}
+
+// ServeHTTP serves HTTP request to receive and process snapshot message.
+//
+// If request sender dies without closing underlying TCP connection,
+// the handler will keep waiting for the request body until TCP keepalive
+// finds out that the connection is broken after several minutes.
+// This is acceptable because
+// 1. snapshot messages sent through other TCP connections could still be
+// received and processed.
+// 2. this case should happen rarely, so no further optimization is done.
+func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != "POST" {
+		w.Header().Set("Allow", "POST")
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
+
+	if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
+		http.Error(w, err.Error(), http.StatusPreconditionFailed)
+		return
+	}
+
+	dec := &messageDecoder{r: r.Body}
+	m, err := dec.decode()
+	if err != nil {
+		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
+		plog.Errorf(msg)
+		http.Error(w, msg, http.StatusBadRequest)
+		return
+	}
+	if m.Type != raftpb.MsgSnap {
+		plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
+		http.Error(w, "wrong raft message type", http.StatusBadRequest)
+		return
+	}
+
+	// save snapshot
+	if err := h.snapSaver.SaveFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
+		msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
+		plog.Error(msg)
+		http.Error(w, msg, http.StatusInternalServerError)
+		return
+	}
+	plog.Infof("received and saved snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
+
+	if err := h.r.Process(context.TODO(), m); err != nil {
+		switch v := err.(type) {
+		// Process may return writerToResponse error when doing some
+		// additional checks before calling raft.Node.Step.
+		case writerToResponse:
+			v.WriteTo(w)
+		default:
+			msg := fmt.Sprintf("failed to process raft message (%v)", err)
+			plog.Warningf(msg)
+			http.Error(w, msg, http.StatusInternalServerError)
+		}
+		return
+	}
+	// Write StatusNoContet header after the message has been processed by
+	// raft, which facilitates the client to report MsgSnap status.
+	w.WriteHeader(http.StatusNoContent)
+}
+
 type streamHandler struct {
 	peerGetter peerGetter
 	r          Raft
@@ -137,19 +208,10 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 
 	w.Header().Set("X-Server-Version", version.Version)
+	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
 
-	if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil {
-		plog.Errorf("request received was ignored (%v)", err)
-		http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
-		return
-	}
-
-	wcid := h.cid.String()
-	w.Header().Set("X-Etcd-Cluster-ID", wcid)
-
-	if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != wcid {
-		plog.Errorf("streaming request ignored (cluster ID mismatch got %s want %s)", gcid, wcid)
-		http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
+	if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
+		http.Error(w, err.Error(), http.StatusPreconditionFailed)
 		return
 	}
 
@@ -187,7 +249,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		// with the same cluster ID.
 		// 2. local etcd falls behind of the cluster, and cannot recognize
 		// the members that joined after its current progress.
-		plog.Errorf("failed to find member %s in cluster %s", from, wcid)
+		plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
 		http.Error(w, "error sender not found", http.StatusNotFound)
 		return
 	}
@@ -214,6 +276,23 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	<-c.closeNotify()
 }
 
+// checkClusterCompatibilityFromHeader checks the cluster compatibility of
+// the local member from the given header.
+// It checks whether the version of local member is compatible with
+// the versions in the header, and whether the cluster ID of local member
+// matches the one in the header.
+func checkClusterCompatibilityFromHeader(header http.Header, cid types.ID) error {
+	if err := checkVersionCompability(header.Get("X-Server-From"), serverVersion(header), minClusterVersion(header)); err != nil {
+		plog.Errorf("request version incompatibility (%v)", err)
+		return errIncompatibleVersion
+	}
+	if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
+		plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid)
+		return errClusterIDMismatch
+	}
+	return nil
+}
+
 type closeNotifier struct {
 	done chan struct{}
 }

+ 13 - 3
rafthttp/peer.go

@@ -49,6 +49,7 @@ const (
 	streamAppV2 = "streamMsgAppV2"
 	streamMsg   = "streamMsg"
 	pipelineMsg = "pipeline"
+	sendSnap    = "sendMsgSnap"
 )
 
 type Peer interface {
@@ -87,14 +88,16 @@ type Peer interface {
 // It is only used when the stream has not been established.
 type peer struct {
 	// id of the remote raft peer node
-	id types.ID
-	r  Raft
+	id     types.ID
+	r      Raft
+	v3demo bool
 
 	status *peerStatus
 
 	msgAppWriter *streamWriter
 	writer       *streamWriter
 	pipeline     *pipeline
+	snapSender   *snapshotSender // snapshot sender to send v3 snapshot messages
 	msgAppReader *streamReader
 
 	sendc    chan raftpb.Message
@@ -111,16 +114,18 @@ type peer struct {
 	done  chan struct{}
 }
 
-func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64) *peer {
+func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64, v3demo bool) *peer {
 	picker := newURLPicker(urls)
 	status := newPeerStatus(to)
 	p := &peer{
 		id:           to,
 		r:            r,
+		v3demo:       v3demo,
 		status:       status,
 		msgAppWriter: startStreamWriter(to, status, fs, r),
 		writer:       startStreamWriter(to, status, fs, r),
 		pipeline:     newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc),
+		snapSender:   newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc),
 		sendc:        make(chan raftpb.Message),
 		recvc:        make(chan raftpb.Message, recvBufSize),
 		propc:        make(chan raftpb.Message, maxPendingProposals),
@@ -158,6 +163,10 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
 				if paused {
 					continue
 				}
+				if p.v3demo && isMsgSnap(m) {
+					go p.snapSender.send(m)
+					continue
+				}
 				writec, name := p.pick(m)
 				select {
 				case writec <- m:
@@ -187,6 +196,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
 				p.msgAppWriter.stop()
 				p.writer.stop()
 				p.pipeline.stop()
+				p.snapSender.stop()
 				p.msgAppReader.stop()
 				reader.stop()
 				close(p.done)

+ 7 - 38
rafthttp/pipeline.go

@@ -17,10 +17,8 @@ package rafthttp
 import (
 	"bytes"
 	"errors"
-	"fmt"
 	"io/ioutil"
 	"net/http"
-	"strings"
 	"sync"
 	"time"
 
@@ -30,7 +28,6 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
-	"github.com/coreos/etcd/version"
 )
 
 const (
@@ -125,18 +122,7 @@ func (p *pipeline) handle() {
 // error on any failure.
 func (p *pipeline) post(data []byte) (err error) {
 	u := p.picker.pick()
-	uu := u
-	uu.Path = RaftPrefix
-	req, err := http.NewRequest("POST", uu.String(), bytes.NewBuffer(data))
-	if err != nil {
-		p.picker.unreachable(u)
-		return err
-	}
-	req.Header.Set("Content-Type", "application/protobuf")
-	req.Header.Set("X-Server-From", p.from.String())
-	req.Header.Set("X-Server-Version", version.Version)
-	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
-	req.Header.Set("X-Etcd-Cluster-ID", p.cid.String())
+	req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.from, p.cid)
 
 	var stopped bool
 	defer func() {
@@ -170,31 +156,14 @@ func (p *pipeline) post(data []byte) (err error) {
 	}
 	resp.Body.Close()
 
-	switch resp.StatusCode {
-	case http.StatusPreconditionFailed:
-		switch strings.TrimSuffix(string(b), "\n") {
-		case errIncompatibleVersion.Error():
-			plog.Errorf("request sent was ignored by peer %s (server version incompatible)", p.to)
-			return errIncompatibleVersion
-		case errClusterIDMismatch.Error():
-			plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
-				p.to, resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
-			return errClusterIDMismatch
-		default:
-			return fmt.Errorf("unhandled error %q when precondition failed", string(b))
-		}
-	case http.StatusForbidden:
-		err := fmt.Errorf("the member has been permanently removed from the cluster")
-		select {
-		case p.errorc <- err:
-		default:
-		}
-		return nil
-	case http.StatusNoContent:
+	err = checkPostResponse(resp, b, req, p.to)
+	// errMemberRemoved is a critical error since a removed member should
+	// always be stopped. So we use reportCriticalError to report it to errorc.
+	if err == errMemberRemoved {
+		reportCriticalError(err, p.errorc)
 		return nil
-	default:
-		return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
 	}
+	return err
 }
 
 // waitSchedule waits other goroutines to be scheduled for a while

+ 161 - 0
rafthttp/snapshot_sender.go

@@ -0,0 +1,161 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"bytes"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"time"
+
+	"github.com/coreos/etcd/pkg/httputil"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type snapshotSender struct {
+	from, to types.ID
+	cid      types.ID
+
+	tr     http.RoundTripper
+	picker *urlPicker
+	status *peerStatus
+	snapst *snapshotStore
+	r      Raft
+	errorc chan error
+
+	stopc chan struct{}
+}
+
+func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, snapst *snapshotStore, r Raft, errorc chan error) *snapshotSender {
+	return &snapshotSender{
+		from:   from,
+		to:     to,
+		cid:    cid,
+		tr:     tr,
+		picker: picker,
+		status: status,
+		snapst: snapst,
+		r:      r,
+		errorc: errorc,
+		stopc:  make(chan struct{}),
+	}
+}
+
+func (s *snapshotSender) stop() { close(s.stopc) }
+
+func (s *snapshotSender) send(m raftpb.Message) {
+	start := time.Now()
+
+	body := createSnapBody(m, s.snapst)
+	defer body.Close()
+
+	u := s.picker.pick()
+	req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.from, s.cid)
+
+	err := s.post(req)
+	if err != nil {
+		// errMemberRemoved is a critical error since a removed member should
+		// always be stopped. So we use reportCriticalError to report it to errorc.
+		if err == errMemberRemoved {
+			reportCriticalError(err, s.errorc)
+		}
+		s.picker.unreachable(u)
+		reportSentFailure(sendSnap, m)
+		s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
+		s.r.ReportUnreachable(m.To)
+		// report SnapshotFailure to raft state machine. After raft state
+		// machine knows about it, it would pause a while and retry sending
+		// new snapshot message.
+		s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
+		if s.status.isActive() {
+			plog.Warningf("snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
+		} else {
+			plog.Debugf("snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
+		}
+		return
+	}
+	reportSentDuration(sendSnap, m, time.Since(start))
+	s.status.activate()
+	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
+	plog.Infof("snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
+}
+
+// post posts the given request.
+// It returns nil when request is sent out and processed successfully.
+func (s *snapshotSender) post(req *http.Request) (err error) {
+	cancel := httputil.RequestCanceler(s.tr, req)
+
+	type responseAndError struct {
+		resp *http.Response
+		body []byte
+		err  error
+	}
+	result := make(chan responseAndError, 1)
+
+	go func() {
+		// TODO: cancel the request if it has waited for a long time(~5s) after
+		// it has write out the full request body, which helps to avoid receiver
+		// dies when sender is waiting for response
+		// TODO: the snapshot could be large and eat up all resources when writing
+		// it out. Send it block by block and rest some time between to give the
+		// time for main loop to run.
+		resp, err := s.tr.RoundTrip(req)
+		if err != nil {
+			result <- responseAndError{resp, nil, err}
+			return
+		}
+		body, err := ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		result <- responseAndError{resp, body, err}
+	}()
+
+	select {
+	case <-s.stopc:
+		cancel()
+		return errStopped
+	case r := <-result:
+		if r.err != nil {
+			return r.err
+		}
+		return checkPostResponse(r.resp, r.body, req, s.to)
+	}
+}
+
+// readCloser implements io.ReadCloser interface.
+type readCloser struct {
+	io.Reader
+	io.Closer
+}
+
+// createSnapBody creates the request body for the given raft snapshot message.
+// Callers should close body when done reading from it.
+func createSnapBody(m raftpb.Message, snapst *snapshotStore) io.ReadCloser {
+	buf := new(bytes.Buffer)
+	enc := &messageEncoder{w: buf}
+	// encode raft message
+	if err := enc.encode(m); err != nil {
+		plog.Panicf("encode message error (%v)", err)
+	}
+	// get snapshot
+	rc := snapst.get(m.Snapshot.Metadata.Index)
+
+	return &readCloser{
+		Reader: io.MultiReader(buf, rc),
+		Closer: rc,
+	}
+}

+ 49 - 1
rafthttp/transport.go

@@ -15,6 +15,7 @@
 package rafthttp
 
 import (
+	"io"
 	"net/http"
 	"sync"
 	"time"
@@ -38,6 +39,12 @@ type Raft interface {
 	ReportSnapshot(id uint64, status raft.SnapshotStatus)
 }
 
+// SnapshotSaver is the interface that wraps the SaveFrom method.
+type SnapshotSaver interface {
+	// SaveFrom saves the snapshot data at the given index from the given reader.
+	SaveFrom(r io.Reader, index uint64) error
+}
+
 type Transporter interface {
 	// Start starts the given Transporter.
 	// Start MUST be called before calling other functions in the interface.
@@ -78,6 +85,10 @@ type Transporter interface {
 	// If the connection is active since peer was added, it returns the adding time.
 	// If the connection is currently inactive, it returns zero time.
 	ActiveSince(id types.ID) time.Time
+	// SnapshotReady accepts a snapshot at the given index that is ready to send out.
+	// SnapshotReady MUST not be called when the snapshot sent result of previous
+	// accepted one has not been reported.
+	SnapshotReady(rc io.ReadCloser, index uint64)
 	// Stop closes the connections and stops the transporter.
 	Stop()
 }
@@ -95,6 +106,7 @@ type Transport struct {
 	ID          types.ID           // local member ID
 	ClusterID   types.ID           // raft cluster ID for request validation
 	Raft        Raft               // raft state machine, to which the Transport forwards received messages and reports status
+	SnapSaver   SnapshotSaver      // used to save snapshot in v3 snapshot messages
 	ServerStats *stats.ServerStats // used to record general transportation statistics
 	// used to record transportation statistics with followers when
 	// performing as leader in raft protocol
@@ -104,6 +116,7 @@ type Transport struct {
 	// When an error is received from ErrorC, user should stop raft state
 	// machine and thus stop the Transport.
 	ErrorC chan error
+	V3demo bool
 
 	streamRt   http.RoundTripper // roundTripper used by streams
 	pipelineRt http.RoundTripper // roundTripper used by pipelines
@@ -113,6 +126,8 @@ type Transport struct {
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	peers   map[types.ID]Peer    // peers map
 
+	snapst *snapshotStore
+
 	prober probing.Prober
 }
 
@@ -131,6 +146,7 @@ func (t *Transport) Start() error {
 	}
 	t.remotes = make(map[types.ID]*remote)
 	t.peers = make(map[types.ID]Peer)
+	t.snapst = &snapshotStore{}
 	t.prober = probing.NewProber(t.pipelineRt)
 	return nil
 }
@@ -138,9 +154,11 @@ func (t *Transport) Start() error {
 func (t *Transport) Handler() http.Handler {
 	pipelineHandler := NewHandler(t.Raft, t.ClusterID)
 	streamHandler := newStreamHandler(t, t.Raft, t.ID, t.ClusterID)
+	snapHandler := newSnapshotHandler(t.Raft, t.SnapSaver, t.ClusterID)
 	mux := http.NewServeMux()
 	mux.Handle(RaftPrefix, pipelineHandler)
 	mux.Handle(RaftStreamPrefix+"/", streamHandler)
+	mux.Handle(RaftSnapshotPrefix, snapHandler)
 	mux.Handle(ProbingPrefix, probing.NewHandler())
 	return mux
 }
@@ -234,7 +252,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
 	fs := t.LeaderStats.Follower(id.String())
-	t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.term)
+	t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.term, t.V3demo)
 	addPeerToProber(t.prober, id.String(), us)
 }
 
@@ -290,6 +308,10 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
 	return time.Time{}
 }
 
+func (t *Transport) SnapshotReady(rc io.ReadCloser, index uint64) {
+	t.snapst.put(rc, index)
+}
+
 type Pausable interface {
 	Pause()
 	Resume()
@@ -307,3 +329,29 @@ func (t *Transport) Resume() {
 		p.(Pausable).Resume()
 	}
 }
+
+// snapshotStore is the store of snapshot. Caller could put one
+// snapshot into the store, and get it later.
+// snapshotStore stores at most one snapshot at a time, or it panics.
+type snapshotStore struct {
+	rc io.ReadCloser
+	// index of the stored snapshot
+	// index is 0 if and only if there is no snapshot stored.
+	index uint64
+}
+
+func (s *snapshotStore) put(rc io.ReadCloser, index uint64) {
+	if s.index != 0 {
+		plog.Panicf("unexpected put when there is one snapshot stored")
+	}
+	s.rc, s.index = rc, index
+}
+
+func (s *snapshotStore) get(index uint64) io.ReadCloser {
+	if s.index == index {
+		// set index to 0 to indicate no snapshot stored
+		s.index = 0
+		return s.rc
+	}
+	return nil
+}

+ 58 - 0
rafthttp/util.go

@@ -19,12 +19,17 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"net/url"
+	"strings"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
+	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/version"
 )
 
+var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
+
 func writeEntryTo(w io.Writer, ent *raftpb.Entry) error {
 	size := ent.Size()
 	if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {
@@ -50,6 +55,59 @@ func readEntryFrom(r io.Reader, ent *raftpb.Entry) error {
 	return ent.Unmarshal(buf)
 }
 
+// createPostRequest creates a HTTP POST request that sends raft message.
+func createPostRequest(u url.URL, path string, body io.Reader, ct string, from, cid types.ID) *http.Request {
+	uu := u
+	uu.Path = path
+	req, err := http.NewRequest("POST", uu.String(), body)
+	if err != nil {
+		plog.Panicf("unexpected new request error (%v)", err)
+	}
+	req.Header.Set("Content-Type", ct)
+	req.Header.Set("X-Server-From", from.String())
+	req.Header.Set("X-Server-Version", version.Version)
+	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
+	req.Header.Set("X-Etcd-Cluster-ID", cid.String())
+	return req
+}
+
+// checkPostResponse checks the response of the HTTP POST request that sends
+// raft message.
+func checkPostResponse(resp *http.Response, body []byte, req *http.Request, to types.ID) error {
+	switch resp.StatusCode {
+	case http.StatusPreconditionFailed:
+		switch strings.TrimSuffix(string(body), "\n") {
+		case errIncompatibleVersion.Error():
+			plog.Errorf("request sent was ignored by peer %s (server version incompatible)", to)
+			return errIncompatibleVersion
+		case errClusterIDMismatch.Error():
+			plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
+				to, resp.Header.Get("X-Etcd-Cluster-ID"), req.Header.Get("X-Etcd-Cluster-ID"))
+			return errClusterIDMismatch
+		default:
+			return fmt.Errorf("unhandled error %q when precondition failed", string(body))
+		}
+	case http.StatusForbidden:
+		return errMemberRemoved
+	case http.StatusNoContent:
+		return nil
+	default:
+		return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
+	}
+}
+
+// reportErr reports the given error through sending it into
+// the given error channel.
+// If the error channel is filled up when sending error, it drops the error
+// because the fact that error has happened is reported, which is
+// good enough.
+func reportCriticalError(err error, errc chan<- error) {
+	select {
+	case errc <- err:
+	default:
+	}
+}
+
 // compareMajorMinorVersion returns an integer comparing two versions based on
 // their major and minor version. The result will be 0 if a==b, -1 if a < b,
 // and 1 if a > b.