Browse Source

rafthttp: report unreachable status of the peer

When it failed to send message to the remote peer, it reports unreachable
to raft.
Yicheng Qin 10 years ago
parent
commit
9b986fb4c1

+ 2 - 0
etcdserver/server.go

@@ -326,6 +326,8 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	return s.r.Step(ctx, m)
 }
 
+func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
+
 func (s *EtcdServer) run() {
 	var syncC <-chan time.Time
 	var shouldstop bool

+ 2 - 0
rafthttp/http_test.go

@@ -162,12 +162,14 @@ func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some er
 type nopProcessor struct{}
 
 func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil }
+func (p *nopProcessor) ReportUnreachable(id uint64)                         {}
 
 type errProcessor struct {
 	err error
 }
 
 func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err }
+func (p *errProcessor) ReportUnreachable(id uint64)                         {}
 
 type resWriterToError struct {
 	code int

+ 3 - 3
rafthttp/peer.go

@@ -65,9 +65,9 @@ type peer struct {
 func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
 	p := &peer{
 		id:           to,
-		msgAppWriter: startStreamWriter(fs),
-		writer:       startStreamWriter(fs),
-		pipeline:     newPipeline(tr, u, to, cid, fs, errorc),
+		msgAppWriter: startStreamWriter(fs, r),
+		writer:       startStreamWriter(fs, r),
+		pipeline:     newPipeline(tr, u, to, cid, fs, r, errorc),
 		sendc:        make(chan raftpb.Message),
 		recvc:        make(chan raftpb.Message, recvBufSize),
 		newURLc:      make(chan string),

+ 4 - 1
rafthttp/pipeline.go

@@ -45,6 +45,7 @@ type pipeline struct {
 	// the url this pipeline sends to
 	u      string
 	fs     *stats.FollowerStats
+	r      Raft
 	errorc chan error
 
 	msgc chan raftpb.Message
@@ -57,13 +58,14 @@ type pipeline struct {
 	errored error
 }
 
-func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, errorc chan error) *pipeline {
+func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
 	p := &pipeline{
 		id:     id,
 		cid:    cid,
 		tr:     tr,
 		u:      u,
 		fs:     fs,
+		r:      r,
 		errorc: errorc,
 		msgc:   make(chan raftpb.Message, pipelineBufSize),
 		active: true,
@@ -102,6 +104,7 @@ func (p *pipeline) handle() {
 			if m.Type == raftpb.MsgApp {
 				p.fs.Fail()
 			}
+			p.r.ReportUnreachable(m.To)
 		} else {
 			if !p.active {
 				log.Printf("pipeline: the connection with %s became active", p.id)

+ 6 - 6
rafthttp/pipeline_test.go

@@ -32,7 +32,7 @@ import (
 func TestPipelineSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
-	p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil)
+	p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil)
 
 	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
 	p.stop()
@@ -50,7 +50,7 @@ func TestPipelineSend(t *testing.T) {
 func TestPipelineExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
-	p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil)
+	p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil)
 
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
@@ -89,7 +89,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) {
 // it increases fail count in stats.
 func TestPipelineSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
-	p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil)
+	p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil)
 
 	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
 	p.stop()
@@ -103,7 +103,7 @@ func TestPipelineSendFailed(t *testing.T) {
 
 func TestPipelinePost(t *testing.T) {
 	tr := &roundTripperRecorder{}
-	p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, nil)
+	p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, &nopProcessor{}, nil)
 	if err := p.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 	}
@@ -145,7 +145,7 @@ func TestPipelinePostBad(t *testing.T) {
 		{"http://10.0.0.1", http.StatusCreated, nil},
 	}
 	for i, tt := range tests {
-		p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, make(chan error))
+		p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &nopProcessor{}, make(chan error))
 		err := p.post([]byte("some data"))
 		p.stop()
 
@@ -166,7 +166,7 @@ func TestPipelinePostErrorc(t *testing.T) {
 	}
 	for i, tt := range tests {
 		errorc := make(chan error, 1)
-		p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, errorc)
+		p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &nopProcessor{}, errorc)
 		p.post([]byte("some data"))
 		p.stop()
 		select {

+ 4 - 1
rafthttp/stream.go

@@ -63,6 +63,7 @@ type outgoingConn struct {
 // attached outgoingConn.
 type streamWriter struct {
 	fs *stats.FollowerStats
+	r  Raft
 
 	mu      sync.Mutex // guard field working and closer
 	closer  io.Closer
@@ -74,9 +75,10 @@ type streamWriter struct {
 	done  chan struct{}
 }
 
-func startStreamWriter(fs *stats.FollowerStats) *streamWriter {
+func startStreamWriter(fs *stats.FollowerStats, r Raft) *streamWriter {
 	w := &streamWriter{
 		fs:    fs,
+		r:     r,
 		msgc:  make(chan raftpb.Message, streamBufSize),
 		connc: make(chan *outgoingConn),
 		stopc: make(chan struct{}),
@@ -118,6 +120,7 @@ func (cw *streamWriter) run() {
 				log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
 				cw.resetCloser()
 				heartbeatc, msgc = nil, nil
+				cw.r.ReportUnreachable(m.To)
 				continue
 			}
 			flusher.Flush()

+ 1 - 0
rafthttp/transport.go

@@ -29,6 +29,7 @@ import (
 
 type Raft interface {
 	Process(ctx context.Context, m raftpb.Message) error
+	ReportUnreachable(id uint64)
 }
 
 type Transporter interface {