Browse Source

Merge pull request #10155 from gyuho/metrics-messages

rafthttp: probe all raft transports
Gyuho Lee 7 years ago
parent
commit
d2a0f17b82

+ 1 - 1
etcdserver/api/rafthttp/metrics.go

@@ -137,7 +137,7 @@ var (
 		Namespace: "etcd",
 		Subsystem: "network",
 		Name:      "peer_round_trip_time_seconds",
-		Help:      "Round-Trip-Time histogram between peers.",
+		Help:      "Round-Trip-Time histogram between peers",
 
 		// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
 		// highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec

+ 15 - 4
etcdserver/api/rafthttp/probing_status.go

@@ -17,10 +17,19 @@ package rafthttp
 import (
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/xiang90/probing"
 	"go.uber.org/zap"
 )
 
+const (
+	// RoundTripperNameRaftMessage is the name of round-tripper that sends
+	// all other Raft messages, other than "snap.Message".
+	RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
+	// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
+	RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
+)
+
 var (
 	// proberInterval must be shorter than read timeout.
 	// Or the connection will time-out.
@@ -29,7 +38,7 @@ var (
 	statusErrorInterval      = 5 * time.Second
 )
 
-func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) {
+func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
 	hus := make([]string, len(us))
 	for i := range us {
 		hus[i] = us[i] + ProbingPrefix
@@ -47,10 +56,10 @@ func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) {
 		return
 	}
 
-	go monitorProbingStatus(lg, s, id)
+	go monitorProbingStatus(lg, s, id, roundTripperName, rttSecProm)
 }
 
-func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) {
+func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
 	// set the first interval short to log error early.
 	interval := statusErrorInterval
 	for {
@@ -60,6 +69,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) {
 				if lg != nil {
 					lg.Warn(
 						"prober detected unhealthy status",
+						zap.String("round-tripper-name", roundTripperName),
 						zap.String("remote-peer-id", id),
 						zap.Duration("rtt", s.SRTT()),
 						zap.Error(s.Err()),
@@ -75,6 +85,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) {
 				if lg != nil {
 					lg.Warn(
 						"prober found high clock drift",
+						zap.String("round-tripper-name", roundTripperName),
 						zap.String("remote-peer-id", id),
 						zap.Duration("clock-drift", s.SRTT()),
 						zap.Duration("rtt", s.ClockDiff()),
@@ -84,7 +95,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) {
 					plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
 				}
 			}
-			rttSec.WithLabelValues(id).Observe(s.SRTT().Seconds())
+			rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
 
 		case <-s.StopNotify():
 			return

+ 14 - 7
etcdserver/api/rafthttp/transport.go

@@ -130,7 +130,8 @@ type Transport struct {
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	peers   map[types.ID]Peer    // peers map
 
-	prober probing.Prober
+	pipelineProber probing.Prober
+	streamProber   probing.Prober
 }
 
 func (t *Transport) Start() error {
@@ -145,7 +146,8 @@ func (t *Transport) Start() error {
 	}
 	t.remotes = make(map[types.ID]*remote)
 	t.peers = make(map[types.ID]Peer)
-	t.prober = probing.NewProber(t.pipelineRt)
+	t.pipelineProber = probing.NewProber(t.pipelineRt)
+	t.streamProber = probing.NewProber(t.streamRt)
 
 	// If client didn't provide dial retry frequency, use the default
 	// (100ms backoff between attempts to create a new stream),
@@ -221,7 +223,8 @@ func (t *Transport) Stop() {
 	for _, p := range t.peers {
 		p.stop()
 	}
-	t.prober.RemoveAll()
+	t.pipelineProber.RemoveAll()
+	t.streamProber.RemoveAll()
 	if tr, ok := t.streamRt.(*http.Transport); ok {
 		tr.CloseIdleConnections()
 	}
@@ -317,7 +320,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 	}
 	fs := t.LeaderStats.Follower(id.String())
 	t.peers[id] = startPeer(t, urls, id, fs)
-	addPeerToProber(t.Logger, t.prober, id.String(), us)
+	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
+	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
 
 	if t.Logger != nil {
 		t.Logger.Info(
@@ -358,7 +362,8 @@ func (t *Transport) removePeer(id types.ID) {
 	}
 	delete(t.peers, id)
 	delete(t.LeaderStats.Followers, id.String())
-	t.prober.Remove(id.String())
+	t.pipelineProber.Remove(id.String())
+	t.streamProber.Remove(id.String())
 
 	if t.Logger != nil {
 		t.Logger.Info(
@@ -388,8 +393,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
 	}
 	t.peers[id].update(urls)
 
-	t.prober.Remove(id.String())
-	addPeerToProber(t.Logger, t.prober, id.String(), us)
+	t.pipelineProber.Remove(id.String())
+	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
+	t.streamProber.Remove(id.String())
+	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
 
 	if t.Logger != nil {
 		t.Logger.Info(

+ 21 - 17
etcdserver/api/rafthttp/transport_test.go

@@ -97,10 +97,11 @@ func TestTransportCutMend(t *testing.T) {
 func TestTransportAdd(t *testing.T) {
 	ls := stats.NewLeaderStats("")
 	tr := &Transport{
-		LeaderStats: ls,
-		streamRt:    &roundTripperRecorder{},
-		peers:       make(map[types.ID]Peer),
-		prober:      probing.NewProber(nil),
+		LeaderStats:    ls,
+		streamRt:       &roundTripperRecorder{},
+		peers:          make(map[types.ID]Peer),
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 
@@ -125,10 +126,11 @@ func TestTransportAdd(t *testing.T) {
 
 func TestTransportRemove(t *testing.T) {
 	tr := &Transport{
-		LeaderStats: stats.NewLeaderStats(""),
-		streamRt:    &roundTripperRecorder{},
-		peers:       make(map[types.ID]Peer),
-		prober:      probing.NewProber(nil),
+		LeaderStats:    stats.NewLeaderStats(""),
+		streamRt:       &roundTripperRecorder{},
+		peers:          make(map[types.ID]Peer),
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	tr.RemovePeer(types.ID(1))
@@ -142,8 +144,9 @@ func TestTransportRemove(t *testing.T) {
 func TestTransportUpdate(t *testing.T) {
 	peer := newFakePeer()
 	tr := &Transport{
-		peers:  map[types.ID]Peer{types.ID(1): peer},
-		prober: probing.NewProber(nil),
+		peers:          map[types.ID]Peer{types.ID(1): peer},
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	u := "http://localhost:2380"
 	tr.UpdatePeer(types.ID(1), []string{u})
@@ -156,13 +159,14 @@ func TestTransportUpdate(t *testing.T) {
 func TestTransportErrorc(t *testing.T) {
 	errorc := make(chan error, 1)
 	tr := &Transport{
-		Raft:        &fakeRaft{},
-		LeaderStats: stats.NewLeaderStats(""),
-		ErrorC:      errorc,
-		streamRt:    newRespRoundTripper(http.StatusForbidden, nil),
-		pipelineRt:  newRespRoundTripper(http.StatusForbidden, nil),
-		peers:       make(map[types.ID]Peer),
-		prober:      probing.NewProber(nil),
+		Raft:           &fakeRaft{},
+		LeaderStats:    stats.NewLeaderStats(""),
+		ErrorC:         errorc,
+		streamRt:       newRespRoundTripper(http.StatusForbidden, nil),
+		pipelineRt:     newRespRoundTripper(http.StatusForbidden, nil),
+		peers:          make(map[types.ID]Peer),
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	defer tr.Stop()