Pārlūkot izejas kodu

Merge pull request #3210 from xiang90/probing

monitoring connectivity between peers
Xiang Li 10 gadi atpakaļ
vecāks
revīzija
718a42f408

+ 4 - 0
Godeps/Godeps.json

@@ -102,6 +102,10 @@
 			"ImportPath": "github.com/stretchr/testify/assert",
 			"Rev": "9cc77fa25329013ce07362c7742952ff887361f2"
 		},
+		{
+			"ImportPath": "github.com/xiang90/probing",
+			"Rev": "11caf1c32ca4055f97e55541e92a75966635981d"
+		},
 		{
 			"ImportPath": "golang.org/x/crypto/bcrypt",
 			"Rev": "1351f936d976c60a0a48d728281922cf63eafb8d"

+ 24 - 0
Godeps/_workspace/src/github.com/xiang90/probing/.gitignore

@@ -0,0 +1,24 @@
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+*.test
+*.prof

+ 22 - 0
Godeps/_workspace/src/github.com/xiang90/probing/LICENSE

@@ -0,0 +1,22 @@
+The MIT License (MIT)
+
+Copyright (c) 2015 Xiang Li
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+

+ 39 - 0
Godeps/_workspace/src/github.com/xiang90/probing/README.md

@@ -0,0 +1,39 @@
+## Getting Started
+
+### Install the handler
+
+We first need to serve the probing HTTP handler.
+
+```go
+    http.HandleFunc("/health", probing.NewHandler())
+    err := http.ListenAndServe(":12345", nil)
+	if err != nil {
+		log.Fatal("ListenAndServe: ", err)
+	}
+```
+
+### Start to probe
+
+Now we can start to probe the endpoint.
+
+``` go
+    id := "example"
+    probingInterval = 5 * time.Second
+    url := "http://example.com:12345/health"
+    p.AddHTTP(id, probingInterval, url)
+
+	time.Sleep(13 * time.Second)
+	status, err := p.Status(id)
+ 	fmt.Printf("Total Probing: %d, Total Loss: %d, Estimated RTT: %v, Estimated Clock Difference: %v\n",
+		status.Total(), status.Loss(), status.SRTT(), status.ClockDiff())
+	// Total Probing: 2, Total Loss: 0, Estimated RTT: 320.771µs, Estimated Clock Difference: -35.869µs
+```
+
+### TODOs:
+
+- TCP probing
+- UDP probing
+- Gossip based probing
+- More accurate RTT estimation
+- More accurate Clock difference estimation
+- Use a clock interface rather than the real clock

+ 123 - 0
Godeps/_workspace/src/github.com/xiang90/probing/prober.go

@@ -0,0 +1,123 @@
+package probing
+
+import (
+	"encoding/json"
+	"errors"
+	"net/http"
+	"sync"
+	"time"
+)
+
+var (
+	ErrNotFound = errors.New("probing: id not found")
+	ErrExist    = errors.New("probing: id exists")
+)
+
+type Prober interface {
+	AddHTTP(id string, probingInterval time.Duration, endpoints []string) error
+	Remove(id string) error
+	RemoveAll()
+	Reset(id string) error
+	Status(id string) (Status, error)
+}
+
+type prober struct {
+	mu      sync.Mutex
+	targets map[string]*status
+}
+
+func NewProber() Prober {
+	return &prober{targets: make(map[string]*status)}
+}
+
+func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if _, ok := p.targets[id]; ok {
+		return ErrExist
+	}
+
+	s := &status{stopC: make(chan struct{})}
+	p.targets[id] = s
+
+	ticker := time.NewTicker(probingInterval)
+
+	go func() {
+		pinned := 0
+		for {
+			select {
+			case <-ticker.C:
+				start := time.Now()
+				resp, err := http.Get(endpoints[pinned])
+				if err != nil {
+					s.recordFailure()
+					pinned = (pinned + 1) % len(endpoints)
+					continue
+				}
+
+				var hh Health
+				d := json.NewDecoder(resp.Body)
+				err = d.Decode(&hh)
+				resp.Body.Close()
+				if err != nil || !hh.OK {
+					s.recordFailure()
+					pinned = (pinned + 1) % len(endpoints)
+					continue
+				}
+
+				s.record(time.Since(start), hh.Now)
+			case <-s.stopC:
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (p *prober) Remove(id string) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	s, ok := p.targets[id]
+	if !ok {
+		return ErrNotFound
+	}
+	close(s.stopC)
+	delete(p.targets, id)
+	return nil
+}
+
+func (p *prober) RemoveAll() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	for _, s := range p.targets {
+		close(s.stopC)
+	}
+	p.targets = make(map[string]*status)
+}
+
+func (p *prober) Reset(id string) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	s, ok := p.targets[id]
+	if !ok {
+		return ErrNotFound
+	}
+	s.reset()
+	return nil
+}
+
+func (p *prober) Status(id string) (Status, error) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	s, ok := p.targets[id]
+	if !ok {
+		return nil, ErrNotFound
+	}
+	return s, nil
+}

+ 90 - 0
Godeps/_workspace/src/github.com/xiang90/probing/prober_test.go

@@ -0,0 +1,90 @@
+package probing
+
+import (
+	"net/http/httptest"
+	"testing"
+	"time"
+)
+
+var (
+	testID = "testID"
+)
+
+func TestProbe(t *testing.T) {
+	s := httptest.NewServer(NewHandler())
+
+	p := NewProber()
+	p.AddHTTP(testID, time.Millisecond, []string{s.URL})
+	defer p.Remove(testID)
+
+	time.Sleep(100 * time.Millisecond)
+	status, err := p.Status(testID)
+	if err != nil {
+		t.Fatalf("err = %v, want %v", err, nil)
+	}
+	if total := status.Total(); total < 50 || total > 150 {
+		t.Fatalf("total = %v, want around %v", total, 100)
+	}
+	if health := status.Health(); health != true {
+		t.Fatalf("health = %v, want %v", health, true)
+	}
+
+	// become unhealthy
+	s.Close()
+
+	time.Sleep(100 * time.Millisecond)
+	if total := status.Total(); total < 150 || total > 250 {
+		t.Fatalf("total = %v, want around %v", total, 200)
+	}
+	if loss := status.Loss(); loss < 50 || loss > 150 {
+		t.Fatalf("loss = %v, want around %v", loss, 200)
+	}
+	if health := status.Health(); health != false {
+		t.Fatalf("health = %v, want %v", health, false)
+	}
+}
+
+func TestProbeReset(t *testing.T) {
+	s := httptest.NewServer(NewHandler())
+	defer s.Close()
+
+	p := NewProber()
+	p.AddHTTP(testID, time.Millisecond, []string{s.URL})
+	defer p.Remove(testID)
+
+	time.Sleep(100 * time.Millisecond)
+	status, err := p.Status(testID)
+	if err != nil {
+		t.Fatalf("err = %v, want %v", err, nil)
+	}
+	if total := status.Total(); total < 50 || total > 150 {
+		t.Fatalf("total = %v, want around %v", total, 100)
+	}
+	if health := status.Health(); health != true {
+		t.Fatalf("health = %v, want %v", health, true)
+	}
+
+	p.Reset(testID)
+
+	time.Sleep(100 * time.Millisecond)
+	if total := status.Total(); total < 50 || total > 150 {
+		t.Fatalf("total = %v, want around %v", total, 100)
+	}
+	if health := status.Health(); health != true {
+		t.Fatalf("health = %v, want %v", health, true)
+	}
+}
+
+func TestProbeRemove(t *testing.T) {
+	s := httptest.NewServer(NewHandler())
+	defer s.Close()
+
+	p := NewProber()
+	p.AddHTTP(testID, time.Millisecond, []string{s.URL})
+
+	p.Remove(testID)
+	_, err := p.Status(testID)
+	if err != ErrNotFound {
+		t.Fatalf("err = %v, want %v", err, ErrNotFound)
+	}
+}

+ 25 - 0
Godeps/_workspace/src/github.com/xiang90/probing/server.go

@@ -0,0 +1,25 @@
+package probing
+
+import (
+	"encoding/json"
+	"net/http"
+	"time"
+)
+
+func NewHandler() http.Handler {
+	return &httpHealth{}
+}
+
+type httpHealth struct {
+}
+
+type Health struct {
+	OK  bool
+	Now time.Time
+}
+
+func (h *httpHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	health := Health{OK: true, Now: time.Now()}
+	e := json.NewEncoder(w)
+	e.Encode(health)
+}

+ 96 - 0
Godeps/_workspace/src/github.com/xiang90/probing/status.go

@@ -0,0 +1,96 @@
+package probing
+
+import (
+	"sync"
+	"time"
+)
+
+var (
+	// weight factor
+	α = 0.125
+)
+
+type Status interface {
+	Total() int64
+	Loss() int64
+	Health() bool
+	// Estimated smoothed round trip time
+	SRTT() time.Duration
+	// Estimated clock difference
+	ClockDiff() time.Duration
+	StopNotify() <-chan struct{}
+}
+
+type status struct {
+	mu        sync.Mutex
+	srtt      time.Duration
+	total     int64
+	loss      int64
+	health    bool
+	clockdiff time.Duration
+	stopC     chan struct{}
+}
+
+// SRTT = (1-α) * SRTT + α * RTT
+func (s *status) SRTT() time.Duration {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.srtt
+}
+
+func (s *status) Total() int64 {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.total
+}
+
+func (s *status) Loss() int64 {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.loss
+}
+
+func (s *status) Health() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.health
+}
+
+func (s *status) ClockDiff() time.Duration {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.clockdiff
+}
+
+func (s *status) StopNotify() <-chan struct{} {
+	return s.stopC
+}
+
+func (s *status) record(rtt time.Duration, when time.Time) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.total += 1
+	s.health = true
+	s.srtt = time.Duration((1-α)*float64(s.srtt) + α*float64(rtt))
+	s.clockdiff = time.Now().Sub(when) - s.srtt/2
+}
+
+func (s *status) recordFailure() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.total++
+	s.health = false
+	s.loss += 1
+}
+
+func (s *status) reset() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.srtt = 0
+	s.total = 0
+	s.health = false
+	s.clockdiff = 0
+}

+ 1 - 0
rafthttp/http.go

@@ -33,6 +33,7 @@ const (
 
 var (
 	RaftPrefix       = "/raft"
+	ProbingPrefix    = path.Join(RaftPrefix, "probing")
 	RaftStreamPrefix = path.Join(RaftPrefix, "stream")
 
 	errIncompatibleVersion = errors.New("incompatible version")

+ 60 - 0
rafthttp/probing_status.go

@@ -0,0 +1,60 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
+)
+
+var (
+	// proberInterval must be shorter than read timeout.
+	// Or the connection will time-out.
+	proberInterval           = ConnReadTimeout - time.Second
+	statusMonitoringInterval = 30 * time.Second
+)
+
+func addPeerToProber(p probing.Prober, id string, us []string) {
+	hus := make([]string, len(us))
+	for i := range us {
+		hus[i] = us[i] + ProbingPrefix
+	}
+
+	p.AddHTTP(id, proberInterval, hus)
+
+	s, err := p.Status(id)
+	if err != nil {
+		plog.Errorf("failed to add peer %s into prober", id)
+	} else {
+		go monitorProbingStatus(s, id)
+	}
+}
+
+func monitorProbingStatus(s probing.Status, id string) {
+	for {
+		select {
+		case <-time.After(statusMonitoringInterval):
+			if !s.Health() {
+				plog.Warningf("the connection to peer %s is unhealthy", id)
+			}
+			if s.ClockDiff() > time.Second {
+				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
+			}
+		case <-s.StopNotify():
+			return
+		}
+	}
+}

+ 14 - 2
rafthttp/transport.go

@@ -19,6 +19,7 @@ import (
 	"sync"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/types"
@@ -83,7 +84,9 @@ type transport struct {
 	term    uint64               // the latest term that has been observed
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	peers   map[types.ID]Peer    // peers map
-	errorc  chan error
+
+	prober probing.Prober
+	errorc chan error
 }
 
 func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
@@ -96,7 +99,9 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
 		leaderStats:  ls,
 		remotes:      make(map[types.ID]*remote),
 		peers:        make(map[types.ID]Peer),
-		errorc:       errorc,
+
+		prober: probing.NewProber(),
+		errorc: errorc,
 	}
 }
 
@@ -106,6 +111,7 @@ func (t *transport) Handler() http.Handler {
 	mux := http.NewServeMux()
 	mux.Handle(RaftPrefix, pipelineHandler)
 	mux.Handle(RaftStreamPrefix+"/", streamHandler)
+	mux.Handle(ProbingPrefix, probing.NewHandler())
 	return mux
 }
 
@@ -165,6 +171,7 @@ func (t *transport) Stop() {
 	for _, p := range t.peers {
 		p.Stop()
 	}
+	t.prober.RemoveAll()
 	if tr, ok := t.roundTripper.(*http.Transport); ok {
 		tr.CloseIdleConnections()
 	}
@@ -195,6 +202,7 @@ func (t *transport) AddPeer(id types.ID, us []string) {
 	}
 	fs := t.leaderStats.Follower(id.String())
 	t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc, t.term)
+	addPeerToProber(t.prober, id.String(), us)
 }
 
 func (t *transport) RemovePeer(id types.ID) {
@@ -220,6 +228,7 @@ func (t *transport) removePeer(id types.ID) {
 	}
 	delete(t.peers, id)
 	delete(t.leaderStats.Followers, id.String())
+	t.prober.Remove(id.String())
 }
 
 func (t *transport) UpdatePeer(id types.ID, us []string) {
@@ -234,6 +243,9 @@ func (t *transport) UpdatePeer(id types.ID, us []string) {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
 	t.peers[id].Update(urls)
+
+	t.prober.Remove(id.String())
+	addPeerToProber(t.prober, id.String(), us)
 }
 
 type Pausable interface {

+ 6 - 1
rafthttp/transport_test.go

@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
@@ -73,6 +74,7 @@ func TestTransportAdd(t *testing.T) {
 		leaderStats:  ls,
 		term:         term,
 		peers:        make(map[types.ID]Peer),
+		prober:       probing.NewProber(),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 
@@ -104,6 +106,7 @@ func TestTransportRemove(t *testing.T) {
 		roundTripper: &roundTripperRecorder{},
 		leaderStats:  stats.NewLeaderStats(""),
 		peers:        make(map[types.ID]Peer),
+		prober:       probing.NewProber(),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	tr.RemovePeer(types.ID(1))
@@ -117,7 +120,8 @@ func TestTransportRemove(t *testing.T) {
 func TestTransportUpdate(t *testing.T) {
 	peer := newFakePeer()
 	tr := &transport{
-		peers: map[types.ID]Peer{types.ID(1): peer},
+		peers:  map[types.ID]Peer{types.ID(1): peer},
+		prober: probing.NewProber(),
 	}
 	u := "http://localhost:2380"
 	tr.UpdatePeer(types.ID(1), []string{u})
@@ -133,6 +137,7 @@ func TestTransportErrorc(t *testing.T) {
 		roundTripper: newRespRoundTripper(http.StatusForbidden, nil),
 		leaderStats:  stats.NewLeaderStats(""),
 		peers:        make(map[types.ID]Peer),
+		prober:       probing.NewProber(),
 		errorc:       errorc,
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})