Browse Source

rafthttp: monitor connection

Xiang Li 10 years ago
parent
commit
0fc764200d
4 changed files with 80 additions and 3 deletions
  1. 1 0
      rafthttp/http.go
  2. 60 0
      rafthttp/probing_status.go
  3. 13 2
      rafthttp/transport.go
  4. 6 1
      rafthttp/transport_test.go

+ 1 - 0
rafthttp/http.go

@@ -33,6 +33,7 @@ const (
 
 var (
 	RaftPrefix       = "/raft"
+	ProbingPrefix    = path.Join(RaftPrefix, "probing")
 	RaftStreamPrefix = path.Join(RaftPrefix, "stream")
 
 	errIncompatibleVersion = errors.New("incompatible version")

+ 60 - 0
rafthttp/probing_status.go

@@ -0,0 +1,60 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
+)
+
+var (
+	// proberInterval must be shorter than read timeout.
+	// Or the connection will time-out.
+	proberInterval           = ConnReadTimeout - time.Second
+	statusMonitoringInterval = 30 * time.Second
+)
+
+func addPeerToProber(p probing.Prober, id string, us []string) {
+	hus := make([]string, len(us))
+	for i := range us {
+		hus[i] = us[i] + ProbingPrefix
+	}
+
+	p.AddHTTP(id, proberInterval, hus)
+
+	s, err := p.Status(id)
+	if err != nil {
+		plog.Errorf("failed to add peer %s into prober", id)
+	} else {
+		go monitorProbingStatus(s, id)
+	}
+}
+
+func monitorProbingStatus(s probing.Status, id string) {
+	for {
+		select {
+		case <-time.After(statusMonitoringInterval):
+			if !s.Health() {
+				plog.Warningf("the connection to peer %s is unhealthy", id)
+			}
+			if s.ClockDiff() > time.Second {
+				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
+			}
+		case <-s.StopNotify():
+			return
+		}
+	}
+}

+ 13 - 2
rafthttp/transport.go

@@ -19,6 +19,7 @@ import (
 	"sync"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/types"
@@ -83,7 +84,9 @@ type transport struct {
 	term    uint64               // the latest term that has been observed
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	peers   map[types.ID]Peer    // peers map
-	errorc  chan error
+
+	prober probing.Prober
+	errorc chan error
 }
 
 func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
@@ -96,7 +99,9 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
 		leaderStats:  ls,
 		remotes:      make(map[types.ID]*remote),
 		peers:        make(map[types.ID]Peer),
-		errorc:       errorc,
+
+		prober: probing.NewProber(),
+		errorc: errorc,
 	}
 }
 
@@ -106,6 +111,7 @@ func (t *transport) Handler() http.Handler {
 	mux := http.NewServeMux()
 	mux.Handle(RaftPrefix, pipelineHandler)
 	mux.Handle(RaftStreamPrefix+"/", streamHandler)
+	mux.Handle(ProbingPrefix, probing.NewHandler())
 	return mux
 }
 
@@ -195,6 +201,7 @@ func (t *transport) AddPeer(id types.ID, us []string) {
 	}
 	fs := t.leaderStats.Follower(id.String())
 	t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc, t.term)
+	addPeerToProber(t.prober, id.String(), us)
 }
 
 func (t *transport) RemovePeer(id types.ID) {
@@ -220,6 +227,7 @@ func (t *transport) removePeer(id types.ID) {
 	}
 	delete(t.peers, id)
 	delete(t.leaderStats.Followers, id.String())
+	t.prober.Remove(id.String())
 }
 
 func (t *transport) UpdatePeer(id types.ID, us []string) {
@@ -234,6 +242,9 @@ func (t *transport) UpdatePeer(id types.ID, us []string) {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
 	t.peers[id].Update(urls)
+
+	t.prober.Remove(id.String())
+	addPeerToProber(t.prober, id.String(), us)
 }
 
 type Pausable interface {

+ 6 - 1
rafthttp/transport_test.go

@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
@@ -73,6 +74,7 @@ func TestTransportAdd(t *testing.T) {
 		leaderStats:  ls,
 		term:         term,
 		peers:        make(map[types.ID]Peer),
+		prober:       probing.NewProber(),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 
@@ -104,6 +106,7 @@ func TestTransportRemove(t *testing.T) {
 		roundTripper: &roundTripperRecorder{},
 		leaderStats:  stats.NewLeaderStats(""),
 		peers:        make(map[types.ID]Peer),
+		prober:       probing.NewProber(),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	tr.RemovePeer(types.ID(1))
@@ -117,7 +120,8 @@ func TestTransportRemove(t *testing.T) {
 func TestTransportUpdate(t *testing.T) {
 	peer := newFakePeer()
 	tr := &transport{
-		peers: map[types.ID]Peer{types.ID(1): peer},
+		peers:  map[types.ID]Peer{types.ID(1): peer},
+		prober: probing.NewProber(),
 	}
 	u := "http://localhost:2380"
 	tr.UpdatePeer(types.ID(1), []string{u})
@@ -133,6 +137,7 @@ func TestTransportErrorc(t *testing.T) {
 		roundTripper: newRespRoundTripper(http.StatusForbidden, nil),
 		leaderStats:  stats.NewLeaderStats(""),
 		peers:        make(map[types.ID]Peer),
+		prober:       probing.NewProber(),
 		errorc:       errorc,
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})