Browse Source

add recvQueue

Xiang Li 12 years ago
parent
commit
23995ffc59
3 changed files with 29 additions and 10 deletions
  1. 1 1
      raft_handlers.go
  2. 6 5
      raft_server.go
  3. 22 4
      raft_stats.go

+ 1 - 1
raft_handlers.go

@@ -43,7 +43,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	if err == nil {
 	if err == nil {
 		debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
 		debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
 
 
-		r.serverStats.RecvAppendReq(aereq.LeaderName)
+		r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
 
 
 		if resp := r.AppendEntries(aereq); resp != nil {
 		if resp := r.AppendEntries(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)

+ 6 - 5
raft_server.go

@@ -52,6 +52,9 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo
 			sendRateQueue: &statsQueue{
 			sendRateQueue: &statsQueue{
 				back: -1,
 				back: -1,
 			},
 			},
+			recvRateQueue: &statsQueue{
+				back: -1,
+			},
 		},
 		},
 	}
 	}
 }
 }
@@ -282,13 +285,11 @@ func (r *raftServer) Stats() []byte {
 
 
 	queue := r.serverStats.sendRateQueue
 	queue := r.serverStats.sendRateQueue
 
 
-	front, back := queue.FrontAndBack()
-
-	sampleDuration := back.Time().Sub(front.Time())
+	r.serverStats.SendingPkgRate, r.serverStats.SendingBandwidthRate = queue.Rate()
 
 
-	r.serverStats.SendingPkgRate = float64(queue.Len()) / float64(sampleDuration) * float64(time.Second)
+	queue = r.serverStats.recvRateQueue
 
 
-	r.serverStats.SendingBandwidthRate = float64(queue.Size()) / float64(sampleDuration) * float64(time.Second)
+	r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate()
 
 
 	sBytes, err := json.Marshal(r.serverStats)
 	sBytes, err := json.Marshal(r.serverStats)
 
 

+ 22 - 4
raft_stats.go

@@ -1,7 +1,6 @@
 package main
 package main
 
 
 import (
 import (
-	"container/list"
 	"fmt"
 	"fmt"
 	"math"
 	"math"
 	"sync"
 	"sync"
@@ -43,18 +42,21 @@ type raftServerStats struct {
 	SendAppendRequestCnt  uint64
 	SendAppendRequestCnt  uint64
 	SendAppendReqeustRate uint64
 	SendAppendReqeustRate uint64
 	sendRateQueue         *statsQueue
 	sendRateQueue         *statsQueue
-	recvRateQueue         *list.List
+	recvRateQueue         *statsQueue
 	SendingPkgRate        float64
 	SendingPkgRate        float64
 	SendingBandwidthRate  float64
 	SendingBandwidthRate  float64
+	RecvingPkgRate        float64
+	RecvingBandwidthRate  float64
 }
 }
 
 
-func (ss *raftServerStats) RecvAppendReq(leaderName string) {
+func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
 	ss.State = raft.Follower
 	ss.State = raft.Follower
 	if leaderName != ss.Leader {
 	if leaderName != ss.Leader {
 		ss.Leader = leaderName
 		ss.Leader = leaderName
 		ss.leaderStartTime = time.Now()
 		ss.leaderStartTime = time.Now()
 	}
 	}
 
 
+	ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
 	ss.RecvAppendRequestCnt++
 	ss.RecvAppendRequestCnt++
 }
 }
 
 
@@ -129,7 +131,7 @@ func (q *statsQueue) Size() int {
 
 
 // FrontAndBack gets the front and back elements in the queue
 // FrontAndBack gets the front and back elements in the queue
 // We must grab front and back together with the protection of the lock
 // We must grab front and back together with the protection of the lock
-func (q *statsQueue) FrontAndBack() (*packageStats, *packageStats) {
+func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) {
 	q.rwl.RLock()
 	q.rwl.RLock()
 	defer q.rwl.RUnlock()
 	defer q.rwl.RUnlock()
 	if q.size != 0 {
 	if q.size != 0 {
@@ -155,3 +157,19 @@ func (q *statsQueue) Insert(p *packageStats) {
 
 
 	fmt.Println(q.front, q.back, q.size)
 	fmt.Println(q.front, q.back, q.size)
 }
 }
+
+func (q *statsQueue) Rate() (float64, float64) {
+	front, back := q.frontAndBack()
+
+	if front == nil || back == nil {
+		return 0, 0
+	}
+
+	sampleDuration := back.Time().Sub(front.Time())
+
+	pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second)
+
+	br := float64(q.Size()) / float64(sampleDuration) * float64(time.Second)
+
+	return pr, br
+}