Browse Source

Merge pull request #184 from xiangli-cmu/master

Fix restart from snapshot.
Xiang Li 12 years ago
parent
commit
8fc1abd9b1
4 changed files with 35 additions and 31 deletions
  1. 3 3
      command.go
  2. 13 13
      raft_server.go
  3. 9 9
      raft_stats.go
  4. 10 6
      transporter.go

+ 3 - 3
command.go

@@ -172,8 +172,8 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 
 	// add peer stats
 	if c.Name != r.Name() {
-		r.peersStats.Peers[c.Name] = &raftPeerStats{}
-		r.peersStats.Peers[c.Name].Latency.Minimum = 1 << 63
+		r.followersStats.Followers[c.Name] = &raftFollowerStats{}
+		r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
 	}
 
 	return b, err
@@ -202,7 +202,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	_, err := etcdStore.Delete(key, raftServer.CommitIndex())
 
 	// delete from stats
-	delete(r.peersStats.Peers, c.Name)
+	delete(r.followersStats.Followers, c.Name)
 
 	if err != nil {
 		return []byte{0}, err

+ 13 - 13
raft_server.go

@@ -17,15 +17,15 @@ import (
 
 type raftServer struct {
 	*raft.Server
-	version     string
-	joinIndex   uint64
-	name        string
-	url         string
-	listenHost  string
-	tlsConf     *TLSConfig
-	tlsInfo     *TLSInfo
-	peersStats  *raftPeersStats
-	serverStats *raftServerStats
+	version        string
+	joinIndex      uint64
+	name           string
+	url            string
+	listenHost     string
+	tlsConf        *TLSConfig
+	tlsInfo        *TLSInfo
+	followersStats *raftFollowersStats
+	serverStats    *raftServerStats
 }
 
 var r *raftServer
@@ -48,9 +48,9 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
 		listenHost: listenHost,
 		tlsConf:    tlsConf,
 		tlsInfo:    tlsInfo,
-		peersStats: &raftPeersStats{
-			Leader: name,
-			Peers:  make(map[string]*raftPeerStats),
+		followersStats: &raftFollowersStats{
+			Leader:    name,
+			Followers: make(map[string]*raftFollowerStats),
 		},
 		serverStats: &raftServerStats{
 			StartTime: time.Now(),
@@ -301,7 +301,7 @@ func (r *raftServer) Stats() []byte {
 
 func (r *raftServer) PeerStats() []byte {
 	if r.State() == raft.Leader {
-		b, _ := json.Marshal(r.peersStats)
+		b, _ := json.Marshal(r.followersStats)
 		return b
 	}
 	return nil

+ 9 - 9
raft_stats.go

@@ -79,12 +79,12 @@ func (ss *raftServerStats) SendAppendReq(pkgSize int) {
 	ss.SendAppendRequestCnt++
 }
 
-type raftPeersStats struct {
-	Leader string                    `json:"leader"`
-	Peers  map[string]*raftPeerStats `json:"peers"`
+type raftFollowersStats struct {
+	Leader    string                        `json:"leader"`
+	Followers map[string]*raftFollowerStats `json:"follwers"`
 }
 
-type raftPeerStats struct {
+type raftFollowerStats struct {
 	Latency struct {
 		Current           float64 `json:"current"`
 		Average           float64 `json:"average"`
@@ -100,8 +100,8 @@ type raftPeerStats struct {
 	} `json:"counts"`
 }
 
-// Succ function update the raftPeerStats with a successful send
-func (ps *raftPeerStats) Succ(d time.Duration) {
+// Succ function update the raftFollowerStats with a successful send
+func (ps *raftFollowerStats) Succ(d time.Duration) {
 	total := float64(ps.Counts.Success) * ps.Latency.Average
 	totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare
 
@@ -118,14 +118,14 @@ func (ps *raftPeerStats) Succ(d time.Duration) {
 	}
 
 	ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success)
-	ps.Latency.averageSquare = (totalSquare + ps.Latency.Current * ps.Latency.Current) / float64(ps.Counts.Success)
+	ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success)
 
 	// sdv = sqrt(avg(x^2) - avg(x)^2)
 	ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average)
 }
 
-// Fail function update the raftPeerStats with a unsuccessful send
-func (ps *raftPeerStats) Fail() {
+// Fail function update the raftFollowerStats with a unsuccessful send
+func (ps *raftFollowerStats) Fail() {
 	ps.Counts.Fail++
 }
 

+ 10 - 6
transporter.go

@@ -66,7 +66,13 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
 
 	debugf("Send LogEntries to %s ", u)
 
-	thisPeerStats, ok := r.peersStats.Peers[peer.Name]
+	thisFollowerStats, ok := r.followersStats.Followers[peer.Name]
+
+	if !ok { //this is the first time this follower has been seen
+		thisFollowerStats = &raftFollowerStats{}
+		thisFollowerStats.Latency.Minimum = 1 << 63
+		r.followersStats.Followers[peer.Name] = thisFollowerStats
+	}
 
 	start := time.Now()
 
@@ -77,16 +83,14 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
 	if err != nil {
 		debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
 		if ok {
-			thisPeerStats.Fail()
+			thisFollowerStats.Fail()
 		}
 	} else {
 		if ok {
-			thisPeerStats.Succ(end.Sub(start))
+			thisFollowerStats.Succ(end.Sub(start))
 		}
 	}
 
-	r.peersStats.Peers[peer.Name] = thisPeerStats
-
 	if resp != nil {
 		defer resp.Body.Close()
 		aersp = &raft.AppendEntriesResponse{}
@@ -211,7 +215,7 @@ func (t *transporter) Get(path string) (*http.Response, error) {
 
 func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) {
 
-	timeoutChan := time.After(t.timeout)
+	timeoutChan := time.After(t.timeout * 10)
 
 	select {
 	case <-timeoutChan: