Browse Source

rafthttp: pretty print connection error

1. print out the status change of connection with peer
2. only print the first error for repeated ones
Yicheng Qin 10 years ago
parent
commit
1dbe72bb74
7 changed files with 125 additions and 58 deletions
  1. 6 5
      rafthttp/peer.go
  2. 67 0
      rafthttp/peer_status.go
  3. 5 23
      rafthttp/pipeline.go
  4. 7 7
      rafthttp/pipeline_test.go
  5. 1 1
      rafthttp/remote.go
  6. 35 18
      rafthttp/stream.go
  7. 4 4
      rafthttp/stream_test.go

+ 6 - 5
rafthttp/peer.go

@@ -109,12 +109,13 @@ type peer struct {
 
 func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
 	picker := newURLPicker(urls)
+	status := newPeerStatus(to)
 	p := &peer{
 		id:           to,
 		r:            r,
-		msgAppWriter: startStreamWriter(to, fs, r),
-		writer:       startStreamWriter(to, fs, r),
-		pipeline:     newPipeline(tr, picker, local, to, cid, fs, r, errorc),
+		msgAppWriter: startStreamWriter(to, status, fs, r),
+		writer:       startStreamWriter(to, status, fs, r),
+		pipeline:     newPipeline(tr, picker, local, to, cid, status, fs, r, errorc),
 		sendc:        make(chan raftpb.Message),
 		recvc:        make(chan raftpb.Message, recvBufSize),
 		propc:        make(chan raftpb.Message, maxPendingProposals),
@@ -144,8 +145,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 
 	go func() {
 		var paused bool
-		p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc)
-		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc, errorc)
+		p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
+		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
 		for {
 			select {
 			case m := <-p.sendc:

+ 67 - 0
rafthttp/peer_status.go

@@ -0,0 +1,67 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/coreos/etcd/pkg/types"
+)
+
+type failureType struct {
+	source string
+	action string
+}
+
+type peerStatus struct {
+	id         types.ID
+	mu         sync.Mutex // protect active and failureMap
+	active     bool
+	failureMap map[failureType]string
+}
+
+func newPeerStatus(id types.ID) *peerStatus {
+	return &peerStatus{
+		id:         id,
+		failureMap: make(map[failureType]string),
+	}
+}
+
+func (s *peerStatus) activate() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if !s.active {
+		plog.Infof("the connection with %s became active", s.id)
+		s.active = true
+		s.failureMap = make(map[failureType]string)
+	}
+}
+
+func (s *peerStatus) deactivate(failure failureType, reason string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.active {
+		plog.Infof("the connection with %s became inactive", s.id)
+		s.active = false
+	}
+	logline := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
+	if r, ok := s.failureMap[failure]; ok && r == reason {
+		plog.Debugf(logline)
+		return
+	}
+	s.failureMap[failure] = reason
+	plog.Errorf(logline)
+}

+ 5 - 23
rafthttp/pipeline.go

@@ -53,6 +53,7 @@ type pipeline struct {
 
 	tr     http.RoundTripper
 	picker *urlPicker
+	status *peerStatus
 	fs     *stats.FollowerStats
 	r      Raft
 	errorc chan error
@@ -61,26 +62,21 @@ type pipeline struct {
 	// wait for the handling routines
 	wg    sync.WaitGroup
 	stopc chan struct{}
-	sync.Mutex
-	// if the last send was successful, the pipeline is active.
-	// Or it is inactive
-	active  bool
-	errored error
 }
 
-func newPipeline(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
+func newPipeline(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
 	p := &pipeline{
 		from:   from,
 		to:     to,
 		cid:    cid,
 		tr:     tr,
 		picker: picker,
+		status: status,
 		fs:     fs,
 		r:      r,
 		errorc: errorc,
 		stopc:  make(chan struct{}),
 		msgc:   make(chan raftpb.Message, pipelineBufSize),
-		active: true,
 	}
 	p.wg.Add(connPerPipeline)
 	for i := 0; i < connPerPipeline; i++ {
@@ -105,18 +101,9 @@ func (p *pipeline) handle() {
 		}
 		end := time.Now()
 
-		p.Lock()
 		if err != nil {
 			reportSentFailure(pipelineMsg, m)
-
-			if p.errored == nil || p.errored.Error() != err.Error() {
-				plog.Errorf("failed to post to %s (%v)", p.to, err)
-				p.errored = err
-			}
-			if p.active {
-				plog.Infof("the connection with %s became inactive", p.to)
-				p.active = false
-			}
+			p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
 			if m.Type == raftpb.MsgApp && p.fs != nil {
 				p.fs.Fail()
 			}
@@ -125,11 +112,7 @@ func (p *pipeline) handle() {
 				p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 			}
 		} else {
-			if !p.active {
-				plog.Infof("the connection with %s became active", p.to)
-				p.active = true
-				p.errored = nil
-			}
+			p.status.activate()
 			if m.Type == raftpb.MsgApp && p.fs != nil {
 				p.fs.Succ(end.Sub(start))
 			}
@@ -138,7 +121,6 @@ func (p *pipeline) handle() {
 			}
 			reportSentDuration(pipelineMsg, m, time.Since(start))
 		}
-		p.Unlock()
 	}
 }
 

+ 7 - 7
rafthttp/pipeline_test.go

@@ -36,7 +36,7 @@ func TestPipelineSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
 	fs := &stats.FollowerStats{}
-	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
+	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
 
 	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
 	testutil.ForceGosched()
@@ -56,7 +56,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
 	fs := &stats.FollowerStats{}
-	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
+	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
 
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
@@ -96,7 +96,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) {
 func TestPipelineSendFailed(t *testing.T) {
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
 	fs := &stats.FollowerStats{}
-	p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
+	p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
 
 	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
 	testutil.ForceGosched()
@@ -112,7 +112,7 @@ func TestPipelineSendFailed(t *testing.T) {
 func TestPipelinePost(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
-	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
+	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil)
 	if err := p.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 	}
@@ -159,7 +159,7 @@ func TestPipelinePostBad(t *testing.T) {
 	}
 	for i, tt := range tests {
 		picker := mustNewURLPicker(t, []string{tt.u})
-		p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error))
+		p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, make(chan error))
 		err := p.post([]byte("some data"))
 		p.stop()
 
@@ -180,7 +180,7 @@ func TestPipelinePostErrorc(t *testing.T) {
 	for i, tt := range tests {
 		picker := mustNewURLPicker(t, []string{tt.u})
 		errorc := make(chan error, 1)
-		p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc)
+		p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, errorc)
 		p.post([]byte("some data"))
 		p.stop()
 		select {
@@ -193,7 +193,7 @@ func TestPipelinePostErrorc(t *testing.T) {
 
 func TestStopBlockedPipeline(t *testing.T) {
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
-	p := newPipeline(newRoundTripperBlocker(), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
+	p := newPipeline(newRoundTripperBlocker(), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil)
 	// send many messages that most of them will be blocked in buffer
 	for i := 0; i < connPerPipeline*10; i++ {
 		p.msgc <- raftpb.Message{}

+ 1 - 1
rafthttp/remote.go

@@ -30,7 +30,7 @@ func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID,
 	picker := newURLPicker(urls)
 	return &remote{
 		id:       to,
-		pipeline: newPipeline(tr, picker, local, to, cid, nil, r, errorc),
+		pipeline: newPipeline(tr, picker, local, to, cid, newPeerStatus(to), nil, r, errorc),
 	}
 }
 

+ 35 - 18
rafthttp/stream.go

@@ -67,6 +67,19 @@ func (t streamType) endpoint() string {
 	}
 }
 
+func (t streamType) String() string {
+	switch t {
+	case streamTypeMsgApp:
+		return "stream MsgApp"
+	case streamTypeMsgAppV2:
+		return "stream MsgApp v2"
+	case streamTypeMessage:
+		return "stream Message"
+	default:
+		return "unknown stream"
+	}
+}
+
 var (
 	// linkHeartbeatMessage is a special message used as heartbeat message in
 	// link layer. It never conflicts with messages from raft because raft
@@ -89,9 +102,10 @@ type outgoingConn struct {
 // streamWriter is a long-running go-routine that writes messages into the
 // attached outgoingConn.
 type streamWriter struct {
-	id types.ID
-	fs *stats.FollowerStats
-	r  Raft
+	id     types.ID
+	status *peerStatus
+	fs     *stats.FollowerStats
+	r      Raft
 
 	mu      sync.Mutex // guard field working and closer
 	closer  io.Closer
@@ -103,15 +117,16 @@ type streamWriter struct {
 	done  chan struct{}
 }
 
-func startStreamWriter(id types.ID, fs *stats.FollowerStats, r Raft) *streamWriter {
+func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
 	w := &streamWriter{
-		id:    id,
-		fs:    fs,
-		r:     r,
-		msgc:  make(chan raftpb.Message, streamBufSize),
-		connc: make(chan *outgoingConn),
-		stopc: make(chan struct{}),
-		done:  make(chan struct{}),
+		id:     id,
+		status: status,
+		fs:     fs,
+		r:      r,
+		msgc:   make(chan raftpb.Message, streamBufSize),
+		connc:  make(chan *outgoingConn),
+		stopc:  make(chan struct{}),
+		done:   make(chan struct{}),
 	}
 	go w.run()
 	return w
@@ -133,7 +148,7 @@ func (cw *streamWriter) run() {
 			if err := enc.encode(linkHeartbeatMessage); err != nil {
 				reportSentFailure(string(t), linkHeartbeatMessage)
 
-				plog.Errorf("failed to heartbeat on stream %s (%v)", t, err)
+				cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
 				cw.close()
 				heartbeatc, msgc = nil, nil
 				continue
@@ -155,7 +170,7 @@ func (cw *streamWriter) run() {
 			if err := enc.encode(m); err != nil {
 				reportSentFailure(string(t), m)
 
-				plog.Errorf("failed to send message on stream %s (%v)", t, err)
+				cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
 				cw.close()
 				heartbeatc, msgc = nil, nil
 				cw.r.ReportUnreachable(m.To)
@@ -183,6 +198,7 @@ func (cw *streamWriter) run() {
 			}
 			flusher = conn.Flusher
 			cw.mu.Lock()
+			cw.status.activate()
 			cw.closer = conn.Closer
 			cw.working = true
 			cw.mu.Unlock()
@@ -237,6 +253,7 @@ type streamReader struct {
 	t        streamType
 	from, to types.ID
 	cid      types.ID
+	status   *peerStatus
 	recvc    chan<- raftpb.Message
 	propc    chan<- raftpb.Message
 	errorc   chan<- error
@@ -249,7 +266,7 @@ type streamReader struct {
 	done       chan struct{}
 }
 
-func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
+func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
 	r := &streamReader{
 		tr:     tr,
 		picker: picker,
@@ -257,6 +274,7 @@ func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, fr
 		from:   from,
 		to:     to,
 		cid:    cid,
+		status: status,
 		recvc:  recvc,
 		propc:  propc,
 		errorc: errorc,
@@ -279,11 +297,10 @@ func (cr *streamReader) run() {
 		}
 		if err != nil {
 			if err != errUnsupportedStreamType {
-				// TODO: log start and end of the stream, and print
-				// error in backoff way
-				plog.Errorf("failed to dial stream %s (%v)", t, err)
+				cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
 			}
 		} else {
+			cr.status.activate()
 			err := cr.decodeLoop(rc, t)
 			switch {
 			// all data is read out
@@ -294,7 +311,7 @@ func (cr *streamReader) run() {
 			// heartbeat on the idle stream, so it is expected to time out.
 			case t == streamTypeMsgApp && isNetworkTimeoutError(err):
 			default:
-				plog.Errorf("failed to read message on stream %s (%v)", t, err)
+				cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
 			}
 		}
 		select {

+ 4 - 4
rafthttp/stream_test.go

@@ -21,7 +21,7 @@ import (
 // to streamWriter. After that, streamWriter can use it to send messages
 // continuously, and closes it when stopped.
 func TestStreamWriterAttachOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 	// the expected initial state of streamWrite is not working
 	if _, ok := sw.writec(); ok != false {
 		t.Errorf("initial working status = %v, want false", ok)
@@ -67,7 +67,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
 // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
 // outgoingConn will close the outgoingConn and fall back to non-working status.
 func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 	defer sw.stop()
 	wfc := &fakeWriteFlushCloser{err: errors.New("blah")}
 	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
@@ -269,12 +269,12 @@ func TestStream(t *testing.T) {
 		srv := httptest.NewServer(h)
 		defer srv.Close()
 
-		sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
+		sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 		defer sw.stop()
 		h.sw = sw
 
 		picker := mustNewURLPicker(t, []string{srv.URL})
-		sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc, propc, nil)
+		sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil)
 		defer sr.stop()
 		if tt.t == streamTypeMsgApp {
 			sr.updateMsgAppTerm(tt.term)