浏览代码

Godeps: add probing dependency

Xiang Li 10 年之前
父节点
当前提交
306085db5f

+ 4 - 0
Godeps/Godeps.json

@@ -102,6 +102,10 @@
 			"ImportPath": "github.com/stretchr/testify/assert",
 			"Rev": "9cc77fa25329013ce07362c7742952ff887361f2"
 		},
+		{
+			"ImportPath": "github.com/xiang90/probing",
+			"Rev": "e8a0407769cb84c61c2ddf8f1d9cdae9fb489b9b"
+		},
 		{
 			"ImportPath": "golang.org/x/crypto/bcrypt",
 			"Rev": "1351f936d976c60a0a48d728281922cf63eafb8d"

+ 24 - 0
Godeps/_workspace/src/github.com/xiang90/probing/.gitignore

@@ -0,0 +1,24 @@
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+*.test
+*.prof

+ 22 - 0
Godeps/_workspace/src/github.com/xiang90/probing/LICENSE

@@ -0,0 +1,22 @@
+The MIT License (MIT)
+
+Copyright (c) 2015 Xiang Li
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+

+ 39 - 0
Godeps/_workspace/src/github.com/xiang90/probing/README.md

@@ -0,0 +1,39 @@
+## Getting Started
+
+### Install the handler
+
+We first need to serve the probing HTTP handler.
+
+```go
+    http.HandleFunc("/health", probing.NewHandler())
+    err := http.ListenAndServe(":12345", nil)
+	if err != nil {
+		log.Fatal("ListenAndServe: ", err)
+	}
+```
+
+### Start to probe
+
+Now we can start to probe the endpoint.
+
+``` go
+    id := "example"
+    probingInterval = 5 * time.Second
+    url := "http://example.com:12345/health"
+    p.AddHTTP(id, probingInterval, url)
+
+	time.Sleep(13 * time.Second)
+	status, err := p.Status(id)
+ 	fmt.Printf("Total Probing: %d, Total Loss: %d, Estimated RTT: %v, Estimated Clock Difference: %v\n",
+		status.Total(), status.Loss(), status.SRTT(), status.ClockDiff())
+	// Total Probing: 2, Total Loss: 0, Estimated RTT: 320.771µs, Estimated Clock Difference: -35.869µs
+```
+
+### TODOs:
+
+- TCP probing
+- UDP probing
+- Gossip based probing
+- More accurate RTT estimation
+- More accurate Clock difference estimation
+- Use a clock interface rather than the real clock

+ 112 - 0
Godeps/_workspace/src/github.com/xiang90/probing/prober.go

@@ -0,0 +1,112 @@
+package probing
+
+import (
+	"encoding/json"
+	"errors"
+	"net/http"
+	"sync"
+	"time"
+)
+
+var (
+	ErrNotFound = errors.New("probing: id not found")
+	ErrExist    = errors.New("probing: id exists")
+)
+
+type Prober interface {
+	AddHTTP(id string, probingInterval time.Duration, endpoints []string) error
+	Remove(id string) error
+	Reset(id string) error
+	Status(id string) (Status, error)
+}
+
+type prober struct {
+	mu      sync.Mutex
+	targets map[string]*status
+}
+
+func NewProber() Prober {
+	return &prober{targets: make(map[string]*status)}
+}
+
+func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if _, ok := p.targets[id]; ok {
+		return ErrExist
+	}
+
+	s := &status{stopC: make(chan struct{})}
+	p.targets[id] = s
+
+	ticker := time.NewTicker(probingInterval)
+
+	go func() {
+		pinned := 0
+		for {
+			select {
+			case <-ticker.C:
+				start := time.Now()
+				resp, err := http.Get(endpoints[pinned])
+				if err != nil {
+					s.recordFailure()
+					pinned = (pinned + 1) % len(endpoints)
+					continue
+				}
+
+				var hh Health
+				d := json.NewDecoder(resp.Body)
+				err = d.Decode(&hh)
+				resp.Body.Close()
+				if err != nil || !hh.OK {
+					s.recordFailure()
+					pinned = (pinned + 1) % len(endpoints)
+					continue
+				}
+
+				s.record(time.Since(start), hh.Now)
+			case <-s.stopC:
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (p *prober) Remove(id string) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	s, ok := p.targets[id]
+	if !ok {
+		return ErrNotFound
+	}
+	close(s.stopC)
+	delete(p.targets, id)
+	return nil
+}
+
+func (p *prober) Reset(id string) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	s, ok := p.targets[id]
+	if !ok {
+		return ErrNotFound
+	}
+	s.reset()
+	return nil
+}
+
+func (p *prober) Status(id string) (Status, error) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	s, ok := p.targets[id]
+	if !ok {
+		return nil, ErrNotFound
+	}
+	return s, nil
+}

+ 90 - 0
Godeps/_workspace/src/github.com/xiang90/probing/prober_test.go

@@ -0,0 +1,90 @@
+package probing
+
+import (
+	"net/http/httptest"
+	"testing"
+	"time"
+)
+
+var (
+	testID = "testID"
+)
+
+func TestProbe(t *testing.T) {
+	s := httptest.NewServer(NewHandler())
+
+	p := NewProber()
+	p.AddHTTP(testID, time.Millisecond, []string{s.URL})
+	defer p.Remove(testID)
+
+	time.Sleep(100 * time.Millisecond)
+	status, err := p.Status(testID)
+	if err != nil {
+		t.Fatalf("err = %v, want %v", err, nil)
+	}
+	if total := status.Total(); total < 50 || total > 150 {
+		t.Fatalf("total = %v, want around %v", total, 100)
+	}
+	if health := status.Health(); health != true {
+		t.Fatalf("health = %v, want %v", health, true)
+	}
+
+	// become unhealthy
+	s.Close()
+
+	time.Sleep(100 * time.Millisecond)
+	if total := status.Total(); total < 150 || total > 250 {
+		t.Fatalf("total = %v, want around %v", total, 200)
+	}
+	if loss := status.Loss(); loss < 50 || loss > 150 {
+		t.Fatalf("loss = %v, want around %v", loss, 200)
+	}
+	if health := status.Health(); health != false {
+		t.Fatalf("health = %v, want %v", health, false)
+	}
+}
+
+func TestProbeReset(t *testing.T) {
+	s := httptest.NewServer(NewHandler())
+	defer s.Close()
+
+	p := NewProber()
+	p.AddHTTP(testID, time.Millisecond, []string{s.URL})
+	defer p.Remove(testID)
+
+	time.Sleep(100 * time.Millisecond)
+	status, err := p.Status(testID)
+	if err != nil {
+		t.Fatalf("err = %v, want %v", err, nil)
+	}
+	if total := status.Total(); total < 50 || total > 150 {
+		t.Fatalf("total = %v, want around %v", total, 100)
+	}
+	if health := status.Health(); health != true {
+		t.Fatalf("health = %v, want %v", health, true)
+	}
+
+	p.Reset(testID)
+
+	time.Sleep(100 * time.Millisecond)
+	if total := status.Total(); total < 50 || total > 150 {
+		t.Fatalf("total = %v, want around %v", total, 100)
+	}
+	if health := status.Health(); health != true {
+		t.Fatalf("health = %v, want %v", health, true)
+	}
+}
+
+func TestProbeRemove(t *testing.T) {
+	s := httptest.NewServer(NewHandler())
+	defer s.Close()
+
+	p := NewProber()
+	p.AddHTTP(testID, time.Millisecond, []string{s.URL})
+
+	p.Remove(testID)
+	_, err := p.Status(testID)
+	if err != ErrNotFound {
+		t.Fatalf("err = %v, want %v", err, ErrNotFound)
+	}
+}

+ 25 - 0
Godeps/_workspace/src/github.com/xiang90/probing/server.go

@@ -0,0 +1,25 @@
+package probing
+
+import (
+	"encoding/json"
+	"net/http"
+	"time"
+)
+
+func NewHandler() http.Handler {
+	return &httpHealth{}
+}
+
+type httpHealth struct {
+}
+
+type Health struct {
+	OK  bool
+	Now time.Time
+}
+
+func (h *httpHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	health := Health{OK: true, Now: time.Now()}
+	e := json.NewEncoder(w)
+	e.Encode(health)
+}

+ 96 - 0
Godeps/_workspace/src/github.com/xiang90/probing/status.go

@@ -0,0 +1,96 @@
+package probing
+
+import (
+	"sync"
+	"time"
+)
+
+var (
+	// weight factor
+	α = 0.125
+)
+
+type Status interface {
+	Total() int64
+	Loss() int64
+	Health() bool
+	// Estimated smoothed round trip time
+	SRTT() time.Duration
+	// Estimated clock difference
+	ClockDiff() time.Duration
+	StopNotify() <-chan struct{}
+}
+
+type status struct {
+	mu        sync.Mutex
+	srtt      time.Duration
+	total     int64
+	loss      int64
+	health    bool
+	clockdiff time.Duration
+	stopC     chan struct{}
+}
+
+// SRTT = (1-α) * SRTT + α * RTT
+func (s *status) SRTT() time.Duration {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.srtt
+}
+
+func (s *status) Total() int64 {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.total
+}
+
+func (s *status) Loss() int64 {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.loss
+}
+
+func (s *status) Health() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.health
+}
+
+func (s *status) ClockDiff() time.Duration {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.clockdiff
+}
+
+func (s *status) StopNotify() <-chan struct{} {
+	return s.stopC
+}
+
+func (s *status) record(rtt time.Duration, when time.Time) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.total += 1
+	s.health = true
+	s.srtt = time.Duration((1-α)*float64(s.srtt) + α*float64(rtt))
+	s.clockdiff = time.Now().Sub(when) - s.srtt/2
+}
+
+func (s *status) recordFailure() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.total++
+	s.health = false
+	s.loss += 1
+}
+
+func (s *status) reset() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.srtt = 0
+	s.total = 0
+	s.health = false
+	s.clockdiff = 0
+}