Browse Source

server: add /v2/stats/leader

Yicheng Qin 11 years ago
parent
commit
5574b6e224
6 changed files with 122 additions and 24 deletions
  1. 2 3
      etcd/etcd.go
  2. 3 0
      etcd/participant.go
  3. 18 11
      etcd/peer.go
  4. 13 10
      etcd/peer_hub.go
  5. 77 0
      etcd/raft_follower_stats.go
  6. 9 0
      etcd/v2_http.go

+ 2 - 3
etcd/etcd.go

@@ -87,12 +87,12 @@ func New(c *cfg.Config) (*Server, error) {
 
 		mode: atomicInt(stopMode),
 
-		client:  newClient(tc),
-		peerHub: newPeerHub(client),
+		client: newClient(tc),
 
 		exited:      make(chan error, 1),
 		stopNotifyc: make(chan struct{}),
 	}
+	s.peerHub = newPeerHub(s.id, client)
 	m := http.NewServeMux()
 	m.HandleFunc("/", s.requestHandler)
 	m.HandleFunc("/version", versionHandler)
@@ -118,7 +118,6 @@ func (s *Server) Stop() error {
 	close(s.stopNotifyc)
 	err := <-s.exited
 	s.client.CloseConnections()
-	s.peerHub.stop()
 	log.Printf("id=%x server.stop\n", s.id)
 	return err
 }

+ 3 - 0
etcd/participant.go

@@ -48,6 +48,7 @@ const (
 	v2machinePrefix       = "/v2/machines"
 	v2peersPrefix         = "/v2/peers"
 	v2LeaderPrefix        = "/v2/leader"
+	v2LeaderStatsPrefix   = "/v2/stats/leader"
 	v2StoreStatsPrefix    = "/v2/stats/store"
 	v2adminConfigPrefix   = "/v2/admin/config"
 	v2adminMachinesPrefix = "/v2/admin/machines/"
@@ -139,6 +140,7 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 	p.Handle(v2machinePrefix, handlerErr(p.serveMachines))
 	p.Handle(v2peersPrefix, handlerErr(p.serveMachines))
 	p.Handle(v2LeaderPrefix, handlerErr(p.serveLeader))
+	p.Handle(v2LeaderStatsPrefix, handlerErr(p.serveLeaderStats))
 	p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats))
 	p.rh.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig))
 	p.rh.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines))
@@ -233,6 +235,7 @@ func (p *participant) run(stop chan struct{}) {
 func (p *participant) cleanup() {
 	p.w.Close()
 	close(p.stopNotifyc)
+	p.peerHub.stop()
 }
 
 func (p *participant) raftHandler() http.Handler {

+ 18 - 11
etcd/peer.go

@@ -23,6 +23,7 @@ import (
 	"net/http"
 	"sync"
 	"sync/atomic"
+	"time"
 )
 
 const (
@@ -36,20 +37,22 @@ const (
 )
 
 type peer struct {
-	url      string
-	queue    chan []byte
-	status   int
-	inflight atomicInt
-	c        *http.Client
-	mu       sync.RWMutex
-	wg       sync.WaitGroup
+	url           string
+	queue         chan []byte
+	status        int
+	inflight      atomicInt
+	c             *http.Client
+	followerStats *raftFollowerStats
+	mu            sync.RWMutex
+	wg            sync.WaitGroup
 }
 
-func newPeer(url string, c *http.Client) *peer {
+func newPeer(url string, c *http.Client, followerStats *raftFollowerStats) *peer {
 	return &peer{
-		url:    url,
-		status: idlePeer,
-		c:      c,
+		url:           url,
+		status:        idlePeer,
+		c:             c,
+		followerStats: followerStats,
 	}
 }
 
@@ -120,7 +123,11 @@ func (p *peer) post(d []byte) {
 	p.inflight.Add(1)
 	defer p.inflight.Add(-1)
 	buf := bytes.NewBuffer(d)
+	start := time.Now()
 	resp, err := p.c.Post(p.url, "application/octet-stream", buf)
+	end := time.Now()
+	// TODO: Have no way to detect RPC success or failure now
+	p.followerStats.Succ(end.Sub(start))
 	if err != nil {
 		log.Printf("peer.post url=%s err=\"%v\"", p.url, err)
 		return

+ 13 - 10
etcd/peer_hub.go

@@ -39,18 +39,20 @@ type peerGetter interface {
 }
 
 type peerHub struct {
-	mu      sync.RWMutex
-	stopped bool
-	seeds   map[string]bool
-	peers   map[int64]*peer
-	c       *http.Client
+	mu             sync.RWMutex
+	stopped        bool
+	seeds          map[string]bool
+	peers          map[int64]*peer
+	c              *http.Client
+	followersStats *raftFollowersStats
 }
 
-func newPeerHub(c *http.Client) *peerHub {
+func newPeerHub(id int64, c *http.Client) *peerHub {
 	h := &peerHub{
-		peers: make(map[int64]*peer),
-		seeds: make(map[string]bool),
-		c:     c,
+		peers:          make(map[int64]*peer),
+		seeds:          make(map[string]bool),
+		c:              c,
+		followersStats: NewRaftFollowersStats(fmt.Sprint(id)),
 	}
 	return h
 }
@@ -78,6 +80,7 @@ func (h *peerHub) stop() {
 	for _, p := range h.peers {
 		p.stop()
 	}
+	h.followersStats.Reset()
 	// http.Transport needs some time to put used connections
 	// into idle queues.
 	time.Sleep(time.Millisecond)
@@ -109,7 +112,7 @@ func (h *peerHub) add(id int64, rawurl string) (*peer, error) {
 	if h.stopped {
 		return nil, fmt.Errorf("peerHub stopped")
 	}
-	h.peers[id] = newPeer(u.String(), h.c)
+	h.peers[id] = newPeer(u.String(), h.c, h.followersStats.Follower(fmt.Sprint(id)))
 	return h.peers[id], nil
 }
 

+ 77 - 0
etcd/raft_follower_stats.go

@@ -0,0 +1,77 @@
+package etcd
+
+import (
+	"math"
+	"time"
+)
+
+type raftFollowersStats struct {
+	Leader    string                        `json:"leader"`
+	Followers map[string]*raftFollowerStats `json:"followers"`
+}
+
+func NewRaftFollowersStats(name string) *raftFollowersStats {
+	return &raftFollowersStats{
+		Leader:    name,
+		Followers: make(map[string]*raftFollowerStats),
+	}
+}
+
+func (fs *raftFollowersStats) Follower(name string) *raftFollowerStats {
+	follower, ok := fs.Followers[name]
+	if !ok {
+		follower = &raftFollowerStats{}
+		follower.Latency.Minimum = 1 << 63
+		fs.Followers[name] = follower
+	}
+	return follower
+}
+
+func (fs *raftFollowersStats) Reset() {
+	fs.Followers = make(map[string]*raftFollowerStats)
+}
+
+type raftFollowerStats struct {
+	Latency struct {
+		Current           float64 `json:"current"`
+		Average           float64 `json:"average"`
+		averageSquare     float64
+		StandardDeviation float64 `json:"standardDeviation"`
+		Minimum           float64 `json:"minimum"`
+		Maximum           float64 `json:"maximum"`
+	} `json:"latency"`
+
+	Counts struct {
+		Fail    uint64 `json:"fail"`
+		Success uint64 `json:"success"`
+	} `json:"counts"`
+}
+
+// Succ function update the raftFollowerStats with a successful send
+func (ps *raftFollowerStats) Succ(d time.Duration) {
+	total := float64(ps.Counts.Success) * ps.Latency.Average
+	totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare
+
+	ps.Counts.Success++
+
+	ps.Latency.Current = float64(d) / (1000000.0)
+
+	if ps.Latency.Current > ps.Latency.Maximum {
+		ps.Latency.Maximum = ps.Latency.Current
+	}
+
+	if ps.Latency.Current < ps.Latency.Minimum {
+		ps.Latency.Minimum = ps.Latency.Current
+	}
+
+	ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success)
+	ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success)
+
+	// sdv = sqrt(avg(x^2) - avg(x)^2)
+	ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average)
+}
+
+// Fail function update the raftFollowerStats with a unsuccessful send
+func (ps *raftFollowerStats) Fail() {
+	ps.Counts.Fail++
+}

+ 9 - 0
etcd/v2_http.go

@@ -17,6 +17,7 @@ limitations under the License.
 package etcd
 
 import (
+	"encoding/json"
 	"fmt"
 	"log"
 	"net/http"
@@ -74,6 +75,14 @@ func (p *participant) serveLeader(w http.ResponseWriter, r *http.Request) error
 	return fmt.Errorf("no leader")
 }
 
+func (p *participant) serveLeaderStats(w http.ResponseWriter, req *http.Request) error {
+	if !p.node.IsLeader() {
+		return p.redirect(w, req, p.node.Leader())
+	}
+	w.Header().Set("Content-Type", "application/json")
+	return json.NewEncoder(w).Encode(p.peerHub.followersStats)
+}
+
 func (p *participant) serveStoreStats(w http.ResponseWriter, req *http.Request) error {
 	w.Header().Set("Content-Type", "application/json")
 	w.Write(p.Store.JsonStats())