Browse Source

rafthttp: support structured logger

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
c68f625353

+ 230 - 33
rafthttp/http.go

@@ -28,6 +28,8 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raftsnap"
 	"github.com/coreos/etcd/version"
+
+	"go.uber.org/zap"
 )
 
 const (
@@ -59,9 +61,11 @@ type writerToResponse interface {
 }
 
 type pipelineHandler struct {
-	tr  Transporter
-	r   Raft
-	cid types.ID
+	lg      *zap.Logger
+	localID types.ID
+	tr      Transporter
+	r       Raft
+	cid     types.ID
 }
 
 // newPipelineHandler returns a handler for handling raft messages
@@ -69,11 +73,13 @@ type pipelineHandler struct {
 //
 // The handler reads out the raft message from request body,
 // and forwards it to the given raft state machine for processing.
-func newPipelineHandler(tr Transporter, r Raft, cid types.ID) http.Handler {
+func newPipelineHandler(t *Transport, r Raft, cid types.ID) http.Handler {
 	return &pipelineHandler{
-		tr:  tr,
-		r:   r,
-		cid: cid,
+		lg:      t.Logger,
+		localID: t.ID,
+		tr:      t,
+		r:       r,
+		cid:     cid,
 	}
 }
 
@@ -86,7 +92,7 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
 
-	if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
+	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
 		http.Error(w, err.Error(), http.StatusPreconditionFailed)
 		return
 	}
@@ -98,7 +104,15 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
 	b, err := ioutil.ReadAll(limitedr)
 	if err != nil {
-		plog.Errorf("failed to read raft message (%v)", err)
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to read Raft message",
+				zap.String("local-member-id", h.localID.String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("failed to read raft message (%v)", err)
+		}
 		http.Error(w, "error reading raft message", http.StatusBadRequest)
 		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
 		return
@@ -106,7 +120,15 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	var m raftpb.Message
 	if err := m.Unmarshal(b); err != nil {
-		plog.Errorf("failed to unmarshal raft message (%v)", err)
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to unmarshal Raft message",
+				zap.String("local-member-id", h.localID.String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("failed to unmarshal raft message (%v)", err)
+		}
 		http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
 		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
 		return
@@ -119,7 +141,15 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		case writerToResponse:
 			v.WriteTo(w)
 		default:
-			plog.Warningf("failed to process raft message (%v)", err)
+			if h.lg != nil {
+				h.lg.Warn(
+					"failed to process Raft message",
+					zap.String("local-member-id", h.localID.String()),
+					zap.Error(err),
+				)
+			} else {
+				plog.Warningf("failed to process raft message (%v)", err)
+			}
 			http.Error(w, "error processing raft message", http.StatusInternalServerError)
 			w.(http.Flusher).Flush()
 			// disconnect the http stream
@@ -134,17 +164,22 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 type snapshotHandler struct {
+	lg          *zap.Logger
 	tr          Transporter
 	r           Raft
 	snapshotter *raftsnap.Snapshotter
-	cid         types.ID
+
+	localID types.ID
+	cid     types.ID
 }
 
-func newSnapshotHandler(tr Transporter, r Raft, snapshotter *raftsnap.Snapshotter, cid types.ID) http.Handler {
+func newSnapshotHandler(t *Transport, r Raft, snapshotter *raftsnap.Snapshotter, cid types.ID) http.Handler {
 	return &snapshotHandler{
-		tr:          tr,
+		lg:          t.Logger,
+		tr:          t,
 		r:           r,
 		snapshotter: snapshotter,
+		localID:     t.ID,
 		cid:         cid,
 	}
 }
@@ -167,7 +202,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
 
-	if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
+	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
 		http.Error(w, err.Error(), http.StatusPreconditionFailed)
 		return
 	}
@@ -179,7 +214,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	m, err := dec.decodeLimit(uint64(1 << 63))
 	if err != nil {
 		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
-		plog.Errorf(msg)
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to decode Raft message",
+				zap.String("local-member-id", h.localID.String()),
+				zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Error(msg)
+		}
 		http.Error(w, msg, http.StatusBadRequest)
 		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
 		return
@@ -188,22 +232,61 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
 
 	if m.Type != raftpb.MsgSnap {
-		plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
+		if h.lg != nil {
+			h.lg.Warn(
+				"unexpected Raft message type",
+				zap.String("local-member-id", h.localID.String()),
+				zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+				zap.String("message-type", m.Type.String()),
+			)
+		} else {
+			plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
+		}
 		http.Error(w, "wrong raft message type", http.StatusBadRequest)
 		return
 	}
 
-	plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
+	if h.lg != nil {
+		h.lg.Info(
+			"receiving database snapshot",
+			zap.String("local-member-id", h.localID.String()),
+			zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+		)
+	} else {
+		plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
+	}
+
 	// save incoming database snapshot.
 	n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)
 	if err != nil {
 		msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
-		plog.Error(msg)
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to save KV snapshot",
+				zap.String("local-member-id", h.localID.String()),
+				zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Error(msg)
+		}
 		http.Error(w, msg, http.StatusInternalServerError)
 		return
 	}
+
 	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
-	plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
+
+	if h.lg != nil {
+		h.lg.Info(
+			"received and saved database snapshot",
+			zap.String("local-member-id", h.localID.String()),
+			zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+		)
+	} else {
+		plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
+	}
 
 	if err := h.r.Process(context.TODO(), m); err != nil {
 		switch v := err.(type) {
@@ -213,17 +296,28 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			v.WriteTo(w)
 		default:
 			msg := fmt.Sprintf("failed to process raft message (%v)", err)
-			plog.Warningf(msg)
+			if h.lg != nil {
+				h.lg.Warn(
+					"failed to process Raft message",
+					zap.String("local-member-id", h.localID.String()),
+					zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+					zap.Error(err),
+				)
+			} else {
+				plog.Error(msg)
+			}
 			http.Error(w, msg, http.StatusInternalServerError)
 		}
 		return
 	}
+
 	// Write StatusNoContent header after the message has been processed by
 	// raft, which facilitates the client to report MsgSnap status.
 	w.WriteHeader(http.StatusNoContent)
 }
 
 type streamHandler struct {
+	lg         *zap.Logger
 	tr         *Transport
 	peerGetter peerGetter
 	r          Raft
@@ -231,9 +325,10 @@ type streamHandler struct {
 	cid        types.ID
 }
 
-func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler {
+func newStreamHandler(t *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler {
 	return &streamHandler{
-		tr:         tr,
+		lg:         t.Logger,
+		tr:         t,
 		peerGetter: pg,
 		r:          r,
 		id:         id,
@@ -251,7 +346,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("X-Server-Version", version.Version)
 	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
 
-	if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
+	if err := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid); err != nil {
 		http.Error(w, err.Error(), http.StatusPreconditionFailed)
 		return
 	}
@@ -263,7 +358,16 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	case streamTypeMessage.endpoint():
 		t = streamTypeMessage
 	default:
-		plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
+		if h.lg != nil {
+			h.lg.Debug(
+				"ignored unexpected streaming request path",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("path", r.URL.Path),
+			)
+		} else {
+			plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
+		}
 		http.Error(w, "invalid path", http.StatusNotFound)
 		return
 	}
@@ -271,12 +375,31 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	fromStr := path.Base(r.URL.Path)
 	from, err := types.IDFromString(fromStr)
 	if err != nil {
-		plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to parse path into ID",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("path", fromStr),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
+		}
 		http.Error(w, "invalid from", http.StatusNotFound)
 		return
 	}
 	if h.r.IsIDRemoved(uint64(from)) {
-		plog.Warningf("rejected the stream from peer %s since it was removed", from)
+		if h.lg != nil {
+			h.lg.Warn(
+				"rejected stream from remote peer because it was removed",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("remote-peer-id-from", from.String()),
+			)
+		} else {
+			plog.Warningf("rejected the stream from peer %s since it was removed", from)
+		}
 		http.Error(w, "removed member", http.StatusGone)
 		return
 	}
@@ -290,14 +413,35 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		if urls := r.Header.Get("X-PeerURLs"); urls != "" {
 			h.tr.AddRemote(from, strings.Split(urls, ","))
 		}
-		plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to find remote peer in cluster",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("remote-peer-id-from", from.String()),
+				zap.String("cluster-id", h.cid.String()),
+			)
+		} else {
+			plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
+		}
 		http.Error(w, "error sender not found", http.StatusNotFound)
 		return
 	}
 
 	wto := h.id.String()
 	if gto := r.Header.Get("X-Raft-To"); gto != wto {
-		plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
+		if h.lg != nil {
+			h.lg.Warn(
+				"ignored streaming request; ID mismatch",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("remote-peer-id-header", gto),
+				zap.String("remote-peer-id-from", from.String()),
+				zap.String("cluster-id", h.cid.String()),
+			)
+		} else {
+			plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
+		}
 		http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
 		return
 	}
@@ -321,13 +465,66 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 // It checks whether the version of local member is compatible with
 // the versions in the header, and whether the cluster ID of local member
 // matches the one in the header.
-func checkClusterCompatibilityFromHeader(header http.Header, cid types.ID) error {
-	if err := checkVersionCompability(header.Get("X-Server-From"), serverVersion(header), minClusterVersion(header)); err != nil {
-		plog.Errorf("request version incompatibility (%v)", err)
+func checkClusterCompatibilityFromHeader(lg *zap.Logger, localID types.ID, header http.Header, cid types.ID) error {
+	remoteName := header.Get("X-Server-From")
+
+	remoteServer := serverVersion(header)
+	remoteVs := ""
+	if remoteServer != nil {
+		remoteVs = remoteServer.String()
+	}
+
+	remoteMinClusterVer := minClusterVersion(header)
+	remoteMinClusterVs := ""
+	if remoteMinClusterVer != nil {
+		remoteMinClusterVs = remoteMinClusterVer.String()
+	}
+
+	localServer, localMinCluster, err := checkVersionCompatibility(remoteName, remoteServer, remoteMinClusterVer)
+
+	localVs := ""
+	if localServer != nil {
+		localVs = localServer.String()
+	}
+	localMinClusterVs := ""
+	if localMinCluster != nil {
+		localMinClusterVs = localMinCluster.String()
+	}
+
+	if err != nil {
+		if lg != nil {
+			lg.Warn(
+				"failed to check version compatibility",
+				zap.String("local-member-id", localID.String()),
+				zap.String("local-member-cluster-id", cid.String()),
+				zap.String("local-member-server-version", localVs),
+				zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
+				zap.String("remote-peer-server-name", remoteName),
+				zap.String("remote-peer-server-version", remoteVs),
+				zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("request version incompatibility (%v)", err)
+		}
 		return errIncompatibleVersion
 	}
 	if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
-		plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid)
+		if lg != nil {
+			lg.Warn(
+				"request cluster ID mismatch",
+				zap.String("local-member-id", localID.String()),
+				zap.String("local-member-cluster-id", cid.String()),
+				zap.String("local-member-server-version", localVs),
+				zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
+				zap.String("remote-peer-server-name", remoteName),
+				zap.String("remote-peer-server-version", remoteVs),
+				zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
+				zap.String("remote-peer-cluster-id", gcid),
+			)
+		} else {
+			plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid)
+		}
 		return errClusterIDMismatch
 	}
 	return nil

+ 3 - 1
rafthttp/http_test.go

@@ -31,6 +31,8 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raftsnap"
 	"github.com/coreos/etcd/version"
+
+	"go.uber.org/zap"
 )
 
 func TestServeRaftPrefix(t *testing.T) {
@@ -151,7 +153,7 @@ func TestServeRaftPrefix(t *testing.T) {
 		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
 		req.Header.Set("X-Server-Version", version.Version)
 		rw := httptest.NewRecorder()
-		h := newPipelineHandler(NewNopTransporter(), tt.p, types.ID(0))
+		h := newPipelineHandler(&Transport{Logger: zap.NewExample()}, tt.p, types.ID(0))
 
 		// goroutine because the handler panics to disconnect on raft error
 		donec := make(chan struct{})

+ 82 - 21
rafthttp/peer.go

@@ -25,6 +25,7 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raftsnap"
 
+	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 )
 
@@ -93,9 +94,13 @@ type Peer interface {
 // A pipeline is a series of http clients that send http requests to the remote.
 // It is only used when the stream has not been established.
 type peer struct {
+	lg *zap.Logger
+
+	localID types.ID
 	// id of the remote raft peer node
 	id types.ID
-	r  Raft
+
+	r Raft
 
 	status *peerStatus
 
@@ -118,17 +123,27 @@ type peer struct {
 	stopc  chan struct{}
 }
 
-func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
-	plog.Infof("starting peer %s...", peerID)
-	defer plog.Infof("started peer %s", peerID)
+func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
+	if t.Logger != nil {
+		t.Logger.Info("starting remote peer", zap.String("remote-peer-id", peerID.String()))
+	} else {
+		plog.Infof("starting peer %s...", peerID)
+	}
+	defer func() {
+		if t.Logger != nil {
+			t.Logger.Info("started remote peer", zap.String("remote-peer-id", peerID.String()))
+		} else {
+			plog.Infof("started peer %s", peerID)
+		}
+	}()
 
-	status := newPeerStatus(peerID)
+	status := newPeerStatus(t.Logger, peerID)
 	picker := newURLPicker(urls)
-	errorc := transport.ErrorC
-	r := transport.Raft
+	errorc := t.ErrorC
+	r := t.Raft
 	pipeline := &pipeline{
 		peerID:        peerID,
-		tr:            transport,
+		tr:            t,
 		picker:        picker,
 		status:        status,
 		followerStats: fs,
@@ -138,14 +153,16 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
 	pipeline.start()
 
 	p := &peer{
+		lg:             t.Logger,
+		localID:        t.ID,
 		id:             peerID,
 		r:              r,
 		status:         status,
 		picker:         picker,
-		msgAppV2Writer: startStreamWriter(peerID, status, fs, r),
-		writer:         startStreamWriter(peerID, status, fs, r),
+		msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
+		writer:         startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
 		pipeline:       pipeline,
-		snapSender:     newSnapshotSender(transport, picker, peerID, status),
+		snapSender:     newSnapshotSender(t, picker, peerID, status),
 		recvc:          make(chan raftpb.Message, recvBufSize),
 		propc:          make(chan raftpb.Message, maxPendingProposals),
 		stopc:          make(chan struct{}),
@@ -158,7 +175,11 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
 			select {
 			case mm := <-p.recvc:
 				if err := r.Process(ctx, mm); err != nil {
-					plog.Warningf("failed to process raft message (%v)", err)
+					if t.Logger != nil {
+						t.Logger.Warn("failed to process Raft message", zap.Error(err))
+					} else {
+						plog.Warningf("failed to process raft message (%v)", err)
+					}
 				}
 			case <-p.stopc:
 				return
@@ -183,24 +204,26 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
 	}()
 
 	p.msgAppV2Reader = &streamReader{
+		lg:     t.Logger,
 		peerID: peerID,
 		typ:    streamTypeMsgAppV2,
-		tr:     transport,
+		tr:     t,
 		picker: picker,
 		status: status,
 		recvc:  p.recvc,
 		propc:  p.propc,
-		rl:     rate.NewLimiter(transport.DialRetryFrequency, 1),
+		rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
 	}
 	p.msgAppReader = &streamReader{
+		lg:     t.Logger,
 		peerID: peerID,
 		typ:    streamTypeMessage,
-		tr:     transport,
+		tr:     t,
 		picker: picker,
 		status: status,
 		recvc:  p.recvc,
 		propc:  p.propc,
-		rl:     rate.NewLimiter(transport.DialRetryFrequency, 1),
+		rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
 	}
 
 	p.msgAppV2Reader.start()
@@ -227,9 +250,32 @@ func (p *peer) send(m raftpb.Message) {
 			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 		}
 		if p.status.isActive() {
-			plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
+			if p.lg != nil {
+				p.lg.Warn(
+					"dropped internal Raft message since sending buffer is full (overloaded network)",
+					zap.String("message-type", m.Type.String()),
+					zap.String("local-member-id", p.localID.String()),
+					zap.String("from", types.ID(m.From).String()),
+					zap.String("remote-peer-id", types.ID(p.id).String()),
+					zap.Bool("remote-peer-active", p.status.isActive()),
+				)
+			} else {
+				plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
+			}
+		} else {
+			if p.lg != nil {
+				p.lg.Warn(
+					"dropped internal Raft message since sending buffer is full (overloaded network)",
+					zap.String("message-type", m.Type.String()),
+					zap.String("local-member-id", p.localID.String()),
+					zap.String("from", types.ID(m.From).String()),
+					zap.String("remote-peer-id", types.ID(p.id).String()),
+					zap.Bool("remote-peer-active", p.status.isActive()),
+				)
+			} else {
+				plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
+			}
 		}
-		plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
 		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
 	}
 }
@@ -250,7 +296,11 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) {
 	case streamTypeMessage:
 		ok = p.writer.attach(conn)
 	default:
-		plog.Panicf("unhandled stream type %s", conn.t)
+		if p.lg != nil {
+			p.lg.Panic("unknown stream type", zap.String("type", conn.t.String()))
+		} else {
+			plog.Panicf("unhandled stream type %s", conn.t)
+		}
 	}
 	if !ok {
 		conn.Close()
@@ -279,8 +329,19 @@ func (p *peer) Resume() {
 }
 
 func (p *peer) stop() {
-	plog.Infof("stopping peer %s...", p.id)
-	defer plog.Infof("stopped peer %s", p.id)
+	if p.lg != nil {
+		p.lg.Info("stopping remote peer", zap.String("remote-peer-id", p.id.String()))
+	} else {
+		plog.Infof("stopping peer %s...", p.id)
+	}
+
+	defer func() {
+		if p.lg != nil {
+			p.lg.Info("stopped remote peer", zap.String("remote-peer-id", p.id.String()))
+		} else {
+			plog.Infof("stopped peer %s", p.id)
+		}
+	}()
 
 	close(p.stopc)
 	p.cancel()

+ 20 - 8
rafthttp/peer_status.go

@@ -15,11 +15,14 @@
 package rafthttp
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"time"
 
 	"github.com/coreos/etcd/pkg/types"
+
+	"go.uber.org/zap"
 )
 
 type failureType struct {
@@ -28,23 +31,26 @@ type failureType struct {
 }
 
 type peerStatus struct {
+	lg     *zap.Logger
 	id     types.ID
 	mu     sync.Mutex // protect variables below
 	active bool
 	since  time.Time
 }
 
-func newPeerStatus(id types.ID) *peerStatus {
-	return &peerStatus{
-		id: id,
-	}
+func newPeerStatus(lg *zap.Logger, id types.ID) *peerStatus {
+	return &peerStatus{lg: lg, id: id}
 }
 
 func (s *peerStatus) activate() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if !s.active {
-		plog.Infof("peer %s became active", s.id)
+		if s.lg != nil {
+			s.lg.Info("peer became active", zap.String("peer-id", s.id.String()))
+		} else {
+			plog.Infof("peer %s became active", s.id)
+		}
 		s.active = true
 		s.since = time.Now()
 	}
@@ -55,13 +61,19 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
 	defer s.mu.Unlock()
 	msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
 	if s.active {
-		plog.Errorf(msg)
-		plog.Infof("peer %s became inactive", s.id)
+		if s.lg != nil {
+			s.lg.Warn("peer became inactive", zap.String("peer-id", s.id.String()), zap.Error(errors.New(msg)))
+		} else {
+			plog.Errorf(msg)
+			plog.Infof("peer %s became inactive", s.id)
+		}
 		s.active = false
 		s.since = time.Time{}
 		return
 	}
-	plog.Debugf(msg)
+	if s.lg != nil {
+		s.lg.Warn("peer deactivated again", zap.String("peer-id", s.id.String()), zap.Error(errors.New(msg)))
+	}
 }
 
 func (s *peerStatus) isActive() bool {

+ 22 - 2
rafthttp/pipeline.go

@@ -27,6 +27,8 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+
+	"go.uber.org/zap"
 )
 
 const (
@@ -64,13 +66,31 @@ func (p *pipeline) start() {
 	for i := 0; i < connPerPipeline; i++ {
 		go p.handle()
 	}
-	plog.Infof("started HTTP pipelining with peer %s", p.peerID)
+
+	if p.tr != nil && p.tr.Logger != nil {
+		p.tr.Logger.Info(
+			"started HTTP pipelining with remote peer",
+			zap.String("local-member-id", p.tr.ID.String()),
+			zap.String("remote-peer-id", p.peerID.String()),
+		)
+	} else {
+		plog.Infof("started HTTP pipelining with peer %s", p.peerID)
+	}
 }
 
 func (p *pipeline) stop() {
 	close(p.stopc)
 	p.wg.Wait()
-	plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
+
+	if p.tr != nil && p.tr.Logger != nil {
+		p.tr.Logger.Info(
+			"stopped HTTP pipelining with remote peer",
+			zap.String("local-member-id", p.tr.ID.String()),
+			zap.String("remote-peer-id", p.peerID.String()),
+		)
+	} else {
+		plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
+	}
 }
 
 func (p *pipeline) handle() {

+ 3 - 1
rafthttp/pipeline_test.go

@@ -24,6 +24,8 @@ import (
 	"testing"
 	"time"
 
+	"go.uber.org/zap"
+
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
@@ -301,7 +303,7 @@ func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline {
 		peerID:        types.ID(1),
 		tr:            tr,
 		picker:        picker,
-		status:        newPeerStatus(types.ID(1)),
+		status:        newPeerStatus(zap.NewExample(), types.ID(1)),
 		raft:          &fakeRaft{},
 		followerStats: &stats.FollowerStats{},
 		errorc:        make(chan error, 1),

+ 30 - 6
rafthttp/probing_status.go

@@ -18,6 +18,7 @@ import (
 	"time"
 
 	"github.com/xiang90/probing"
+	"go.uber.org/zap"
 )
 
 var (
@@ -28,7 +29,7 @@ var (
 	statusErrorInterval      = 5 * time.Second
 )
 
-func addPeerToProber(p probing.Prober, id string, us []string) {
+func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) {
 	hus := make([]string, len(us))
 	for i := range us {
 		hus[i] = us[i] + ProbingPrefix
@@ -38,26 +39,49 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
 
 	s, err := p.Status(id)
 	if err != nil {
-		plog.Errorf("failed to add peer %s into prober", id)
+		if lg != nil {
+			lg.Warn("failed to add peer into prober", zap.String("remote-peer-id", id))
+		} else {
+			plog.Errorf("failed to add peer %s into prober", id)
+		}
 	} else {
-		go monitorProbingStatus(s, id)
+		go monitorProbingStatus(lg, s, id)
 	}
 }
 
-func monitorProbingStatus(s probing.Status, id string) {
+func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) {
 	// set the first interval short to log error early.
 	interval := statusErrorInterval
 	for {
 		select {
 		case <-time.After(interval):
 			if !s.Health() {
-				plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
+				if lg != nil {
+					lg.Warn(
+						"prober detected unhealthy status",
+						zap.String("remote-peer-id", id),
+						zap.Duration("rtt", s.SRTT()),
+						zap.Error(s.Err()),
+					)
+				} else {
+					plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
+				}
 				interval = statusErrorInterval
 			} else {
 				interval = statusMonitoringInterval
 			}
 			if s.ClockDiff() > time.Second {
-				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
+				if lg != nil {
+					lg.Warn(
+						"prober found high clock drift",
+						zap.String("remote-peer-id", id),
+						zap.Duration("clock-drift", s.SRTT()),
+						zap.Duration("rtt", s.ClockDiff()),
+						zap.Error(s.Err()),
+					)
+				} else {
+					plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
+				}
 			}
 			rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
 		case <-s.StopNotify():

+ 32 - 3
rafthttp/remote.go

@@ -17,9 +17,13 @@ package rafthttp
 import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
+
+	"go.uber.org/zap"
 )
 
 type remote struct {
+	lg       *zap.Logger
+	localID  types.ID
 	id       types.ID
 	status   *peerStatus
 	pipeline *pipeline
@@ -27,7 +31,7 @@ type remote struct {
 
 func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
 	picker := newURLPicker(urls)
-	status := newPeerStatus(id)
+	status := newPeerStatus(tr.Logger, id)
 	pipeline := &pipeline{
 		peerID: id,
 		tr:     tr,
@@ -39,6 +43,8 @@ func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
 	pipeline.start()
 
 	return &remote{
+		lg:       tr.Logger,
+		localID:  tr.ID,
 		id:       id,
 		status:   status,
 		pipeline: pipeline,
@@ -50,9 +56,32 @@ func (g *remote) send(m raftpb.Message) {
 	case g.pipeline.msgc <- m:
 	default:
 		if g.status.isActive() {
-			plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id)
+			if g.lg != nil {
+				g.lg.Warn(
+					"dropped internal Raft message since sending buffer is full (overloaded network)",
+					zap.String("message-type", m.Type.String()),
+					zap.String("local-member-id", g.localID.String()),
+					zap.String("from", types.ID(m.From).String()),
+					zap.String("remote-peer-id", types.ID(g.id).String()),
+					zap.Bool("remote-peer-active", g.status.isActive()),
+				)
+			} else {
+				plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id)
+			}
+		} else {
+			if g.lg != nil {
+				g.lg.Warn(
+					"dropped Raft message since sending buffer is full (overloaded network)",
+					zap.String("message-type", m.Type.String()),
+					zap.String("local-member-id", g.localID.String()),
+					zap.String("from", types.ID(m.From).String()),
+					zap.String("remote-peer-id", types.ID(g.id).String()),
+					zap.Bool("remote-peer-active", g.status.isActive()),
+				)
+			} else {
+				plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
+			}
 		}
-		plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
 		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
 	}
 }

+ 38 - 6
rafthttp/snapshot_sender.go

@@ -27,6 +27,8 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raftsnap"
+
+	"go.uber.org/zap"
 )
 
 var (
@@ -66,18 +68,35 @@ func (s *snapshotSender) stop() { close(s.stopc) }
 func (s *snapshotSender) send(merged raftsnap.Message) {
 	m := merged.Message
 
-	body := createSnapBody(merged)
+	body := createSnapBody(s.tr.Logger, merged)
 	defer body.Close()
 
 	u := s.picker.pick()
 	req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.tr.URLs, s.from, s.cid)
 
-	plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To))
+	if s.tr.Logger != nil {
+		s.tr.Logger.Info(
+			"sending database snapshot",
+			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+			zap.String("remote-peer-id", types.ID(m.To).String()),
+		)
+	} else {
+		plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To))
+	}
 
 	err := s.post(req)
 	defer merged.CloseWithError(err)
 	if err != nil {
-		plog.Warningf("database snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
+		if s.tr.Logger != nil {
+			s.tr.Logger.Warn(
+				"failed to send database snapshot",
+				zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+				zap.String("remote-peer-id", types.ID(m.To).String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Warningf("database snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
+		}
 
 		// errMemberRemoved is a critical error since a removed member should
 		// always be stopped. So we use reportCriticalError to report it to errorc.
@@ -97,7 +116,16 @@ func (s *snapshotSender) send(merged raftsnap.Message) {
 	}
 	s.status.activate()
 	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
-	plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
+
+	if s.tr.Logger != nil {
+		s.tr.Logger.Info(
+			"sent database snapshot",
+			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+			zap.String("remote-peer-id", types.ID(m.To).String()),
+		)
+	} else {
+		plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
+	}
 
 	sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
 }
@@ -142,12 +170,16 @@ func (s *snapshotSender) post(req *http.Request) (err error) {
 	}
 }
 
-func createSnapBody(merged raftsnap.Message) io.ReadCloser {
+func createSnapBody(lg *zap.Logger, merged raftsnap.Message) io.ReadCloser {
 	buf := new(bytes.Buffer)
 	enc := &messageEncoder{w: buf}
 	// encode raft message
 	if err := enc.encode(&merged.Message); err != nil {
-		plog.Panicf("encode message error (%v)", err)
+		if lg != nil {
+			lg.Panic("failed to encode message", zap.Error(err))
+		} else {
+			plog.Panicf("encode message error (%v)", err)
+		}
 	}
 
 	return &pioutil.ReaderAndCloser{

+ 4 - 2
rafthttp/snapshot_test.go

@@ -28,6 +28,8 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raftsnap"
+
+	"go.uber.org/zap"
 )
 
 type strReaderCloser struct{ *strings.Reader }
@@ -102,12 +104,12 @@ func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo)
 	r := &fakeRaft{}
 	tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r}
 	ch := make(chan struct{}, 1)
-	h := &syncHandler{newSnapshotHandler(tr, r, raftsnap.New(d), types.ID(1)), ch}
+	h := &syncHandler{newSnapshotHandler(tr, r, raftsnap.New(zap.NewExample(), d), types.ID(1)), ch}
 	srv := httptest.NewServer(h)
 	defer srv.Close()
 
 	picker := mustNewURLPicker(t, []string{srv.URL})
-	snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(types.ID(1)))
+	snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)))
 	defer snapsend.stop()
 
 	snapsend.send(*sm)

+ 215 - 25
rafthttp/stream.go

@@ -25,8 +25,6 @@ import (
 	"sync"
 	"time"
 
-	"golang.org/x/time/rate"
-
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/httputil"
 	"github.com/coreos/etcd/pkg/transport"
@@ -35,6 +33,8 @@ import (
 	"github.com/coreos/etcd/version"
 
 	"github.com/coreos/go-semver/semver"
+	"go.uber.org/zap"
+	"golang.org/x/time/rate"
 )
 
 const (
@@ -105,7 +105,11 @@ type outgoingConn struct {
 
 // streamWriter writes messages to the attached outgoingConn.
 type streamWriter struct {
-	peerID types.ID
+	lg *zap.Logger
+
+	localID types.ID
+	peerID  types.ID
+
 	status *peerStatus
 	fs     *stats.FollowerStats
 	r      Raft
@@ -122,9 +126,13 @@ type streamWriter struct {
 
 // startStreamWriter creates a streamWrite and starts a long running go-routine that accepts
 // messages and writes to the attached outgoing connection.
-func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
+func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
 	w := &streamWriter{
-		peerID: id,
+		lg: lg,
+
+		localID: local,
+		peerID:  id,
+
 		status: status,
 		fs:     fs,
 		r:      r,
@@ -150,7 +158,15 @@ func (cw *streamWriter) run() {
 	defer tickc.Stop()
 	unflushed := 0
 
-	plog.Infof("started streaming with peer %s (writer)", cw.peerID)
+	if cw.lg != nil {
+		cw.lg.Info(
+			"started stream writer with remote peer",
+			zap.String("local-member-id", cw.localID.String()),
+			zap.String("remote-peer-id", cw.peerID.String()),
+		)
+	} else {
+		plog.Infof("started streaming with peer %s (writer)", cw.peerID)
+	}
 
 	for {
 		select {
@@ -169,7 +185,16 @@ func (cw *streamWriter) run() {
 
 			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
 			cw.close()
-			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"lost TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("local-member-id", cw.localID.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			}
 			heartbeatc, msgc = nil, nil
 
 		case m := <-msgc:
@@ -191,7 +216,16 @@ func (cw *streamWriter) run() {
 
 			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
 			cw.close()
-			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"lost TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("local-member-id", cw.localID.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			}
 			heartbeatc, msgc = nil, nil
 			cw.r.ReportUnreachable(m.To)
 			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
@@ -216,15 +250,50 @@ func (cw *streamWriter) run() {
 			cw.mu.Unlock()
 
 			if closed {
-				plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+				if cw.lg != nil {
+					cw.lg.Warn(
+						"closed TCP streaming connection with remote peer",
+						zap.String("stream-writer-type", t.String()),
+						zap.String("local-member-id", cw.localID.String()),
+						zap.String("remote-peer-id", cw.peerID.String()),
+					)
+				} else {
+					plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+				}
+			}
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"established TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("local-member-id", cw.localID.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
 			}
-			plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
 			heartbeatc, msgc = tickc.C, cw.msgc
+
 		case <-cw.stopc:
 			if cw.close() {
-				plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+				if cw.lg != nil {
+					cw.lg.Warn(
+						"closed TCP streaming connection with remote peer",
+						zap.String("stream-writer-type", t.String()),
+						zap.String("remote-peer-id", cw.peerID.String()),
+					)
+				} else {
+					plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+				}
+			}
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"stopped TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
 			}
-			plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
 			close(cw.done)
 			return
 		}
@@ -248,7 +317,15 @@ func (cw *streamWriter) closeUnlocked() bool {
 		return false
 	}
 	if err := cw.closer.Close(); err != nil {
-		plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
+		if cw.lg != nil {
+			cw.lg.Warn(
+				"failed to close connection with remote peer",
+				zap.String("remote-peer-id", cw.peerID.String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
+		}
 	}
 	if len(cw.msgc) > 0 {
 		cw.r.ReportUnreachable(uint64(cw.peerID))
@@ -275,6 +352,8 @@ func (cw *streamWriter) stop() {
 // streamReader is a long-running go-routine that dials to the remote stream
 // endpoint and reads messages from the response body returned.
 type streamReader struct {
+	lg *zap.Logger
+
 	peerID types.ID
 	typ    streamType
 
@@ -310,7 +389,18 @@ func (cr *streamReader) start() {
 
 func (cr *streamReader) run() {
 	t := cr.typ
-	plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
+
+	if cr.lg != nil {
+		cr.lg.Info(
+			"started stream reader with remote peer",
+			zap.String("stream-reader-type", t.String()),
+			zap.String("local-member-id", cr.tr.ID.String()),
+			zap.String("remote-peer-id", cr.peerID.String()),
+		)
+	} else {
+		plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
+	}
+
 	for {
 		rc, err := cr.dial(t)
 		if err != nil {
@@ -319,9 +409,28 @@ func (cr *streamReader) run() {
 			}
 		} else {
 			cr.status.activate()
-			plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
+			if cr.lg != nil {
+				cr.lg.Info(
+					"established TCP streaming connection with remote peer",
+					zap.String("stream-reader-type", cr.typ.String()),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+				)
+			} else {
+				plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
+			}
 			err = cr.decodeLoop(rc, t)
-			plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"lost TCP streaming connection with remote peer",
+					zap.String("stream-reader-type", cr.typ.String()),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.Error(err),
+				)
+			} else {
+				plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
+			}
 			switch {
 			// all data is read out
 			case err == io.EOF:
@@ -334,12 +443,31 @@ func (cr *streamReader) run() {
 		// Wait for a while before new dial attempt
 		err = cr.rl.Wait(cr.ctx)
 		if cr.ctx.Err() != nil {
-			plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
+			if cr.lg != nil {
+				cr.lg.Info(
+					"stopped stream reader with remote peer",
+					zap.String("stream-reader-type", t.String()),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+				)
+			} else {
+				plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
+			}
 			close(cr.done)
 			return
 		}
 		if err != nil {
-			plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"rate limit on stream reader with remote peer",
+					zap.String("stream-reader-type", t.String()),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.Error(err),
+				)
+			} else {
+				plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
+			}
 		}
 	}
 }
@@ -353,7 +481,11 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 	case streamTypeMessage:
 		dec = &messageDecoder{r: rc}
 	default:
-		plog.Panicf("unhandled stream type %s", t)
+		if cr.lg != nil {
+			cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
+		} else {
+			plog.Panicf("unhandled stream type %s", t)
+		}
 	}
 	select {
 	case <-cr.ctx.Done():
@@ -402,9 +534,32 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 		case recvc <- m:
 		default:
 			if cr.status.isActive() {
-				plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
+				if cr.lg != nil {
+					cr.lg.Warn(
+						"dropped internal Raft message since receiving buffer is full (overloaded network)",
+						zap.String("message-type", m.Type.String()),
+						zap.String("local-member-id", cr.tr.ID.String()),
+						zap.String("from", types.ID(m.From).String()),
+						zap.String("remote-peer-id", types.ID(m.To).String()),
+						zap.Bool("remote-peer-active", cr.status.isActive()),
+					)
+				} else {
+					plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
+				}
+			} else {
+				if cr.lg != nil {
+					cr.lg.Warn(
+						"dropped Raft message since receiving buffer is full (overloaded network)",
+						zap.String("message-type", m.Type.String()),
+						zap.String("local-member-id", cr.tr.ID.String()),
+						zap.String("from", types.ID(m.From).String()),
+						zap.String("remote-peer-id", types.ID(m.To).String()),
+						zap.Bool("remote-peer-active", cr.status.isActive()),
+					)
+				} else {
+					plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
+				}
 			}
-			plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
 			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
 		}
 	}
@@ -467,12 +622,15 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 		cr.picker.unreachable(u)
 		reportCriticalError(errMemberRemoved, cr.errorc)
 		return nil, errMemberRemoved
+
 	case http.StatusOK:
 		return resp.Body, nil
+
 	case http.StatusNotFound:
 		httputil.GracefulClose(resp)
 		cr.picker.unreachable(u)
 		return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
+
 	case http.StatusPreconditionFailed:
 		b, err := ioutil.ReadAll(resp.Body)
 		if err != nil {
@@ -484,15 +642,38 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 
 		switch strings.TrimSuffix(string(b), "\n") {
 		case errIncompatibleVersion.Error():
-			plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"request sent was ignored by remote peer due to server version incompatibility",
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.Error(errIncompatibleVersion),
+				)
+			} else {
+				plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
+			}
 			return nil, errIncompatibleVersion
+
 		case errClusterIDMismatch.Error():
-			plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
-				cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"request sent was ignored by remote peer due to cluster ID mismatch",
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("local-member-cluster-id", cr.tr.ClusterID.String()),
+					zap.Error(errClusterIDMismatch),
+				)
+			} else {
+				plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
+					cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
+			}
 			return nil, errClusterIDMismatch
+
 		default:
 			return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
 		}
+
 	default:
 		httputil.GracefulClose(resp)
 		cr.picker.unreachable(u)
@@ -503,7 +684,16 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 func (cr *streamReader) close() {
 	if cr.closer != nil {
 		if err := cr.closer.Close(); err != nil {
-			plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"failed to close remote peer connection",
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.Error(err),
+				)
+			} else {
+				plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
+			}
 		}
 	}
 	cr.closer = nil

+ 6 - 5
rafthttp/stream_test.go

@@ -33,6 +33,7 @@ import (
 	"github.com/coreos/etcd/version"
 
 	"github.com/coreos/go-semver/semver"
+	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 )
 
@@ -40,7 +41,7 @@ import (
 // to streamWriter. After that, streamWriter can use it to send messages
 // continuously, and closes it when stopped.
 func TestStreamWriterAttachOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 	// the expected initial state of streamWriter is not working
 	if _, ok := sw.writec(); ok {
 		t.Errorf("initial working status = %v, want false", ok)
@@ -92,7 +93,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
 // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
 // outgoingConn will close the outgoingConn and fall back to non-working status.
 func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 	defer sw.stop()
 	wfc := newFakeWriteFlushCloser(errors.New("blah"))
 	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
@@ -196,7 +197,7 @@ func TestStreamReaderStopOnDial(t *testing.T) {
 		picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 		errorc: make(chan error, 1),
 		typ:    streamTypeMessage,
-		status: newPeerStatus(types.ID(2)),
+		status: newPeerStatus(zap.NewExample(), types.ID(2)),
 		rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
 	}
 	tr.onResp = func() {
@@ -303,7 +304,7 @@ func TestStream(t *testing.T) {
 		srv := httptest.NewServer(h)
 		defer srv.Close()
 
-		sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
+		sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 		defer sw.stop()
 		h.sw = sw
 
@@ -315,7 +316,7 @@ func TestStream(t *testing.T) {
 			typ:    tt.t,
 			tr:     tr,
 			picker: picker,
-			status: newPeerStatus(types.ID(2)),
+			status: newPeerStatus(zap.NewExample(), types.ID(2)),
 			recvc:  recvc,
 			propc:  propc,
 			rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),

+ 52 - 11
rafthttp/transport.go

@@ -30,6 +30,7 @@ import (
 
 	"github.com/coreos/pkg/capnslog"
 	"github.com/xiang90/probing"
+	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 )
 
@@ -98,6 +99,8 @@ type Transporter interface {
 // User needs to call Start before calling other functions, and call
 // Stop when the Transport is no longer used.
 type Transport struct {
+	Logger *zap.Logger
+
 	DialTimeout time.Duration // maximum duration before timing out dial of the request
 	// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
 	// a distinct rate limiter is created per every peer (default value: 10 events/sec)
@@ -197,7 +200,15 @@ func (t *Transport) Send(msgs []raftpb.Message) {
 			continue
 		}
 
-		plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to)
+		if t.Logger != nil {
+			t.Logger.Debug(
+				"ignored message send request; unknown remote peer target",
+				zap.String("type", m.Type.String()),
+				zap.String("unknown-target-peer-id", to.String()),
+			)
+		} else {
+			plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to)
+		}
 	}
 }
 
@@ -268,7 +279,11 @@ func (t *Transport) AddRemote(id types.ID, us []string) {
 	}
 	urls, err := types.NewURLs(us)
 	if err != nil {
-		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+		if t.Logger != nil {
+			t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
+		} else {
+			plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+		}
 	}
 	t.remotes[id] = startRemote(t, urls, id)
 }
@@ -285,13 +300,21 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 	}
 	urls, err := types.NewURLs(us)
 	if err != nil {
-		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+		if t.Logger != nil {
+			t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
+		} else {
+			plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+		}
 	}
 	fs := t.LeaderStats.Follower(id.String())
 	t.peers[id] = startPeer(t, urls, id, fs)
-	addPeerToProber(t.prober, id.String(), us)
+	addPeerToProber(t.Logger, t.prober, id.String(), us)
 
-	plog.Infof("added peer %s", id)
+	if t.Logger != nil {
+		t.Logger.Info("added remote peer", zap.String("remote-peer-id", id.String()))
+	} else {
+		plog.Infof("added peer %s", id)
+	}
 }
 
 func (t *Transport) RemovePeer(id types.ID) {
@@ -313,12 +336,21 @@ func (t *Transport) removePeer(id types.ID) {
 	if peer, ok := t.peers[id]; ok {
 		peer.stop()
 	} else {
-		plog.Panicf("unexpected removal of unknown peer '%d'", id)
+		if t.Logger != nil {
+			t.Logger.Panic("unexpected removal of unknown remote peer", zap.String("remote-peer-id", id.String()))
+		} else {
+			plog.Panicf("unexpected removal of unknown peer '%d'", id)
+		}
 	}
 	delete(t.peers, id)
 	delete(t.LeaderStats.Followers, id.String())
 	t.prober.Remove(id.String())
-	plog.Infof("removed peer %s", id)
+
+	if t.Logger != nil {
+		t.Logger.Info("removed remote peer", zap.String("remote-peer-id", id.String()))
+	} else {
+		plog.Infof("removed peer %s", id)
+	}
 }
 
 func (t *Transport) UpdatePeer(id types.ID, us []string) {
@@ -330,13 +362,22 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
 	}
 	urls, err := types.NewURLs(us)
 	if err != nil {
-		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+		if t.Logger != nil {
+			t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
+		} else {
+			plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+		}
 	}
 	t.peers[id].update(urls)
 
 	t.prober.Remove(id.String())
-	addPeerToProber(t.prober, id.String(), us)
-	plog.Infof("updated peer %s", id)
+	addPeerToProber(t.Logger, t.prober, id.String(), us)
+
+	if t.Logger != nil {
+		t.Logger.Info("updated remote peer", zap.String("remote-peer-id", id.String()))
+	} else {
+		plog.Infof("updated peer %s", id)
+	}
 }
 
 func (t *Transport) ActiveSince(id types.ID) time.Time {
@@ -425,7 +466,7 @@ func NewSnapTransporter(snapDir string) (Transporter, <-chan raftsnap.Message) {
 }
 
 func (s *snapTransporter) SendSnapshot(m raftsnap.Message) {
-	ss := raftsnap.New(s.snapDir)
+	ss := raftsnap.New(zap.NewExample(), s.snapDir)
 	ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
 	m.CloseWithError(nil)
 	s.snapDoneC <- m

+ 10 - 7
rafthttp/util.go

@@ -150,18 +150,21 @@ func minClusterVersion(h http.Header) *semver.Version {
 	return semver.Must(semver.NewVersion(verStr))
 }
 
-// checkVersionCompability checks whether the given version is compatible
+// checkVersionCompatibility checks whether the given version is compatible
 // with the local version.
-func checkVersionCompability(name string, server, minCluster *semver.Version) error {
-	localServer := semver.Must(semver.NewVersion(version.Version))
-	localMinCluster := semver.Must(semver.NewVersion(version.MinClusterVersion))
+func checkVersionCompatibility(name string, server, minCluster *semver.Version) (
+	localServer *semver.Version,
+	localMinCluster *semver.Version,
+	err error) {
+	localServer = semver.Must(semver.NewVersion(version.Version))
+	localMinCluster = semver.Must(semver.NewVersion(version.MinClusterVersion))
 	if compareMajorMinorVersion(server, localMinCluster) == -1 {
-		return fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
+		return localServer, localMinCluster, fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
 	}
 	if compareMajorMinorVersion(minCluster, localServer) == 1 {
-		return fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
+		return localServer, localMinCluster, fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
 	}
-	return nil
+	return localServer, localMinCluster, nil
 }
 
 // setPeerURLsHeader reports local urls for peer discovery

+ 1 - 1
rafthttp/util_test.go

@@ -188,7 +188,7 @@ func TestCheckVersionCompatibility(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		err := checkVersionCompability("", tt.server, tt.minCluster)
+		_, _, err := checkVersionCompatibility("", tt.server, tt.minCluster)
 		if ok := err == nil; ok != tt.wok {
 			t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
 		}