Преглед изворни кода

rafthttp: probe all raft transports

This PR adds another probing routine to monitor the connection
for Raft message transports. Previously, we only monitored
snapshot transports.

In our production cluster, we found one TCP connection had >8-sec
latencies to a remote peer, but "etcd_network_peer_round_trip_time_seconds"
metrics shows <1-sec latency distribution, which means etcd server
was not sampling enough while such latency spikes happen
outside of snapshot pipeline connection.

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
Gyuho Lee пре 7 година
родитељ
комит
b45f5306dc
3 измењених фајлова са 58 додато и 35 уклоњено
  1. 15 6
      rafthttp/probing_status.go
  2. 14 8
      rafthttp/transport.go
  3. 29 21
      rafthttp/transport_test.go

+ 15 - 6
rafthttp/probing_status.go

@@ -17,6 +17,7 @@ package rafthttp
 import (
 import (
 	"time"
 	"time"
 
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/xiang90/probing"
 	"github.com/xiang90/probing"
 )
 )
 
 
@@ -28,7 +29,15 @@ var (
 	statusErrorInterval      = 5 * time.Second
 	statusErrorInterval      = 5 * time.Second
 )
 )
 
 
-func addPeerToProber(p probing.Prober, id string, us []string) {
+const (
+	// RoundTripperNameRaftMessage is the name of round-tripper that sends
+	// all other Raft messages, other than "snap.Message".
+	RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
+	// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
+	RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
+)
+
+func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
 	hus := make([]string, len(us))
 	hus := make([]string, len(us))
 	for i := range us {
 	for i := range us {
 		hus[i] = us[i] + ProbingPrefix
 		hus[i] = us[i] + ProbingPrefix
@@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
 	if err != nil {
 	if err != nil {
 		plog.Errorf("failed to add peer %s into prober", id)
 		plog.Errorf("failed to add peer %s into prober", id)
 	} else {
 	} else {
-		go monitorProbingStatus(s, id)
+		go monitorProbingStatus(s, id, roundTripperName, rttSecProm)
 	}
 	}
 }
 }
 
 
-func monitorProbingStatus(s probing.Status, id string) {
+func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
 	// set the first interval short to log error early.
 	// set the first interval short to log error early.
 	interval := statusErrorInterval
 	interval := statusErrorInterval
 	for {
 	for {
 		select {
 		select {
 		case <-time.After(interval):
 		case <-time.After(interval):
 			if !s.Health() {
 			if !s.Health() {
-				plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
+				plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName)
 				interval = statusErrorInterval
 				interval = statusErrorInterval
 			} else {
 			} else {
 				interval = statusMonitoringInterval
 				interval = statusMonitoringInterval
 			}
 			}
 			if s.ClockDiff() > time.Second {
 			if s.ClockDiff() > time.Second {
-				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
+				plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName)
 			}
 			}
-			rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
+			rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
 		case <-s.StopNotify():
 		case <-s.StopNotify():
 			return
 			return
 		}
 		}

+ 14 - 8
rafthttp/transport.go

@@ -127,7 +127,8 @@ type Transport struct {
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	peers   map[types.ID]Peer    // peers map
 	peers   map[types.ID]Peer    // peers map
 
 
-	prober probing.Prober
+	pipelineProber probing.Prober
+	streamProber   probing.Prober
 }
 }
 
 
 func (t *Transport) Start() error {
 func (t *Transport) Start() error {
@@ -142,7 +143,8 @@ func (t *Transport) Start() error {
 	}
 	}
 	t.remotes = make(map[types.ID]*remote)
 	t.remotes = make(map[types.ID]*remote)
 	t.peers = make(map[types.ID]Peer)
 	t.peers = make(map[types.ID]Peer)
-	t.prober = probing.NewProber(t.pipelineRt)
+	t.pipelineProber = probing.NewProber(t.pipelineRt)
+	t.streamProber = probing.NewProber(t.streamRt)
 
 
 	// If client didn't provide dial retry frequency, use the default
 	// If client didn't provide dial retry frequency, use the default
 	// (100ms backoff between attempts to create a new stream),
 	// (100ms backoff between attempts to create a new stream),
@@ -210,7 +212,8 @@ func (t *Transport) Stop() {
 	for _, p := range t.peers {
 	for _, p := range t.peers {
 		p.stop()
 		p.stop()
 	}
 	}
-	t.prober.RemoveAll()
+	t.pipelineProber.RemoveAll()
+	t.streamProber.RemoveAll()
 	if tr, ok := t.streamRt.(*http.Transport); ok {
 	if tr, ok := t.streamRt.(*http.Transport); ok {
 		tr.CloseIdleConnections()
 		tr.CloseIdleConnections()
 	}
 	}
@@ -289,8 +292,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 	}
 	}
 	fs := t.LeaderStats.Follower(id.String())
 	fs := t.LeaderStats.Follower(id.String())
 	t.peers[id] = startPeer(t, urls, id, fs)
 	t.peers[id] = startPeer(t, urls, id, fs)
-	addPeerToProber(t.prober, id.String(), us)
-
+	addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
+	addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
 	plog.Infof("added peer %s", id)
 	plog.Infof("added peer %s", id)
 }
 }
 
 
@@ -317,7 +320,8 @@ func (t *Transport) removePeer(id types.ID) {
 	}
 	}
 	delete(t.peers, id)
 	delete(t.peers, id)
 	delete(t.LeaderStats.Followers, id.String())
 	delete(t.LeaderStats.Followers, id.String())
-	t.prober.Remove(id.String())
+	t.pipelineProber.Remove(id.String())
+	t.streamProber.Remove(id.String())
 	plog.Infof("removed peer %s", id)
 	plog.Infof("removed peer %s", id)
 }
 }
 
 
@@ -334,8 +338,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
 	}
 	}
 	t.peers[id].update(urls)
 	t.peers[id].update(urls)
 
 
-	t.prober.Remove(id.String())
-	addPeerToProber(t.prober, id.String(), us)
+	t.pipelineProber.Remove(id.String())
+	addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
+	t.streamProber.Remove(id.String())
+	addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
 	plog.Infof("updated peer %s", id)
 	plog.Infof("updated peer %s", id)
 }
 }
 
 

+ 29 - 21
rafthttp/transport_test.go

@@ -33,8 +33,10 @@ func TestTransportSend(t *testing.T) {
 	peer1 := newFakePeer()
 	peer1 := newFakePeer()
 	peer2 := newFakePeer()
 	peer2 := newFakePeer()
 	tr := &Transport{
 	tr := &Transport{
-		ServerStats: stats.NewServerStats("", ""),
-		peers:       map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
+		ServerStats:    stats.NewServerStats("", ""),
+		peers:          map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	}
 	wmsgsIgnored := []raftpb.Message{
 	wmsgsIgnored := []raftpb.Message{
 		// bad local message
 		// bad local message
@@ -68,8 +70,10 @@ func TestTransportCutMend(t *testing.T) {
 	peer1 := newFakePeer()
 	peer1 := newFakePeer()
 	peer2 := newFakePeer()
 	peer2 := newFakePeer()
 	tr := &Transport{
 	tr := &Transport{
-		ServerStats: stats.NewServerStats("", ""),
-		peers:       map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
+		ServerStats:    stats.NewServerStats("", ""),
+		peers:          map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	}
 
 
 	tr.CutPeer(types.ID(1))
 	tr.CutPeer(types.ID(1))
@@ -96,10 +100,11 @@ func TestTransportCutMend(t *testing.T) {
 func TestTransportAdd(t *testing.T) {
 func TestTransportAdd(t *testing.T) {
 	ls := stats.NewLeaderStats("")
 	ls := stats.NewLeaderStats("")
 	tr := &Transport{
 	tr := &Transport{
-		LeaderStats: ls,
-		streamRt:    &roundTripperRecorder{},
-		peers:       make(map[types.ID]Peer),
-		prober:      probing.NewProber(nil),
+		LeaderStats:    ls,
+		streamRt:       &roundTripperRecorder{},
+		peers:          make(map[types.ID]Peer),
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 
 
@@ -124,10 +129,11 @@ func TestTransportAdd(t *testing.T) {
 
 
 func TestTransportRemove(t *testing.T) {
 func TestTransportRemove(t *testing.T) {
 	tr := &Transport{
 	tr := &Transport{
-		LeaderStats: stats.NewLeaderStats(""),
-		streamRt:    &roundTripperRecorder{},
-		peers:       make(map[types.ID]Peer),
-		prober:      probing.NewProber(nil),
+		LeaderStats:    stats.NewLeaderStats(""),
+		streamRt:       &roundTripperRecorder{},
+		peers:          make(map[types.ID]Peer),
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	tr.RemovePeer(types.ID(1))
 	tr.RemovePeer(types.ID(1))
@@ -141,8 +147,9 @@ func TestTransportRemove(t *testing.T) {
 func TestTransportUpdate(t *testing.T) {
 func TestTransportUpdate(t *testing.T) {
 	peer := newFakePeer()
 	peer := newFakePeer()
 	tr := &Transport{
 	tr := &Transport{
-		peers:  map[types.ID]Peer{types.ID(1): peer},
-		prober: probing.NewProber(nil),
+		peers:          map[types.ID]Peer{types.ID(1): peer},
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	}
 	u := "http://localhost:2380"
 	u := "http://localhost:2380"
 	tr.UpdatePeer(types.ID(1), []string{u})
 	tr.UpdatePeer(types.ID(1), []string{u})
@@ -155,13 +162,14 @@ func TestTransportUpdate(t *testing.T) {
 func TestTransportErrorc(t *testing.T) {
 func TestTransportErrorc(t *testing.T) {
 	errorc := make(chan error, 1)
 	errorc := make(chan error, 1)
 	tr := &Transport{
 	tr := &Transport{
-		Raft:        &fakeRaft{},
-		LeaderStats: stats.NewLeaderStats(""),
-		ErrorC:      errorc,
-		streamRt:    newRespRoundTripper(http.StatusForbidden, nil),
-		pipelineRt:  newRespRoundTripper(http.StatusForbidden, nil),
-		peers:       make(map[types.ID]Peer),
-		prober:      probing.NewProber(nil),
+		Raft:           &fakeRaft{},
+		LeaderStats:    stats.NewLeaderStats(""),
+		ErrorC:         errorc,
+		streamRt:       newRespRoundTripper(http.StatusForbidden, nil),
+		pipelineRt:     newRespRoundTripper(http.StatusForbidden, nil),
+		peers:          make(map[types.ID]Peer),
+		pipelineProber: probing.NewProber(nil),
+		streamProber:   probing.NewProber(nil),
 	}
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	defer tr.Stop()
 	defer tr.Stop()