Browse Source

etcdserver/api/rafthttp: add "etcd_network_active_peers" and "etcd_network_disconnected_peers_total" metrics

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
3821f3364d

+ 20 - 0
etcdserver/api/rafthttp/metrics.go

@@ -17,6 +17,24 @@ package rafthttp
 import "github.com/prometheus/client_golang/prometheus"
 
 var (
+	activePeers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "active_peers",
+		Help:      "The current number of active peer connections.",
+	},
+		[]string{"Local", "Remote"},
+	)
+
+	disconnectedPeers = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "disconnected_peers_total",
+		Help:      "The total number of disconnected peers.",
+	},
+		[]string{"Local", "Remote"},
+	)
+
 	sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "etcd",
 		Subsystem: "network",
@@ -68,6 +86,8 @@ var (
 )
 
 func init() {
+	prometheus.MustRegister(activePeers)
+	prometheus.MustRegister(disconnectedPeers)
 	prometheus.MustRegister(sentBytes)
 	prometheus.MustRegister(receivedBytes)
 	prometheus.MustRegister(sentFailures)

+ 1 - 1
etcdserver/api/rafthttp/peer.go

@@ -137,7 +137,7 @@ func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.Followe
 		}
 	}()
 
-	status := newPeerStatus(t.Logger, peerID)
+	status := newPeerStatus(t.Logger, t.ID, peerID)
 	picker := newURLPicker(urls)
 	errorc := t.ErrorC
 	r := t.Raft

+ 9 - 2
etcdserver/api/rafthttp/peer_status.go

@@ -32,14 +32,15 @@ type failureType struct {
 
 type peerStatus struct {
 	lg     *zap.Logger
+	local  types.ID
 	id     types.ID
 	mu     sync.Mutex // protect variables below
 	active bool
 	since  time.Time
 }
 
-func newPeerStatus(lg *zap.Logger, id types.ID) *peerStatus {
-	return &peerStatus{lg: lg, id: id}
+func newPeerStatus(lg *zap.Logger, local, id types.ID) *peerStatus {
+	return &peerStatus{lg: lg, local: local, id: id}
 }
 
 func (s *peerStatus) activate() {
@@ -53,6 +54,8 @@ func (s *peerStatus) activate() {
 		}
 		s.active = true
 		s.since = time.Now()
+
+		activePeers.WithLabelValues(s.local.String(), s.id.String()).Inc()
 	}
 }
 
@@ -69,8 +72,12 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
 		}
 		s.active = false
 		s.since = time.Time{}
+
+		activePeers.WithLabelValues(s.local.String(), s.id.String()).Dec()
+		disconnectedPeers.WithLabelValues(s.local.String(), s.id.String()).Inc()
 		return
 	}
+
 	if s.lg != nil {
 		s.lg.Debug("peer deactivated again", zap.String("peer-id", s.id.String()), zap.Error(errors.New(msg)))
 	}

+ 1 - 1
etcdserver/api/rafthttp/pipeline_test.go

@@ -303,7 +303,7 @@ func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline {
 		peerID:        types.ID(1),
 		tr:            tr,
 		picker:        picker,
-		status:        newPeerStatus(zap.NewExample(), types.ID(1)),
+		status:        newPeerStatus(zap.NewExample(), tr.ID, types.ID(1)),
 		raft:          &fakeRaft{},
 		followerStats: &stats.FollowerStats{},
 		errorc:        make(chan error, 1),

+ 1 - 1
etcdserver/api/rafthttp/remote.go

@@ -31,7 +31,7 @@ type remote struct {
 
 func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
 	picker := newURLPicker(urls)
-	status := newPeerStatus(tr.Logger, id)
+	status := newPeerStatus(tr.Logger, tr.ID, id)
 	pipeline := &pipeline{
 		peerID: id,
 		tr:     tr,

+ 1 - 1
etcdserver/api/rafthttp/snapshot_test.go

@@ -109,7 +109,7 @@ func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) {
 	defer srv.Close()
 
 	picker := mustNewURLPicker(t, []string{srv.URL})
-	snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)))
+	snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)))
 	defer snapsend.stop()
 
 	snapsend.send(*sm)

+ 5 - 5
etcdserver/api/rafthttp/stream_test.go

@@ -41,7 +41,7 @@ import (
 // to streamWriter. After that, streamWriter can use it to send messages
 // continuously, and closes it when stopped.
 func TestStreamWriterAttachOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 	// the expected initial state of streamWriter is not working
 	if _, ok := sw.writec(); ok {
 		t.Errorf("initial working status = %v, want false", ok)
@@ -93,7 +93,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
 // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
 // outgoingConn will close the outgoingConn and fall back to non-working status.
 func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 	defer sw.stop()
 	wfc := newFakeWriteFlushCloser(errors.New("blah"))
 	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
@@ -197,7 +197,7 @@ func TestStreamReaderStopOnDial(t *testing.T) {
 		picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 		errorc: make(chan error, 1),
 		typ:    streamTypeMessage,
-		status: newPeerStatus(zap.NewExample(), types.ID(2)),
+		status: newPeerStatus(zap.NewExample(), types.ID(1), types.ID(2)),
 		rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
 	}
 	tr.onResp = func() {
@@ -304,7 +304,7 @@ func TestStream(t *testing.T) {
 		srv := httptest.NewServer(h)
 		defer srv.Close()
 
-		sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
+		sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 		defer sw.stop()
 		h.sw = sw
 
@@ -316,7 +316,7 @@ func TestStream(t *testing.T) {
 			typ:    tt.t,
 			tr:     tr,
 			picker: picker,
-			status: newPeerStatus(zap.NewExample(), types.ID(2)),
+			status: newPeerStatus(zap.NewExample(), types.ID(0), types.ID(2)),
 			recvc:  recvc,
 			propc:  propc,
 			rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),