
etcdserver: add ServerStats and LeaderStats

This adds the remaining two stats endpoints: `/v2/stats/self`, for
various statistics on the EtcdServer, and `/v2/stats/leader`, for
statistics on a leader's followers.
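
As a rough usage sketch (not part of this change), the new endpoints can be read
with any HTTP client; the address below assumes a member serving its client API
on a hypothetical http://127.0.0.1:4001:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

// fetchStats GETs a stats endpoint and returns the raw JSON body.
func fetchStats(base, path string) (string, error) {
	resp, err := http.Get(base + path)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	b, err := ioutil.ReadAll(resp.Body)
	return string(b), err
}

func main() {
	base := "http://127.0.0.1:4001" // hypothetical client address
	for _, p := range []string{"/v2/stats/self", "/v2/stats/leader", "/v2/stats/store"} {
		body, err := fetchStats(base, p)
		if err != nil {
			fmt.Println(p, "error:", err)
			continue
		}
		fmt.Println(p, "->", body)
	}
}
```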

By and large, most of the stats code is copied across from 0.4.x, updated
where necessary to integrate with the new decoupling of raft from
transport.

This does not satisfactorily resolve the question of name vs ID. In the
old world, names were unique in the cluster and transmitted over the
wire, so they could be used safely in all statistics. In the new world,
a given EtcdServer only knows its own name, and it is instead IDs that
are communicated among the cluster members. Hence in most places here we
simply substitute a string-encoded ID in place of name, and only where
possible do we retain the actual given name of the EtcdServer.
Jonathan Boulle, 11 years ago
parent commit 8168fed825

etcdserver/cluster_store.go (+33 -6)

@@ -6,8 +6,12 @@ import (
 	"fmt"
 	"log"
 	"net/http"
+	"strconv"
+	"time"
 
 	etcdErr "github.com/coreos/etcd/error"
+
+	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/store"
 )
@@ -103,19 +107,25 @@ func (s *clusterStore) Remove(id uint64) {
 	}
 }
 
-func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
+// Sender creates the default production sender used to transport raft messages
+// in the cluster. The returned sender will update the given ServerStats and
+// LeaderStats appropriately.
+func Sender(t *http.Transport, cls ClusterStore, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
 	c := &http.Client{Transport: t}
 
 	return func(msgs []raftpb.Message) {
 		for _, m := range msgs {
 			// TODO: reuse go routines
 			// limit the number of outgoing connections for the same receiver
-			go send(c, cls, m)
+			go send(c, cls, m, ss, ls)
 		}
 	}
 }
 
-func send(c *http.Client, cls ClusterStore, m raftpb.Message) {
+// send uses the given client to send a message to a member in the given
+// ClusterStore, retrying up to 3 times for each message. The given
+// ServerStats and LeaderStats are updated appropriately.
+func send(c *http.Client, cls ClusterStore, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
 	// TODO (xiangli): reasonable retry logic
 	for i := 0; i < 3; i++ {
 		u := cls.Get().Pick(m.To)
@@ -126,7 +136,6 @@ func send(c *http.Client, cls ClusterStore, m raftpb.Message) {
 			log.Printf("etcdhttp: no addr for %d", m.To)
 			return
 		}
-
 		u = fmt.Sprintf("%s%s", u, raftPrefix)
 
 		// TODO: don't block. we should be able to have 1000s
@@ -136,13 +145,31 @@ func send(c *http.Client, cls ClusterStore, m raftpb.Message) {
 			log.Println("etcdhttp: dropping message:", err)
 			return // drop bad message
 		}
-		if httpPost(c, u, data) {
-			return // success
+		if m.Type == raftpb.MsgApp {
+			ss.SendAppendReq(len(data))
+		}
+		to := strconv.FormatUint(m.To, 10)
+		fs, ok := ls.Followers[to]
+		if !ok {
+			fs = &stats.FollowerStats{}
+			fs.Latency.Minimum = 1 << 63
+			ls.Followers[to] = fs
+		}
+
+		start := time.Now()
+		sent := httpPost(c, u, data)
+		end := time.Now()
+		if sent {
+			fs.Succ(end.Sub(start))
+			return
 		}
+		fs.Fail()
 		// TODO: backoff
 	}
 }
 
+// httpPost POSTs a data payload to a url using the given client. Returns true
+// if the POST succeeds, false on any failure.
 func httpPost(c *http.Client, url string, data []byte) bool {
 	resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
 	if err != nil {

etcdserver/etcdhttp/http.go (+39 -3)

@@ -41,14 +41,16 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
 	sh := &serverHandler{
 		server:       server,
 		clusterStore: server.ClusterStore,
+		stats:        server,
 		timer:        server,
 		timeout:      defaultServerTimeout,
-		storeStats:   server.StoreStats,
 	}
 	mux := http.NewServeMux()
 	mux.HandleFunc(keysPrefix, sh.serveKeys)
 	mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
 	mux.HandleFunc(statsPrefix+"/store", sh.serveStoreStats)
+	mux.HandleFunc(statsPrefix+"/self", sh.serveSelfStats)
+	mux.HandleFunc(statsPrefix+"/leader", sh.serveLeaderStats)
 	// TODO: dynamic configuration may make this outdated. take care of it.
 	// TODO: dynamic configuration may introduce race also.
 	// TODO: add serveMembers
@@ -73,9 +75,9 @@ func NewPeerHandler(server etcdserver.Server) http.Handler {
 type serverHandler struct {
 	timeout      time.Duration
 	server       etcdserver.Server
+	stats        etcdserver.Stats
 	timer        etcdserver.RaftTimer
 	clusterStore etcdserver.ClusterStore
-	storeStats   func() []byte
 }
 
 func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
@@ -171,7 +173,37 @@ func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	w.Header().Set("Content-Type", "application/json")
-	w.Write(h.storeStats())
+	w.Write(h.stats.StoreStats())
+}
+
+func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
+	if !allowMethod(w, r.Method, "GET") {
+		return
+	}
+	s := h.stats.ServerStats()
+	b, err := json.Marshal(s)
+	if err != nil {
+		log.Printf("error marshalling stats: %v\n", err)
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+	w.Header().Set("Content-Type", "application/json")
+	w.Write(b)
+}
+
+func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
+	if !allowMethod(w, r.Method, "GET") {
+		return
+	}
+	s := h.stats.LeaderStats()
+	b, err := json.Marshal(s)
+	if err != nil {
+		log.Printf("error marshalling stats: %v\n", err)
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+	w.Header().Set("Content-Type", "application/json")
+	w.Write(b)
 }
 
 func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
@@ -192,6 +224,10 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
+	if m.Type == raftpb.MsgApp {
+		// TODO(jonboulle):
+		h.stats.ServerStats().RecvAppendReq(strconv.FormatUint(m.From, 10), int(r.ContentLength))
+	}
 	if err := h.server.Process(context.TODO(), m); err != nil {
 		log.Println("etcdhttp: error processing raft message:", err)
 		writeError(w, err)

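To get a feel for the new handlers, here is a hedged test sketch in the style of
the existing TestServeStoreStats; it assumes it lives in the etcdhttp package's
test file (plus a "strings" import), and the stubStats type and expectations are
illustrative additions, not part of this commit:

```go
// stubStats is a hypothetical test double satisfying etcdserver.Stats.
type stubStats struct{}

func (stubStats) ServerStats() *stats.ServerStats { return &stats.ServerStats{Name: "node0"} }
func (stubStats) LeaderStats() *stats.LeaderStats { return stats.NewLeaderStats("1") }
func (stubStats) StoreStats() []byte              { return nil }

func TestServeSelfStatsSketch(t *testing.T) {
	sh := &serverHandler{stats: stubStats{}}
	rw := httptest.NewRecorder()
	req, _ := http.NewRequest("GET", "", nil)
	sh.serveSelfStats(rw, req)
	if ct := rw.Header().Get("Content-Type"); ct != "application/json" {
		t.Errorf("Content-Type = %q, want application/json", ct)
	}
	if !strings.Contains(rw.Body.String(), `"name":"node0"`) {
		t.Errorf("body = %s, want it to contain the server name", rw.Body.String())
	}
}
```
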
etcdserver/etcdhttp/http_test.go (+10 -3)

@@ -19,6 +19,7 @@ import (
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/store"
 )
@@ -637,12 +638,18 @@ func TestServeMachines(t *testing.T) {
 	}
 }
 
+type ds struct {
+	data []byte
+}
+
+func (s *ds) ServerStats() *stats.ServerStats { return nil }
+func (s *ds) LeaderStats() *stats.LeaderStats { return nil }
+func (s *ds) StoreStats() []byte              { return s.data }
+
 func TestServeStoreStats(t *testing.T) {
 	w := "foobarbaz"
 	sh := &serverHandler{
-		storeStats: func() []byte {
-			return []byte(w)
-		},
+		stats: &ds{data: []byte(w)},
 	}
 	rw := httptest.NewRecorder()
 	req, err := http.NewRequest("GET", "", nil)

etcdserver/server.go (+42 -5)

@@ -6,12 +6,14 @@ import (
 	"log"
 	"math/rand"
 	"os"
+	"strconv"
 	"sync/atomic"
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/discovery"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
@@ -88,6 +90,16 @@ type Server interface {
 	RemoveMember(ctx context.Context, id uint64) error
 }
 
+type Stats interface {
+	// ServerStats returns the statistics of this server
+	ServerStats() *stats.ServerStats
+	// LeaderStats returns the statistics of all followers in the cluster
+	// if this server is leader. Otherwise, nil is returned.
+	LeaderStats() *stats.LeaderStats
+	// StoreStats returns statistics of the underlying Store used by the etcdserver
+	StoreStats() []byte
+}
+
 type RaftTimer interface {
 	Index() uint64
 	Term() uint64
@@ -105,6 +117,9 @@ type EtcdServer struct {
 	node  raft.Node
 	store store.Store
 
+	stats  *stats.ServerStats
+	lstats *stats.LeaderStats
+
 	// send specifies the send function for sending msgs to members. send
 	// MUST NOT block. It is okay to drop messages, since clients should
 	// timeout and reissue their messages.  If send is nil, server will
@@ -172,6 +187,13 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 	}
 
 	cls := &clusterStore{Store: st}
+
+	sstats := &stats.ServerStats{
+		Name: cfg.Name,
+		ID:   strconv.FormatUint(cfg.ID(), 10),
+	}
+	lstats := stats.NewLeaderStats(strconv.FormatUint(cfg.ID(), 10))
+
 	s := &EtcdServer{
 		store:      st,
 		node:       n,
@@ -181,7 +203,9 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 			*wal.WAL
 			*snap.Snapshotter
 		}{w, ss},
-		send:         Sender(cfg.Transport, cls),
+		stats:        sstats,
+		lstats:       lstats,
+		send:         Sender(cfg.Transport, cls, sstats, lstats),
 		ticker:       time.Tick(100 * time.Millisecond),
 		syncTicker:   time.Tick(500 * time.Millisecond),
 		snapCount:    cfg.SnapCount,
@@ -198,10 +222,6 @@ func (s *EtcdServer) Start() {
 	go s.publish(defaultPublishRetryInterval)
 }
 
-func (s *EtcdServer) StoreStats() []byte {
-	return s.store.JsonStats()
-}
-
 // start prepares and starts server in a new goroutine. It is no longer safe to
 // modify a server's fields after it has been sent to Start.
 // This function is just used for testing.
@@ -212,6 +232,7 @@ func (s *EtcdServer) start() {
 	}
 	s.w = wait.New()
 	s.done = make(chan struct{})
+	s.stats.Initialize()
 	// TODO: if this is an empty log, writes all peer infos
 	// into the first entry
 	go s.run()
@@ -338,6 +359,22 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	}
 }
 
+func (s *EtcdServer) ServerStats() *stats.ServerStats {
+	s.stats.LeaderInfo.Uptime = time.Now().Sub(s.stats.LeaderInfo.StartTime).String()
+	s.stats.SendingPkgRate, s.stats.SendingBandwidthRate = s.stats.SendRates()
+	s.stats.RecvingPkgRate, s.stats.RecvingBandwidthRate = s.stats.RecvRates()
+	return s.stats
+}
+
+func (s *EtcdServer) LeaderStats() *stats.LeaderStats {
+	// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
+	return s.lstats
+}
+
+func (s *EtcdServer) StoreStats() []byte {
+	return s.store.JsonStats()
+}
+
 func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
 	// TODO: move Member to protobuf type
 	b, err := json.Marshal(memb)

etcdserver/stats/leader.go (+68 -0)

@@ -0,0 +1,68 @@
+package stats
+
+import (
+	"math"
+	"time"
+)
+
+// LeaderStats is used by the leader in an etcd cluster, and encapsulates
+// statistics about communication with its followers
+type LeaderStats struct {
+	// TODO(jonboulle): clarify that these are IDs, not names
+	Leader    string                    `json:"leader"`
+	Followers map[string]*FollowerStats `json:"followers"`
+}
+
+// NewLeaderStats generates a new LeaderStats with the given id as leader
+func NewLeaderStats(id string) *LeaderStats {
+	return &LeaderStats{
+		Leader:    id,
+		Followers: make(map[string]*FollowerStats),
+	}
+}
+
+// FollowerStats encapsulates various statistics about a follower in an etcd cluster
+type FollowerStats struct {
+	Latency struct {
+		Current           float64 `json:"current"`
+		Average           float64 `json:"average"`
+		averageSquare     float64
+		StandardDeviation float64 `json:"standardDeviation"`
+		Minimum           float64 `json:"minimum"`
+		Maximum           float64 `json:"maximum"`
+	} `json:"latency"`
+
+	Counts struct {
+		Fail    uint64 `json:"fail"`
+		Success uint64 `json:"success"`
+	} `json:"counts"`
+}
+
+// Succ updates the FollowerStats with a successful send
+func (fs *FollowerStats) Succ(d time.Duration) {
+	total := float64(fs.Counts.Success) * fs.Latency.Average
+	totalSquare := float64(fs.Counts.Success) * fs.Latency.averageSquare
+
+	fs.Counts.Success++
+
+	fs.Latency.Current = float64(d) / (1000000.0)
+
+	if fs.Latency.Current > fs.Latency.Maximum {
+		fs.Latency.Maximum = fs.Latency.Current
+	}
+
+	if fs.Latency.Current < fs.Latency.Minimum {
+		fs.Latency.Minimum = fs.Latency.Current
+	}
+
+	fs.Latency.Average = (total + fs.Latency.Current) / float64(fs.Counts.Success)
+	fs.Latency.averageSquare = (totalSquare + fs.Latency.Current*fs.Latency.Current) / float64(fs.Counts.Success)
+
+	// sdv = sqrt(avg(x^2) - avg(x)^2)
+	fs.Latency.StandardDeviation = math.Sqrt(fs.Latency.averageSquare - fs.Latency.Average*fs.Latency.Average)
+}
+
+// Fail updates the FollowerStats with an unsuccessful send
+func (fs *FollowerStats) Fail() {
+	fs.Counts.Fail++
+}
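
The latency bookkeeping above keeps a running mean and mean-of-squares so the
standard deviation can be derived as sqrt(avg(x^2) - avg(x)^2). A standalone
worked example (illustrative only, not part of this commit):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/coreos/etcd/etcdserver/stats"
)

func main() {
	fs := &stats.FollowerStats{}
	fs.Latency.Minimum = 1 << 63 // same "no sample yet" sentinel the sender uses

	// Successful sends of 1ms, 2ms and 3ms:
	//   average       = (1 + 2 + 3) / 3       = 2.0 ms
	//   averageSquare = (1 + 4 + 9) / 3       = 4.667 ms^2
	//   stddev        = sqrt(4.667 - 2.0*2.0) = 0.816 ms
	for _, d := range []time.Duration{time.Millisecond, 2 * time.Millisecond, 3 * time.Millisecond} {
		fs.Succ(d)
	}
	fs.Fail() // one dropped message

	b, _ := json.Marshal(fs)
	fmt.Println(string(b))
	// latency: current=3, average=2, standardDeviation~0.816, minimum=1, maximum=3
	// counts:  fail=1, success=3
}
```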

etcdserver/stats/queue.go (+96 -0)

@@ -0,0 +1,96 @@
+package stats
+
+import (
+	"sync"
+	"time"
+)
+
+const (
+	queueCapacity = 200
+)
+
+// RequestStats represents the stats for a request.
+// It encapsulates the sending time and the size of the request.
+type RequestStats struct {
+	SendingTime time.Time
+	Size        int
+}
+
+type statsQueue struct {
+	items        [queueCapacity]*RequestStats
+	size         int
+	front        int
+	back         int
+	totalReqSize int
+	rwl          sync.RWMutex
+}
+
+func (q *statsQueue) Len() int {
+	return q.size
+}
+
+func (q *statsQueue) ReqSize() int {
+	return q.totalReqSize
+}
+
+// frontAndBack returns the front and back elements in the queue.
+// Front and back must be read together under the protection of the read lock.
+func (q *statsQueue) frontAndBack() (*RequestStats, *RequestStats) {
+	q.rwl.RLock()
+	defer q.rwl.RUnlock()
+	if q.size != 0 {
+		return q.items[q.front], q.items[q.back]
+	}
+	return nil, nil
+}
+
+// Insert inserts a RequestStats into the queue and updates the records
+func (q *statsQueue) Insert(p *RequestStats) {
+	q.rwl.Lock()
+	defer q.rwl.Unlock()
+
+	q.back = (q.back + 1) % queueCapacity
+
+	if q.size == queueCapacity { //dequeue
+		q.totalReqSize -= q.items[q.front].Size
+		q.front = (q.back + 1) % queueCapacity
+	} else {
+		q.size++
+	}
+
+	q.items[q.back] = p
+	q.totalReqSize += q.items[q.back].Size
+
+}
+
+// Rate returns the request rate (requests per second) and byte rate (bytes per second)
+func (q *statsQueue) Rate() (float64, float64) {
+	front, back := q.frontAndBack()
+
+	if front == nil || back == nil {
+		return 0, 0
+	}
+
+	if time.Now().Sub(back.SendingTime) > time.Second {
+		q.Clear()
+		return 0, 0
+	}
+
+	sampleDuration := back.SendingTime.Sub(front.SendingTime)
+
+	pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second)
+
+	br := float64(q.ReqSize()) / float64(sampleDuration) * float64(time.Second)
+
+	return pr, br
+}
+
+// Clear resets the statsQueue to its initial empty state
+func (q *statsQueue) Clear() {
+	q.rwl.Lock()
+	defer q.rwl.Unlock()
+	q.back = -1
+	q.front = 0
+	q.size = 0
+	q.totalReqSize = 0
+}
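
statsQueue is a fixed-capacity (200 entry) ring buffer; Rate derives a
requests-per-second and bytes-per-second figure from the window spanned by the
oldest and newest SendingTime, and resets itself if the newest entry is more
than a second old. A hedged in-package sketch of the arithmetic (statsQueue is
unexported, so this would have to live in the stats package; the test itself is
an illustrative addition):

```go
package stats

import (
	"testing"
	"time"
)

func TestStatsQueueRateSketch(t *testing.T) {
	q := &statsQueue{back: -1}
	now := time.Now()
	// Three 100-byte requests whose send times span 0.5s (500ms ago, 250ms ago, and now).
	for i := 0; i < 3; i++ {
		q.Insert(&RequestStats{
			SendingTime: now.Add(time.Duration(i-2) * 250 * time.Millisecond),
			Size:        100,
		})
	}
	// window = 0.5s, so ~3/0.5 = 6 requests/s and ~300/0.5 = 600 bytes/s.
	pr, br := q.Rate()
	if pr < 5.9 || pr > 6.1 {
		t.Errorf("request rate = %v, want ~6", pr)
	}
	if br < 590 || br > 610 {
		t.Errorf("byte rate = %v, want ~600", br)
	}
}
```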

etcdserver/stats/server.go (+110 -0)

@@ -0,0 +1,110 @@
+package stats
+
+import (
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/raft"
+)
+
+// ServerStats encapsulates various statistics about an EtcdServer and its
+// communication with other members of the cluster
+type ServerStats struct {
+	Name string `json:"name"`
+	// TODO(jonboulle): use ID instead of name?
+	ID        string         `json:"id"`
+	State     raft.StateType `json:"state"`
+	StartTime time.Time      `json:"startTime"`
+
+	LeaderInfo struct {
+		Name      string    `json:"leader"`
+		Uptime    string    `json:"uptime"`
+		StartTime time.Time `json:"startTime"`
+	} `json:"leaderInfo"`
+
+	RecvAppendRequestCnt uint64  `json:"recvAppendRequestCnt"`
+	RecvingPkgRate       float64 `json:"recvPkgRate,omitempty"`
+	RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"`
+
+	SendAppendRequestCnt uint64  `json:"sendAppendRequestCnt"`
+	SendingPkgRate       float64 `json:"sendPkgRate,omitempty"`
+	SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
+
+	sendRateQueue *statsQueue
+	recvRateQueue *statsQueue
+
+	sync.Mutex
+}
+
+// Initialize clears the statistics of ServerStats and resets its start time
+func (ss *ServerStats) Initialize() {
+	if ss == nil {
+		return
+	}
+	now := time.Now()
+	ss.StartTime = now
+	ss.LeaderInfo.StartTime = now
+	ss.sendRateQueue = &statsQueue{
+		back: -1,
+	}
+	ss.recvRateQueue = &statsQueue{
+		back: -1,
+	}
+}
+
+// RecvRates calculates and returns the rate of received append requests
+func (ss *ServerStats) RecvRates() (float64, float64) {
+	return ss.recvRateQueue.Rate()
+}
+
+// SendRates calculates and returns the rate of sent append requests
+func (ss *ServerStats) SendRates() (float64, float64) {
+	return ss.sendRateQueue.Rate()
+}
+
+// RecvAppendReq updates the ServerStats in response to receiving an
+// AppendRequest from the given leader
+func (ss *ServerStats) RecvAppendReq(leader string, reqSize int) {
+	ss.Lock()
+	defer ss.Unlock()
+
+	now := time.Now()
+
+	ss.State = raft.StateFollower
+	if leader != ss.LeaderInfo.Name {
+		ss.LeaderInfo.Name = leader
+		ss.LeaderInfo.StartTime = now
+	}
+
+	ss.recvRateQueue.Insert(
+		&RequestStats{
+			SendingTime: now,
+			Size:        reqSize,
+		},
+	)
+	ss.RecvAppendRequestCnt++
+}
+
+// SendAppendReq updates the ServerStats in response to an AppendRequest
+// being sent by this server
+func (ss *ServerStats) SendAppendReq(reqSize int) {
+	ss.Lock()
+	defer ss.Unlock()
+
+	now := time.Now()
+
+	if ss.State != raft.StateLeader {
+		ss.State = raft.StateLeader
+		ss.LeaderInfo.Name = ss.ID
+		ss.LeaderInfo.StartTime = now
+	}
+
+	ss.sendRateQueue.Insert(
+		&RequestStats{
+			SendingTime: now,
+			Size:        reqSize,
+		},
+	)
+
+	ss.SendAppendRequestCnt++
+}
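
Note how the two hooks double as leadership tracking: sending an append request
marks this server as leader (with itself as LeaderInfo.Name), while receiving
one records the sender as the current leader. A small illustrative program (not
part of this commit):

```go
package main

import (
	"fmt"

	"github.com/coreos/etcd/etcdserver/stats"
	"github.com/coreos/etcd/raft"
)

func main() {
	ss := &stats.ServerStats{Name: "node0", ID: "1"}
	ss.Initialize()

	// Sending an append request implies this server is (or just became) the leader.
	ss.SendAppendReq(64)
	fmt.Println(ss.State == raft.StateLeader, ss.LeaderInfo.Name) // true 1

	// Receiving an append request from member "2" implies "2" is now the leader.
	ss.RecvAppendReq("2", 64)
	fmt.Println(ss.State == raft.StateFollower, ss.LeaderInfo.Name) // true 2
}
```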

raft/raft.go (+4 -0)

@@ -34,6 +34,10 @@ func (st StateType) String() string {
 	return stmap[uint64(st)]
 }
 
+func (st StateType) MarshalJSON() ([]byte, error) {
+	return []byte(fmt.Sprintf("%q", st.String())), nil
+}
+
 type progress struct {
 	match, next uint64
 }
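
The custom MarshalJSON is what makes the State field of ServerStats show up in
the stats JSON as a readable name rather than a bare integer. A quick
illustration (assuming the StateFollower/StateCandidate/StateLeader constants
already defined in the raft package):

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/coreos/etcd/raft"
)

func main() {
	b, _ := json.Marshal(raft.StateLeader)
	fmt.Println(string(b)) // "StateLeader" rather than 2
}
```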