Ver código fonte

server: add /v2/stats/self

Yicheng Qin 11 anos atrás
pai
commit
9203f68894

+ 25 - 0
etcd/package_stats.go

@@ -0,0 +1,25 @@
+package etcd
+
+import (
+	"time"
+)
+
+// packageStats represent the stats we need for a package.
+// It has sending time and the size of the package.
+type packageStats struct {
+	sendingTime time.Time
+	size        int
+}
+
+// NewPackageStats creates a pacakgeStats and return the pointer to it.
+func NewPackageStats(now time.Time, size int) *packageStats {
+	return &packageStats{
+		sendingTime: now,
+		size:        size,
+	}
+}
+
+// Time return the sending time of the package.
+func (ps *packageStats) Time() time.Time {
+	return ps.sendingTime
+}

+ 9 - 4
etcd/participant.go

@@ -48,6 +48,7 @@ const (
 	v2machinePrefix       = "/v2/machines"
 	v2peersPrefix         = "/v2/peers"
 	v2LeaderPrefix        = "/v2/leader"
+	v2SelfStatsPrefix     = "/v2/stats/self"
 	v2LeaderStatsPrefix   = "/v2/stats/leader"
 	v2StoreStatsPrefix    = "/v2/stats/store"
 	v2adminConfigPrefix   = "/v2/admin/config"
@@ -75,8 +76,9 @@ type participant struct {
 	removeNodeC chan raft.Config
 	node        *v2Raft
 	store.Store
-	rh *raftHandler
-	w  *wal.WAL
+	rh          *raftHandler
+	w           *wal.WAL
+	serverStats *raftServerStats
 
 	stopNotifyc chan struct{}
 
@@ -97,13 +99,15 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 		node: &v2Raft{
 			result: make(map[wait]chan interface{}),
 		},
-		Store: store.New(),
+		Store:       store.New(),
+		serverStats: NewRaftServerStats(fmt.Sprint(id)),
 
 		stopNotifyc: make(chan struct{}),
 
 		ServeMux: http.NewServeMux(),
 	}
-	p.rh = newRaftHandler(peerHub, p.Store.Version())
+	p.rh = newRaftHandler(peerHub, p.Store.Version(), p.serverStats)
+	p.peerHub.setServerStats(p.serverStats)
 
 	walPath := path.Join(dir, "wal")
 	w, err := wal.Open(walPath)
@@ -140,6 +144,7 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 	p.Handle(v2machinePrefix, handlerErr(p.serveMachines))
 	p.Handle(v2peersPrefix, handlerErr(p.serveMachines))
 	p.Handle(v2LeaderPrefix, handlerErr(p.serveLeader))
+	p.Handle(v2SelfStatsPrefix, handlerErr(p.serveSelfStats))
 	p.Handle(v2LeaderStatsPrefix, handlerErr(p.serveLeaderStats))
 	p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats))
 	p.rh.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig))

+ 8 - 0
etcd/peer_hub.go

@@ -45,6 +45,7 @@ type peerHub struct {
 	peers          map[int64]*peer
 	c              *http.Client
 	followersStats *raftFollowersStats
+	serverStats    *raftServerStats
 }
 
 func newPeerHub(id int64, c *http.Client) *peerHub {
@@ -57,6 +58,10 @@ func newPeerHub(id int64, c *http.Client) *peerHub {
 	return h
 }
 
+func (h *peerHub) setServerStats(serverStats *raftServerStats) {
+	h.serverStats = serverStats
+}
+
 func (h *peerHub) setSeeds(seeds []string) {
 	for _, seed := range seeds {
 		h.seeds[seed] = true
@@ -122,6 +127,9 @@ func (h *peerHub) send(msg raft.Message) error {
 		if err != nil {
 			return err
 		}
+		if msg.IsMsgApp() {
+			h.serverStats.SendAppendReq(len(data))
+		}
 		return p.send(data)
 	}
 	return errUnknownPeer

+ 6 - 1
etcd/raft_handler.go

@@ -38,16 +38,18 @@ type raftHandler struct {
 
 	peerGetter   peerGetter
 	storeVersion int
+	serverStats  *raftServerStats
 
 	recv chan *raft.Message
 	*http.ServeMux
 }
 
-func newRaftHandler(p peerGetter, version int) *raftHandler {
+func newRaftHandler(p peerGetter, version int, serverStats *raftServerStats) *raftHandler {
 	h := &raftHandler{
 		recv:         make(chan *raft.Message, 512),
 		peerGetter:   p,
 		storeVersion: version,
+		serverStats:  serverStats,
 	}
 	h.ServeMux = http.NewServeMux()
 	h.ServeMux.HandleFunc(raftPrefix+"/cfg/", h.serveCfg)
@@ -83,6 +85,9 @@ func (h *raftHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
 		log.Printf("raftHandler.serve decodeErr=\"%v\"\n", err)
 		return
 	}
+	if msg.IsMsgApp() {
+		h.serverStats.RecvAppendReq(fmt.Sprint(msg.From), int(r.ContentLength))
+	}
 
 	select {
 	case h.recv <- msg:

+ 83 - 0
etcd/raft_server_stats.go

@@ -0,0 +1,83 @@
+package etcd
+
+import (
+	"sync"
+	"time"
+)
+
+type raftServerStats struct {
+	Name      string    `json:"name"`
+	State     string    `json:"state"`
+	StartTime time.Time `json:"startTime"`
+
+	LeaderInfo struct {
+		Name      string    `json:"leader"`
+		Uptime    string    `json:"uptime"`
+		StartTime time.Time `json:"startTime"`
+	} `json:"leaderInfo"`
+
+	RecvAppendRequestCnt uint64  `json:"recvAppendRequestCnt,"`
+	RecvingPkgRate       float64 `json:"recvPkgRate,omitempty"`
+	RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"`
+
+	SendAppendRequestCnt uint64  `json:"sendAppendRequestCnt"`
+	SendingPkgRate       float64 `json:"sendPkgRate,omitempty"`
+	SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
+
+	sendRateQueue *statsQueue
+	recvRateQueue *statsQueue
+
+	sync.Mutex
+}
+
+func NewRaftServerStats(name string) *raftServerStats {
+	stats := &raftServerStats{
+		Name:      name,
+		StartTime: time.Now(),
+		sendRateQueue: &statsQueue{
+			back: -1,
+		},
+		recvRateQueue: &statsQueue{
+			back: -1,
+		},
+	}
+	stats.LeaderInfo.StartTime = time.Now()
+	return stats
+}
+
+func (ss *raftServerStats) Reset() {
+	name := ss.Name
+	ss = NewRaftServerStats(name)
+	return
+}
+
+func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
+	ss.Lock()
+	defer ss.Unlock()
+
+	ss.State = stateFollower
+	if leaderName != ss.LeaderInfo.Name {
+		ss.LeaderInfo.Name = leaderName
+		ss.LeaderInfo.StartTime = time.Now()
+	}
+
+	ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
+	ss.RecvAppendRequestCnt++
+}
+
+func (ss *raftServerStats) SendAppendReq(pkgSize int) {
+	ss.Lock()
+	defer ss.Unlock()
+
+	now := time.Now()
+
+	if ss.State != stateLeader {
+		ss.State = stateLeader
+		ss.LeaderInfo.Name = ss.Name
+		ss.LeaderInfo.StartTime = now
+	}
+
+	ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize))
+
+	ss.SendAppendRequestCnt++
+}

+ 89 - 0
etcd/stats_queue.go

@@ -0,0 +1,89 @@
+package etcd
+
+import (
+	"sync"
+	"time"
+)
+
+const (
+	queueCapacity = 200
+)
+
+type statsQueue struct {
+	items        [queueCapacity]*packageStats
+	size         int
+	front        int
+	back         int
+	totalPkgSize int
+	rwl          sync.RWMutex
+}
+
+func (q *statsQueue) Len() int {
+	return q.size
+}
+
+func (q *statsQueue) PkgSize() int {
+	return q.totalPkgSize
+}
+
+// FrontAndBack gets the front and back elements in the queue
+// We must grab front and back together with the protection of the lock
+func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) {
+	q.rwl.RLock()
+	defer q.rwl.RUnlock()
+	if q.size != 0 {
+		return q.items[q.front], q.items[q.back]
+	}
+	return nil, nil
+}
+
+// Insert function insert a packageStats into the queue and update the records
+func (q *statsQueue) Insert(p *packageStats) {
+	q.rwl.Lock()
+	defer q.rwl.Unlock()
+
+	q.back = (q.back + 1) % queueCapacity
+
+	if q.size == queueCapacity { //dequeue
+		q.totalPkgSize -= q.items[q.front].size
+		q.front = (q.back + 1) % queueCapacity
+	} else {
+		q.size++
+	}
+
+	q.items[q.back] = p
+	q.totalPkgSize += q.items[q.back].size
+
+}
+
+// Rate function returns the package rate and byte rate
+func (q *statsQueue) Rate() (float64, float64) {
+	front, back := q.frontAndBack()
+
+	if front == nil || back == nil {
+		return 0, 0
+	}
+
+	if time.Now().Sub(back.Time()) > time.Second {
+		q.Clear()
+		return 0, 0
+	}
+
+	sampleDuration := back.Time().Sub(front.Time())
+
+	pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second)
+
+	br := float64(q.PkgSize()) / float64(sampleDuration) * float64(time.Second)
+
+	return pr, br
+}
+
+// Clear function clear up the statsQueue
+func (q *statsQueue) Clear() {
+	q.rwl.Lock()
+	defer q.rwl.Unlock()
+	q.back = -1
+	q.front = 0
+	q.size = 0
+	q.totalPkgSize = 0
+}

+ 17 - 0
etcd/v2_http.go

@@ -23,6 +23,7 @@ import (
 	"net/http"
 	"net/url"
 	"strings"
+	"time"
 
 	etcdErr "github.com/coreos/etcd/error"
 )
@@ -75,6 +76,22 @@ func (p *participant) serveLeader(w http.ResponseWriter, r *http.Request) error
 	return fmt.Errorf("no leader")
 }
 
+func (p *participant) serveSelfStats(w http.ResponseWriter, req *http.Request) error {
+	p.serverStats.LeaderInfo.Uptime = time.Now().Sub(p.serverStats.LeaderInfo.StartTime).String()
+
+	if p.node.IsLeader() {
+		p.serverStats.LeaderInfo.Name = fmt.Sprint(p.id)
+	}
+
+	queue := p.serverStats.sendRateQueue
+	p.serverStats.SendingPkgRate, p.serverStats.SendingBandwidthRate = queue.Rate()
+
+	queue = p.serverStats.recvRateQueue
+	p.serverStats.RecvingPkgRate, p.serverStats.RecvingBandwidthRate = queue.Rate()
+
+	return json.NewEncoder(w).Encode(p.serverStats)
+}
+
 func (p *participant) serveLeaderStats(w http.ResponseWriter, req *http.Request) error {
 	if !p.node.IsLeader() {
 		return p.redirect(w, req, p.node.Leader())

+ 4 - 0
raft/raft.go

@@ -80,6 +80,10 @@ type Message struct {
 	Snapshot  Snapshot
 }
 
+func (m Message) IsMsgApp() bool {
+	return m.Type == msgApp
+}
+
 func (m Message) String() string {
 	return fmt.Sprintf("type=%v from=%x to=%x term=%d logTerm=%d i=%d ci=%d len(ents)=%d",
 		m.Type, m.From, m.To, m.Term, m.LogTerm, m.Index, m.Commit, len(m.Entries))