Xiang Li 12 years ago
parent
commit
8d245b546f
6 changed files with 82 additions and 49 deletions
  1. 5 2
      command.go
  2. 24 19
      etcd_handlers.go
  3. 12 13
      raft_server.go
  4. 22 13
      raft_stats.go
  5. 2 2
      transporter.go
  6. 17 0
      util.go

+ 5 - 2
command.go

@@ -170,8 +170,9 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
 
+	// add peer stats
 	if c.Name != r.Name() {
-		r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63}
+		r.peersStats.Peers[c.Name] = &raftPeerStats{MinLatency: 1 << 63}
 	}
 
 	return b, err
@@ -198,7 +199,9 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	key := path.Join("_etcd/machines", c.Name)
 
 	_, err := etcdStore.Delete(key, raftServer.CommitIndex())
-	delete(r.peersStats, c.Name)
+
+	// delete from stats
+	delete(r.peersStats.Peers, c.Name)
 
 	if err != nil {
 		return []byte{0}, err

+ 24 - 19
etcd_handlers.go

@@ -22,7 +22,7 @@ func NewEtcdMuxer() *http.ServeMux {
 	etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler))
 	etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
 	etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
-	etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler))
+	etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler))
 	etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
 	etcdMux.HandleFunc("/test/", TestHttpHandler)
 	return etcdMux
@@ -167,22 +167,8 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) er
 			return etcdErr.NewError(300, "")
 		}
 
-		// tell the client where is the leader
-		path := req.URL.Path
+		redirect(leader, etcd, w, req)
 
-		var url string
-
-		if etcd {
-			etcdAddr, _ := nameToEtcdURL(leader)
-			url = etcdAddr + path
-		} else {
-			raftAddr, _ := nameToRaftURL(leader)
-			url = raftAddr + path
-		}
-
-		debugf("Redirect to %s", url)
-
-		http.Redirect(w, req, url, http.StatusTemporaryRedirect)
 		return nil
 	}
 	return etcdErr.NewError(300, "")
@@ -227,9 +213,28 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 // Handler to return the basic stats of etcd
 func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	w.WriteHeader(http.StatusOK)
-	w.Write(etcdStore.Stats())
-	w.Write(r.Stats())
+	option := req.URL.Path[len("/v1/stats/"):]
+
+	switch option {
+	case "self":
+		w.WriteHeader(http.StatusOK)
+		w.Write(r.Stats())
+	case "leader":
+		if r.State() == raft.Leader {
+			w.Write(r.PeerStats())
+		} else {
+			leader := r.Leader()
+			// current no leader
+			if leader == "" {
+				return etcdErr.NewError(300, "")
+			}
+			redirect(leader, true, w, req)
+		}
+	case "store":
+		w.WriteHeader(http.StatusOK)
+		w.Write(etcdStore.Stats())
+	}
+
 	return nil
 }
 

+ 12 - 13
raft_server.go

@@ -24,7 +24,7 @@ type raftServer struct {
 	listenHost  string
 	tlsConf     *TLSConfig
 	tlsInfo     *TLSInfo
-	peersStats  map[string]*raftPeerStats
+	peersStats  *raftPeersStats
 	serverStats *raftServerStats
 }
 
@@ -48,7 +48,10 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
 		listenHost: listenHost,
 		tlsConf:    tlsConf,
 		tlsInfo:    tlsInfo,
-		peersStats: make(map[string]*raftPeerStats),
+		peersStats: &raftPeersStats{
+			Leader: name,
+			Peers:  make(map[string]*raftPeerStats),
+		},
 		serverStats: &raftServerStats{
 			StartTime: time.Now(),
 			sendRateQueue: &statsQueue{
@@ -63,7 +66,6 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
 
 // Start the raft server
 func (r *raftServer) ListenAndServe() {
-
 	// Setup commands.
 	registerCommands()
 
@@ -282,7 +284,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
 }
 
 func (r *raftServer) Stats() []byte {
-	r.serverStats.LeaderUptime = time.Now().Sub(r.serverStats.leaderStartTime).String()
+	r.serverStats.LeaderInfo.Uptime = time.Now().Sub(r.serverStats.LeaderInfo.startTime).String()
 
 	queue := r.serverStats.sendRateQueue
 
@@ -292,20 +294,17 @@ func (r *raftServer) Stats() []byte {
 
 	r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate()
 
-	sBytes, err := json.Marshal(r.serverStats)
+	b, _ := json.Marshal(r.serverStats)
 
-	if err != nil {
-		warn(err)
-	}
+	return b
+}
 
+func (r *raftServer) PeerStats() []byte {
 	if r.State() == raft.Leader {
-		pBytes, _ := json.Marshal(r.peersStats)
-
-		b := append(sBytes, pBytes...)
+		b, _ := json.Marshal(r.peersStats)
 		return b
 	}
-
-	return sBytes
+	return nil
 }
 
 // Register commands to raft server

+ 22 - 13
raft_stats.go

@@ -33,10 +33,14 @@ func (ps *packageStats) Time() time.Time {
 }
 
 type raftServerStats struct {
-	State        string    `json:"state"`
-	StartTime    time.Time `json:"startTime"`
-	Leader       string    `json:"leader"`
-	LeaderUptime string    `json:"leaderUptime"`
+	State     string    `json:"state"`
+	StartTime time.Time `json:"startTime"`
+
+	LeaderInfo struct {
+		Name      string `json:"leader"`
+		Uptime    string `json:"uptime"`
+		startTime time.Time
+	} `json:"leaderInfo"`
 
 	RecvAppendRequestCnt uint64  `json:"recvAppendRequestCnt,"`
 	RecvingPkgRate       float64 `json:"recvPkgRate,omitempty"`
@@ -46,16 +50,15 @@ type raftServerStats struct {
 	SendingPkgRate       float64 `json:"sendPkgRate,omitempty"`
 	SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
 
-	leaderStartTime time.Time
-	sendRateQueue   *statsQueue
-	recvRateQueue   *statsQueue
+	sendRateQueue *statsQueue
+	recvRateQueue *statsQueue
 }
 
 func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
 	ss.State = raft.Follower
-	if leaderName != ss.Leader {
-		ss.Leader = leaderName
-		ss.leaderStartTime = time.Now()
+	if leaderName != ss.LeaderInfo.Name {
+		ss.LeaderInfo.Name = leaderName
+		ss.LeaderInfo.startTime = time.Now()
 	}
 
 	ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
@@ -64,17 +67,23 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
 
 func (ss *raftServerStats) SendAppendReq(pkgSize int) {
 	now := time.Now()
+
 	if ss.State != raft.Leader {
 		ss.State = raft.Leader
-		ss.Leader = r.Name()
-		ss.leaderStartTime = now
+		ss.LeaderInfo.Name = r.Name()
+		ss.LeaderInfo.startTime = now
 	}
 
-	ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
+	ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize))
 
 	ss.SendAppendRequestCnt++
 }
 
+type raftPeersStats struct {
+	Leader string                    `json:"leader"`
+	Peers  map[string]*raftPeerStats `json:"peers"`
+}
+
 type raftPeerStats struct {
 	Latency          float64 `json:"latency"`
 	AvgLatency       float64 `json:"averageLatency"`

+ 2 - 2
transporter.go

@@ -66,7 +66,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
 
 	debugf("Send LogEntries to %s ", u)
 
-	thisPeerStats, ok := r.peersStats[peer.Name]
+	thisPeerStats, ok := r.peersStats.Peers[peer.Name]
 
 	start := time.Now()
 
@@ -85,7 +85,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
 		}
 	}
 
-	r.peersStats[peer.Name] = thisPeerStats
+	r.peersStats.Peers[peer.Name] = thisPeerStats
 
 	if resp != nil {
 		defer resp.Body.Close()

+ 17 - 0
util.go

@@ -128,6 +128,23 @@ func sanitizeListenHost(listen string, advertised string) string {
 	return net.JoinHostPort(listen, aport)
 }
 
+func redirect(node string, etcd bool, w http.ResponseWriter, req *http.Request) {
+	var url string
+	path := req.URL.Path
+
+	if etcd {
+		etcdAddr, _ := nameToEtcdURL(node)
+		url = etcdAddr + path
+	} else {
+		raftAddr, _ := nameToRaftURL(node)
+		url = raftAddr + path
+	}
+
+	debugf("Redirect to %s", url)
+
+	http.Redirect(w, req, url, http.StatusTemporaryRedirect)
+}
+
 func check(err error) {
 	if err != nil {
 		fatal(err)