Selaa lähdekoodia

Merge pull request #2408 from yichengq/335

rafthttp: report MsgSnap status
Yicheng Qin 10 vuotta sitten
vanhempi
commit
31666cdbff
7 muutettua tiedostoa jossa 30 lisäystä ja 5 poistoa
  1. 4 0
      etcdserver/server.go
  2. 1 1
      raft/raft.go
  3. 2 0
      rafthttp/http.go
  4. 7 4
      rafthttp/http_test.go
  5. 7 0
      rafthttp/peer.go
  6. 7 0
      rafthttp/pipeline.go
  7. 2 0
      rafthttp/transport.go

+ 4 - 0
etcdserver/server.go

@@ -328,6 +328,10 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 
 func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
 
+func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
+	s.r.ReportSnapshot(id, status)
+}
+
 func (s *EtcdServer) run() {
 	var syncC <-chan time.Time
 	var shouldstop bool

+ 1 - 1
raft/raft.go

@@ -582,7 +582,7 @@ func stepLeader(r *raft, m pb.Message) {
 			log.Printf("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 		} else {
 			pr.snapshotFinish()
-			log.Printf("raft: %x snapshot succeeded resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+			log.Printf("raft: %x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 			// wait for the msgAppResp from the remote node before sending
 			// out the next msgApp
 			pr.waitSet(r.electionTimeout)

+ 2 - 0
rafthttp/http.go

@@ -101,6 +101,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		}
 		return
 	}
+	// Write StatusNoContet header after the message has been processed by
+	// raft, which faciliates the client to report MsgSnap status.
 	w.WriteHeader(http.StatusNoContent)
 }
 

+ 7 - 4
rafthttp/http_test.go

@@ -26,6 +26,7 @@ import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
@@ -161,15 +162,17 @@ func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some er
 
 type nopProcessor struct{}
 
-func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil }
-func (p *nopProcessor) ReportUnreachable(id uint64)                         {}
+func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error  { return nil }
+func (p *nopProcessor) ReportUnreachable(id uint64)                          {}
+func (p *nopProcessor) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
 
 type errProcessor struct {
 	err error
 }
 
-func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err }
-func (p *errProcessor) ReportUnreachable(id uint64)                         {}
+func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error  { return p.err }
+func (p *errProcessor) ReportUnreachable(id uint64)                          {}
+func (p *errProcessor) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
 
 type resWriterToError struct {
 	code int

+ 7 - 0
rafthttp/peer.go

@@ -180,6 +180,11 @@ func (p *peer) Stop() {
 
 func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, name string, size int) {
 	switch {
+	// Considering MsgSnap may have a big size, e.g., 1G, and will block
+	// stream for a long time, only use one of the N pipelines to send MsgSnap.
+	case isMsgSnap(m):
+		writec = p.pipeline.msgc
+		name, size = "pipeline", pipelineBufSize
 	case p.msgAppWriter.isWorking() && canUseMsgAppStream(m):
 		writec = p.msgAppWriter.msgc
 		name, size = "msgapp stream", streamBufSize
@@ -192,3 +197,5 @@ func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, name string,
 	}
 	return
 }
+
+func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }

+ 7 - 0
rafthttp/pipeline.go

@@ -25,6 +25,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
@@ -105,6 +106,9 @@ func (p *pipeline) handle() {
 				p.fs.Fail()
 			}
 			p.r.ReportUnreachable(m.To)
+			if isMsgSnap(m) {
+				p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
+			}
 		} else {
 			if !p.active {
 				log.Printf("pipeline: the connection with %s became active", p.id)
@@ -114,6 +118,9 @@ func (p *pipeline) handle() {
 			if m.Type == raftpb.MsgApp {
 				p.fs.Succ(end.Sub(start))
 			}
+			if isMsgSnap(m) {
+				p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
+			}
 		}
 		p.Unlock()
 	}

+ 2 - 0
rafthttp/transport.go

@@ -24,12 +24,14 @@ import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
 type Raft interface {
 	Process(ctx context.Context, m raftpb.Message) error
 	ReportUnreachable(id uint64)
+	ReportSnapshot(id uint64, status raft.SnapshotStatus)
 }
 
 type Transporter interface {