Browse Source

thread-saft queue

Xiang Li 12 years ago
parent
commit
6ef18b1ae3
3 changed files with 65 additions and 36 deletions
  1. 14 8
      raft_server.go
  2. 46 26
      raft_stats.go
  3. 5 2
      transporter.go

+ 14 - 8
raft_server.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"bytes"
-	"container/list"
 	"crypto/tls"
 	"encoding/binary"
 	"encoding/json"
@@ -49,8 +48,10 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo
 		tlsInfo:    tlsInfo,
 		peersStats: make(map[string]*raftPeerStats),
 		serverStats: &raftServerStats{
-			StartTime:     time.Now(),
-			sendRateQueue: list.New(),
+			StartTime: time.Now(),
+			sendRateQueue: &statsQueue{
+				back: -1,
+			},
 		},
 	}
 }
@@ -281,14 +282,19 @@ func (r *raftServer) Stats() []byte {
 
 	queue := r.serverStats.sendRateQueue
 
-	frontValue, _ := queue.Front().Value.(time.Time)
-	backValue, _ := queue.Back().Value.(time.Time)
+	front, back := queue.FrontAndBack()
 
-	sampleDuration := backValue.Sub(frontValue)
+	sampleDuration := back.Time().Sub(front.Time())
 
-	r.serverStats.SendingRate = float64(queue.Len()) / float64(sampleDuration) * float64(time.Second)
+	r.serverStats.SendingPkgRate = float64(queue.Len()) / float64(sampleDuration) * float64(time.Second)
 
-	sBytes, _ := json.Marshal(r.serverStats)
+	r.serverStats.SendingBandwidthRate = float64(queue.Size()) / float64(sampleDuration) * float64(time.Second)
+
+	sBytes, err := json.Marshal(r.serverStats)
+
+	if err != nil {
+		warn(err)
+	}
 
 	pBytes, _ := json.Marshal(r.peersStats)
 

+ 46 - 26
raft_stats.go

@@ -2,7 +2,9 @@ package main
 
 import (
 	"container/list"
+	"fmt"
 	"math"
+	"sync"
 	"time"
 
 	"github.com/coreos/go-raft"
@@ -17,7 +19,18 @@ type runtimeStats struct {
 
 type packageStats struct {
 	sendingTime time.Time
-	size        uint64
+	size        int
+}
+
+func NewPackageStats(now time.Time, size int) *packageStats {
+	return &packageStats{
+		sendingTime: now,
+		size:        size,
+	}
+}
+
+func (ps *packageStats) Time() time.Time {
+	return ps.sendingTime
 }
 
 type raftServerStats struct {
@@ -29,9 +42,10 @@ type raftServerStats struct {
 	RecvAppendRequestCnt  uint64
 	SendAppendRequestCnt  uint64
 	SendAppendReqeustRate uint64
-	sendRateQueue         *list.List
+	sendRateQueue         *statsQueue
 	recvRateQueue         *list.List
-	SendingRate           float64
+	SendingPkgRate        float64
+	SendingBandwidthRate  float64
 }
 
 func (ss *raftServerStats) RecvAppendReq(leaderName string) {
@@ -44,7 +58,7 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string) {
 	ss.RecvAppendRequestCnt++
 }
 
-func (ss *raftServerStats) SendAppendReq() {
+func (ss *raftServerStats) SendAppendReq(pkgSize int) {
 	now := time.Now()
 	if ss.State != raft.Leader {
 		ss.State = raft.Leader
@@ -52,12 +66,7 @@ func (ss *raftServerStats) SendAppendReq() {
 		ss.leaderStartTime = now
 	}
 
-	if ss.sendRateQueue.Len() < 200 {
-		ss.sendRateQueue.PushBack(now)
-	} else {
-		ss.sendRateQueue.PushBack(now)
-		ss.sendRateQueue.Remove(ss.sendRateQueue.Front())
-	}
+	ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
 
 	ss.SendAppendRequestCnt++
 }
@@ -102,36 +111,47 @@ func (ps *raftPeerStats) Succ(d time.Duration) {
 }
 
 type statsQueue struct {
-	items [queueCapacity]*packageStats
-	size  int
-	front int
-	back  int
+	items        [queueCapacity]*packageStats
+	size         int
+	front        int
+	back         int
+	totalPkgSize int
+	rwl          sync.RWMutex
 }
 
 func (q *statsQueue) Len() int {
 	return q.size
 }
 
-func (q *statsQueue) Front() *packageStats {
-	if q.size != 0 {
-		return q.items[q.front]
-	}
-	return nil
+func (q *statsQueue) Size() int {
+	return q.totalPkgSize
 }
 
-func (q *statsQueue) Back() *packageStats {
+// FrontAndBack gets the front and back elements in the queue
+// We must grab front and back together with the protection of the lock
+func (q *statsQueue) FrontAndBack() (*packageStats, *packageStats) {
+	q.rwl.RLock()
+	defer q.rwl.RUnlock()
 	if q.size != 0 {
-		return q.items[q.back]
+		return q.items[q.front], q.items[q.back]
 	}
-	return nil
+	return nil, nil
 }
 
 func (q *statsQueue) Insert(p *packageStats) {
-	q.back = (q.back + 1) % queueCapacity
-	q.items[q.back] = p
-	if q.size == queueCapacity {
-		q.front = (q.back + 1) % queueCapacity
+	q.rwl.Lock()
+	defer q.rwl.Unlock()
+
+	if q.size == queueCapacity { //dequeue
+		q.totalPkgSize -= q.items[q.front].size
+		q.front = (q.back + 2) % queueCapacity
 	} else {
 		q.size++
 	}
+
+	q.back = (q.back + 1) % queueCapacity
+	q.items[q.back] = p
+	q.totalPkgSize += q.items[q.back].size
+
+	fmt.Println(q.front, q.back, q.size)
 }

+ 5 - 2
transporter.go

@@ -48,11 +48,14 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
 	var aersp *raft.AppendEntriesResponse
 	var b bytes.Buffer
 
-	r.serverStats.SendAppendReq()
-
 	json.NewEncoder(&b).Encode(req)
 
+	size := b.Len()
+
+	r.serverStats.SendAppendReq(size)
+
 	u, _ := nameToRaftURL(peer.Name)
+
 	debugf("Send LogEntries to %s ", u)
 
 	thisPeerStats, ok := r.peersStats[peer.Name]