Explorar o código

Merge pull request #2923 from yichengq/rafthttp-status

rafthttp: pretty print connection error
Yicheng Qin hai 10 anos
pai
achega
0a3a2720a1

+ 6 - 5
rafthttp/peer.go

@@ -109,12 +109,13 @@ type peer struct {
 
 func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
 	picker := newURLPicker(urls)
+	status := newPeerStatus(to)
 	p := &peer{
 		id:           to,
 		r:            r,
-		msgAppWriter: startStreamWriter(to, fs, r),
-		writer:       startStreamWriter(to, fs, r),
-		pipeline:     newPipeline(tr, picker, local, to, cid, fs, r, errorc),
+		msgAppWriter: startStreamWriter(to, status, fs, r),
+		writer:       startStreamWriter(to, status, fs, r),
+		pipeline:     newPipeline(tr, picker, local, to, cid, status, fs, r, errorc),
 		sendc:        make(chan raftpb.Message),
 		recvc:        make(chan raftpb.Message, recvBufSize),
 		propc:        make(chan raftpb.Message, maxPendingProposals),
@@ -144,8 +145,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 
 	go func() {
 		var paused bool
-		p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc)
-		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc, errorc)
+		p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
+		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
 		for {
 			select {
 			case m := <-p.sendc:

+ 67 - 0
rafthttp/peer_status.go

@@ -0,0 +1,67 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/coreos/etcd/pkg/types"
+)
+
+type failureType struct {
+	source string
+	action string
+}
+
+type peerStatus struct {
+	id         types.ID
+	mu         sync.Mutex // protect active and failureMap
+	active     bool
+	failureMap map[failureType]string
+}
+
+func newPeerStatus(id types.ID) *peerStatus {
+	return &peerStatus{
+		id:         id,
+		failureMap: make(map[failureType]string),
+	}
+}
+
+func (s *peerStatus) activate() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if !s.active {
+		plog.Infof("the connection with %s became active", s.id)
+		s.active = true
+		s.failureMap = make(map[failureType]string)
+	}
+}
+
+func (s *peerStatus) deactivate(failure failureType, reason string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.active {
+		plog.Infof("the connection with %s became inactive", s.id)
+		s.active = false
+	}
+	logline := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
+	if r, ok := s.failureMap[failure]; ok && r == reason {
+		plog.Debugf(logline)
+		return
+	}
+	s.failureMap[failure] = reason
+	plog.Errorf(logline)
+}

+ 5 - 23
rafthttp/pipeline.go

@@ -53,6 +53,7 @@ type pipeline struct {
 
 	tr     http.RoundTripper
 	picker *urlPicker
+	status *peerStatus
 	fs     *stats.FollowerStats
 	r      Raft
 	errorc chan error
@@ -61,26 +62,21 @@ type pipeline struct {
 	// wait for the handling routines
 	wg    sync.WaitGroup
 	stopc chan struct{}
-	sync.Mutex
-	// if the last send was successful, the pipeline is active.
-	// Or it is inactive
-	active  bool
-	errored error
 }
 
-func newPipeline(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
+func newPipeline(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
 	p := &pipeline{
 		from:   from,
 		to:     to,
 		cid:    cid,
 		tr:     tr,
 		picker: picker,
+		status: status,
 		fs:     fs,
 		r:      r,
 		errorc: errorc,
 		stopc:  make(chan struct{}),
 		msgc:   make(chan raftpb.Message, pipelineBufSize),
-		active: true,
 	}
 	p.wg.Add(connPerPipeline)
 	for i := 0; i < connPerPipeline; i++ {
@@ -105,18 +101,9 @@ func (p *pipeline) handle() {
 		}
 		end := time.Now()
 
-		p.Lock()
 		if err != nil {
 			reportSentFailure(pipelineMsg, m)
-
-			if p.errored == nil || p.errored.Error() != err.Error() {
-				plog.Errorf("failed to post to %s (%v)", p.to, err)
-				p.errored = err
-			}
-			if p.active {
-				plog.Infof("the connection with %s became inactive", p.to)
-				p.active = false
-			}
+			p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
 			if m.Type == raftpb.MsgApp && p.fs != nil {
 				p.fs.Fail()
 			}
@@ -125,11 +112,7 @@ func (p *pipeline) handle() {
 				p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 			}
 		} else {
-			if !p.active {
-				plog.Infof("the connection with %s became active", p.to)
-				p.active = true
-				p.errored = nil
-			}
+			p.status.activate()
 			if m.Type == raftpb.MsgApp && p.fs != nil {
 				p.fs.Succ(end.Sub(start))
 			}
@@ -138,7 +121,6 @@ func (p *pipeline) handle() {
 			}
 			reportSentDuration(pipelineMsg, m, time.Since(start))
 		}
-		p.Unlock()
 	}
 }
 

+ 7 - 7
rafthttp/pipeline_test.go

@@ -36,7 +36,7 @@ func TestPipelineSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
 	fs := &stats.FollowerStats{}
-	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
+	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
 
 	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
 	testutil.WaitSchedule()
@@ -56,7 +56,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
 	fs := &stats.FollowerStats{}
-	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
+	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
 
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
@@ -96,7 +96,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) {
 func TestPipelineSendFailed(t *testing.T) {
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
 	fs := &stats.FollowerStats{}
-	p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
+	p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
 
 	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
 	testutil.WaitSchedule()
@@ -112,7 +112,7 @@ func TestPipelineSendFailed(t *testing.T) {
 func TestPipelinePost(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
-	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
+	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil)
 	if err := p.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 	}
@@ -159,7 +159,7 @@ func TestPipelinePostBad(t *testing.T) {
 	}
 	for i, tt := range tests {
 		picker := mustNewURLPicker(t, []string{tt.u})
-		p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error))
+		p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, make(chan error))
 		err := p.post([]byte("some data"))
 		p.stop()
 
@@ -180,7 +180,7 @@ func TestPipelinePostErrorc(t *testing.T) {
 	for i, tt := range tests {
 		picker := mustNewURLPicker(t, []string{tt.u})
 		errorc := make(chan error, 1)
-		p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc)
+		p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, errorc)
 		p.post([]byte("some data"))
 		p.stop()
 		select {
@@ -193,7 +193,7 @@ func TestPipelinePostErrorc(t *testing.T) {
 
 func TestStopBlockedPipeline(t *testing.T) {
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
-	p := newPipeline(newRoundTripperBlocker(), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
+	p := newPipeline(newRoundTripperBlocker(), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil)
 	// send many messages that most of them will be blocked in buffer
 	for i := 0; i < connPerPipeline*10; i++ {
 		p.msgc <- raftpb.Message{}

+ 1 - 1
rafthttp/remote.go

@@ -30,7 +30,7 @@ func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID,
 	picker := newURLPicker(urls)
 	return &remote{
 		id:       to,
-		pipeline: newPipeline(tr, picker, local, to, cid, nil, r, errorc),
+		pipeline: newPipeline(tr, picker, local, to, cid, newPeerStatus(to), nil, r, errorc),
 	}
 }
 

+ 35 - 18
rafthttp/stream.go

@@ -67,6 +67,19 @@ func (t streamType) endpoint() string {
 	}
 }
 
+func (t streamType) String() string {
+	switch t {
+	case streamTypeMsgApp:
+		return "stream MsgApp"
+	case streamTypeMsgAppV2:
+		return "stream MsgApp v2"
+	case streamTypeMessage:
+		return "stream Message"
+	default:
+		return "unknown stream"
+	}
+}
+
 var (
 	// linkHeartbeatMessage is a special message used as heartbeat message in
 	// link layer. It never conflicts with messages from raft because raft
@@ -89,9 +102,10 @@ type outgoingConn struct {
 // streamWriter is a long-running go-routine that writes messages into the
 // attached outgoingConn.
 type streamWriter struct {
-	id types.ID
-	fs *stats.FollowerStats
-	r  Raft
+	id     types.ID
+	status *peerStatus
+	fs     *stats.FollowerStats
+	r      Raft
 
 	mu      sync.Mutex // guard field working and closer
 	closer  io.Closer
@@ -103,15 +117,16 @@ type streamWriter struct {
 	done  chan struct{}
 }
 
-func startStreamWriter(id types.ID, fs *stats.FollowerStats, r Raft) *streamWriter {
+func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
 	w := &streamWriter{
-		id:    id,
-		fs:    fs,
-		r:     r,
-		msgc:  make(chan raftpb.Message, streamBufSize),
-		connc: make(chan *outgoingConn),
-		stopc: make(chan struct{}),
-		done:  make(chan struct{}),
+		id:     id,
+		status: status,
+		fs:     fs,
+		r:      r,
+		msgc:   make(chan raftpb.Message, streamBufSize),
+		connc:  make(chan *outgoingConn),
+		stopc:  make(chan struct{}),
+		done:   make(chan struct{}),
 	}
 	go w.run()
 	return w
@@ -133,7 +148,7 @@ func (cw *streamWriter) run() {
 			if err := enc.encode(linkHeartbeatMessage); err != nil {
 				reportSentFailure(string(t), linkHeartbeatMessage)
 
-				plog.Errorf("failed to heartbeat on stream %s (%v)", t, err)
+				cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
 				cw.close()
 				heartbeatc, msgc = nil, nil
 				continue
@@ -155,7 +170,7 @@ func (cw *streamWriter) run() {
 			if err := enc.encode(m); err != nil {
 				reportSentFailure(string(t), m)
 
-				plog.Errorf("failed to send message on stream %s (%v)", t, err)
+				cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
 				cw.close()
 				heartbeatc, msgc = nil, nil
 				cw.r.ReportUnreachable(m.To)
@@ -183,6 +198,7 @@ func (cw *streamWriter) run() {
 			}
 			flusher = conn.Flusher
 			cw.mu.Lock()
+			cw.status.activate()
 			cw.closer = conn.Closer
 			cw.working = true
 			cw.mu.Unlock()
@@ -237,6 +253,7 @@ type streamReader struct {
 	t        streamType
 	from, to types.ID
 	cid      types.ID
+	status   *peerStatus
 	recvc    chan<- raftpb.Message
 	propc    chan<- raftpb.Message
 	errorc   chan<- error
@@ -249,7 +266,7 @@ type streamReader struct {
 	done       chan struct{}
 }
 
-func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
+func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
 	r := &streamReader{
 		tr:     tr,
 		picker: picker,
@@ -257,6 +274,7 @@ func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, fr
 		from:   from,
 		to:     to,
 		cid:    cid,
+		status: status,
 		recvc:  recvc,
 		propc:  propc,
 		errorc: errorc,
@@ -279,11 +297,10 @@ func (cr *streamReader) run() {
 		}
 		if err != nil {
 			if err != errUnsupportedStreamType {
-				// TODO: log start and end of the stream, and print
-				// error in backoff way
-				plog.Errorf("failed to dial stream %s (%v)", t, err)
+				cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
 			}
 		} else {
+			cr.status.activate()
 			err := cr.decodeLoop(rc, t)
 			switch {
 			// all data is read out
@@ -294,7 +311,7 @@ func (cr *streamReader) run() {
 			// heartbeat on the idle stream, so it is expected to time out.
 			case t == streamTypeMsgApp && isNetworkTimeoutError(err):
 			default:
-				plog.Errorf("failed to read message on stream %s (%v)", t, err)
+				cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
 			}
 		}
 		select {

+ 4 - 4
rafthttp/stream_test.go

@@ -21,7 +21,7 @@ import (
 // to streamWriter. After that, streamWriter can use it to send messages
 // continuously, and closes it when stopped.
 func TestStreamWriterAttachOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 	// the expected initial state of streamWrite is not working
 	if _, ok := sw.writec(); ok != false {
 		t.Errorf("initial working status = %v, want false", ok)
@@ -67,7 +67,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
 // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
 // outgoingConn will close the outgoingConn and fall back to non-working status.
 func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 	defer sw.stop()
 	wfc := &fakeWriteFlushCloser{err: errors.New("blah")}
 	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
@@ -269,12 +269,12 @@ func TestStream(t *testing.T) {
 		srv := httptest.NewServer(h)
 		defer srv.Close()
 
-		sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
+		sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 		defer sw.stop()
 		h.sw = sw
 
 		picker := mustNewURLPicker(t, []string{srv.URL})
-		sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc, propc, nil)
+		sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil)
 		defer sr.stop()
 		if tt.t == streamTypeMsgApp {
 			sr.updateMsgAppTerm(tt.term)

+ 1 - 1
rafthttp/transport.go

@@ -26,7 +26,7 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
-var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "snap")
+var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp")
 
 type Raft interface {
 	Process(ctx context.Context, m raftpb.Message) error