Browse Source

etcdserver/api/rafthttp: probe all Raft messages' RTT

This PR adds another probing routine to monitor the connection
for Raft message transports. Previously, we only monitored
snapshot transports.

In our production cluster, we found that one TCP connection had >8-sec
latencies to a remote peer, but the "etcd_network_peer_round_trip_time_seconds"
metric showed a <1-sec latency distribution, which means the etcd server
was not sampling enough when such latency spikes happened
outside of the snapshot pipeline connection.

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
Gyuho Lee 7 years ago
parent
commit
7b1ef37054

+ 1 - 1
etcdserver/api/rafthttp/metrics.go

@@ -137,7 +137,7 @@ var (
 		Namespace: "etcd",
 		Subsystem: "network",
 		Name:      "peer_round_trip_time_seconds",
-		Help:      "Round-Trip-Time histogram between peers.",
+		Help:      "Round-Trip-Time histogram between peers",
 
 		// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
 		// highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec

+ 3 - 0
etcdserver/api/rafthttp/probing_status.go

@@ -23,6 +23,9 @@ import (
 )
 
 const (
+	// RoundTripperNameRaftMessage is the name of the round-tripper that sends
+	// all Raft messages other than "snap.Message".
+	RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
 	// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
 	RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
 )

+ 7 - 0
etcdserver/api/rafthttp/transport.go

@@ -131,6 +131,7 @@ type Transport struct {
 	peers   map[types.ID]Peer    // peers map
 
 	pipelineProber probing.Prober
+	streamProber   probing.Prober
 }
 
 func (t *Transport) Start() error {
@@ -146,6 +147,7 @@ func (t *Transport) Start() error {
 	t.remotes = make(map[types.ID]*remote)
 	t.peers = make(map[types.ID]Peer)
 	t.pipelineProber = probing.NewProber(t.pipelineRt)
+	t.streamProber = probing.NewProber(t.streamRt)
 
 	// If client didn't provide dial retry frequency, use the default
 	// (100ms backoff between attempts to create a new stream),
@@ -222,6 +224,7 @@ func (t *Transport) Stop() {
 		p.stop()
 	}
 	t.pipelineProber.RemoveAll()
+	t.streamProber.RemoveAll()
 	if tr, ok := t.streamRt.(*http.Transport); ok {
 		tr.CloseIdleConnections()
 	}
@@ -318,6 +321,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 	fs := t.LeaderStats.Follower(id.String())
 	t.peers[id] = startPeer(t, urls, id, fs)
 	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
+	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
 
 	if t.Logger != nil {
 		t.Logger.Info(
@@ -359,6 +363,7 @@ func (t *Transport) removePeer(id types.ID) {
 	delete(t.peers, id)
 	delete(t.LeaderStats.Followers, id.String())
 	t.pipelineProber.Remove(id.String())
+	t.streamProber.Remove(id.String())
 
 	if t.Logger != nil {
 		t.Logger.Info(
@@ -390,6 +395,8 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
 
 	t.pipelineProber.Remove(id.String())
 	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
+	t.streamProber.Remove(id.String())
+	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
 
 	if t.Logger != nil {
 		t.Logger.Info(