Browse Source

etcdserver: fix race and improve stats interfaces

Jonathan Boulle 11 years ago
parent
commit
c28907ba95

+ 1 - 2
etcdserver/cluster_store.go

@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"fmt"
 	"log"
 	"log"
 	"net/http"
 	"net/http"
-	"strconv"
 	"time"
 	"time"
 
 
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
@@ -148,7 +147,7 @@ func send(c *http.Client, cls ClusterStore, m raftpb.Message, ss *stats.ServerSt
 		if m.Type == raftpb.MsgApp {
 		if m.Type == raftpb.MsgApp {
 			ss.SendAppendReq(len(data))
 			ss.SendAppendReq(len(data))
 		}
 		}
-		to := strconv.FormatUint(m.To, 16)
+		to := idAsHex(m.To)
 		fs := ls.Follower(to)
 		fs := ls.Follower(to)
 
 
 		start := time.Now()
 		start := time.Now()

+ 5 - 15
etcdserver/etcdhttp/http.go

@@ -42,7 +42,6 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
 		server:       server,
 		server:       server,
 		clusterStore: server.ClusterStore,
 		clusterStore: server.ClusterStore,
 		stats:        server,
 		stats:        server,
-		storestats:   server,
 		timer:        server,
 		timer:        server,
 		timeout:      defaultServerTimeout,
 		timeout:      defaultServerTimeout,
 	}
 	}
@@ -77,8 +76,7 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 type serverHandler struct {
 type serverHandler struct {
 	timeout      time.Duration
 	timeout      time.Duration
 	server       etcdserver.Server
 	server       etcdserver.Server
-	stats        etcdserver.ServerStats
-	storestats   etcdserver.StoreStats
+	stats        etcdserver.Stats
 	timer        etcdserver.RaftTimer
 	timer        etcdserver.RaftTimer
 	clusterStore etcdserver.ClusterStore
 	clusterStore etcdserver.ClusterStore
 }
 }
@@ -176,7 +174,7 @@ func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
-	w.Write(h.storestats.StoreStatsJSON())
+	w.Write(h.stats.StoreStats())
 }
 }
 
 
 func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
 func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
@@ -184,22 +182,15 @@ func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
-	w.Write(h.stats.SelfStatsJSON())
+	w.Write(h.stats.SelfStats())
 }
 }
 
 
 func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
 func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
 	if !allowMethod(w, r.Method, "GET") {
 	if !allowMethod(w, r.Method, "GET") {
 		return
 		return
 	}
 	}
-	s := h.stats.LeaderStats()
-	b, err := json.Marshal(s)
-	if err != nil {
-		log.Printf("error marshalling stats: %v\n", err)
-		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
-		return
-	}
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
-	w.Write(b)
+	w.Write(h.stats.LeaderStats())
 }
 }
 
 
 func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
 func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
@@ -221,8 +212,7 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
 	}
 	}
 	log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
 	log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
 	if m.Type == raftpb.MsgApp {
 	if m.Type == raftpb.MsgApp {
-		// TODO(jonboulle): standardize id uint-->string process: always base 16?
-		h.stats.SelfStats().RecvAppendReq(strconv.FormatUint(m.From, 16), int(r.ContentLength))
+		h.stats.UpdateRecvApp(m.From, r.ContentLength)
 	}
 	}
 	if err := h.server.Process(context.TODO(), m); err != nil {
 	if err := h.server.Process(context.TODO(), m); err != nil {
 		log.Println("etcdhttp: error processing raft message:", err)
 		log.Println("etcdhttp: error processing raft message:", err)

+ 19 - 40
etcdserver/etcdhttp/http_test.go

@@ -19,7 +19,6 @@ import (
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/etcdserverpb"
-	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 )
 )
@@ -638,28 +637,20 @@ func TestServeMachines(t *testing.T) {
 	}
 	}
 }
 }
 
 
-type dummyServerStats struct {
-	js []byte
-	ls *stats.LeaderStats
+type dummyStats struct {
+	data []byte
 }
 }
 
 
-func (dss *dummyServerStats) SelfStatsJSON() []byte           { return dss.js }
-func (dss *dummyServerStats) SelfStats() *stats.ServerStats   { return nil }
-func (dss *dummyServerStats) LeaderStats() *stats.LeaderStats { return dss.ls }
+func (ds *dummyStats) SelfStats() []byte               { return ds.data }
+func (ds *dummyStats) LeaderStats() []byte             { return ds.data }
+func (ds *dummyStats) StoreStats() []byte              { return ds.data }
+func (ds *dummyStats) UpdateRecvApp(_ uint64, _ int64) {}
 
 
 func TestServeSelfStats(t *testing.T) {
 func TestServeSelfStats(t *testing.T) {
-	ss := &stats.ServerStats{
-		Name:           "foobar",
-		RecvingPkgRate: 123.4,
-	}
-	w, err := json.Marshal(ss)
-	if err != nil {
-		t.Fatal("error marshaling: %v", err)
-	}
+	wb := []byte("some statistics")
+	w := string(wb)
 	sh := &serverHandler{
 	sh := &serverHandler{
-		stats: &dummyServerStats{
-			js: w,
-		},
+		stats: &dummyStats{data: wb},
 	}
 	}
 	rw := httptest.NewRecorder()
 	rw := httptest.NewRecorder()
 	sh.serveSelfStats(rw, &http.Request{Method: "GET"})
 	sh.serveSelfStats(rw, &http.Request{Method: "GET"})
@@ -670,8 +661,8 @@ func TestServeSelfStats(t *testing.T) {
 	if gct := rw.Header().Get("Content-Type"); gct != wct {
 	if gct := rw.Header().Get("Content-Type"); gct != wct {
 		t.Errorf("Content-Type = %q, want %q", gct, wct)
 		t.Errorf("Content-Type = %q, want %q", gct, wct)
 	}
 	}
-	if g := rw.Body.String(); g != string(w) {
-		t.Errorf("body = %s, want %s", g, string(w))
+	if g := rw.Body.String(); g != w {
+		t.Errorf("body = %s, want %s", g, w)
 	}
 	}
 }
 }
 
 
@@ -708,17 +699,10 @@ func TestLeaderServeStatsBad(t *testing.T) {
 }
 }
 
 
 func TestServeLeaderStats(t *testing.T) {
 func TestServeLeaderStats(t *testing.T) {
-	ls := &stats.LeaderStats{
-		Leader: "foobar",
-	}
-	w, err := json.Marshal(ls)
-	if err != nil {
-		t.Fatal("error marshaling: %v", err)
-	}
+	wb := []byte("some statistics")
+	w := string(wb)
 	sh := &serverHandler{
 	sh := &serverHandler{
-		stats: &dummyServerStats{
-			ls: ls,
-		},
+		stats: &dummyStats{data: wb},
 	}
 	}
 	rw := httptest.NewRecorder()
 	rw := httptest.NewRecorder()
 	sh.serveLeaderStats(rw, &http.Request{Method: "GET"})
 	sh.serveLeaderStats(rw, &http.Request{Method: "GET"})
@@ -729,21 +713,16 @@ func TestServeLeaderStats(t *testing.T) {
 	if gct := rw.Header().Get("Content-Type"); gct != wct {
 	if gct := rw.Header().Get("Content-Type"); gct != wct {
 		t.Errorf("Content-Type = %q, want %q", gct, wct)
 		t.Errorf("Content-Type = %q, want %q", gct, wct)
 	}
 	}
-	if g := rw.Body.String(); g != string(w) {
-		t.Errorf("body = %s, want %s", g, string(w))
+	if g := rw.Body.String(); g != w {
+		t.Errorf("body = %s, want %s", g, w)
 	}
 	}
 }
 }
 
 
-type dummyStoreStats struct {
-	data []byte
-}
-
-func (dss *dummyStoreStats) StoreStatsJSON() []byte { return dss.data }
-
 func TestServeStoreStats(t *testing.T) {
 func TestServeStoreStats(t *testing.T) {
-	w := "foobarbaz"
+	wb := []byte("some statistics")
+	w := string(wb)
 	sh := &serverHandler{
 	sh := &serverHandler{
-		storestats: &dummyStoreStats{data: []byte(w)},
+		stats: &dummyStats{data: wb},
 	}
 	}
 	rw := httptest.NewRecorder()
 	rw := httptest.NewRecorder()
 	sh.serveStoreStats(rw, &http.Request{Method: "GET"})
 	sh.serveStoreStats(rw, &http.Request{Method: "GET"})

+ 1 - 1
etcdserver/member.go

@@ -55,7 +55,7 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
 }
 }
 
 
 func (m Member) storeKey() string {
 func (m Member) storeKey() string {
-	return path.Join(membersKVPrefix, strconv.FormatUint(m.ID, 16))
+	return path.Join(membersKVPrefix, idAsHex(m.ID))
 }
 }
 
 
 func parseMemberID(key string) uint64 {
 func parseMemberID(key string) uint64 {

+ 22 - 30
etcdserver/server.go

@@ -90,19 +90,16 @@ type Server interface {
 	RemoveMember(ctx context.Context, id uint64) error
 	RemoveMember(ctx context.Context, id uint64) error
 }
 }
 
 
-type ServerStats interface {
+type Stats interface {
 	// SelfStats returns the struct representing statistics of this server
 	// SelfStats returns the struct representing statistics of this server
-	SelfStats() *stats.ServerStats
-	// SelfStats returns the statistics of this server in JSON form
-	SelfStatsJSON() []byte
+	SelfStats() []byte
 	// LeaderStats returns the statistics of all followers in the cluster
 	// LeaderStats returns the statistics of all followers in the cluster
 	// if this server is leader. Otherwise, nil is returned.
 	// if this server is leader. Otherwise, nil is returned.
-	LeaderStats() *stats.LeaderStats
-}
-
-type StoreStats interface {
-	// StoreStatsJSON returns statistics of the store in JSON format
-	StoreStatsJSON() []byte
+	LeaderStats() []byte
+	// StoreStats returns statistics of the store backing this EtcdServer
+	StoreStats() []byte
+	// UpdateRecvApp updates the underlying statistics in response to a receiving an Append request
+	UpdateRecvApp(from uint64, length int64)
 }
 }
 
 
 type RaftTimer interface {
 type RaftTimer interface {
@@ -195,9 +192,9 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 
 
 	sstats := &stats.ServerStats{
 	sstats := &stats.ServerStats{
 		Name: cfg.Name,
 		Name: cfg.Name,
-		ID:   strconv.FormatUint(cfg.ID(), 16),
+		ID:   idAsHex(cfg.ID()),
 	}
 	}
-	lstats := stats.NewLeaderStats(strconv.FormatUint(cfg.ID(), 16))
+	lstats := stats.NewLeaderStats(idAsHex(cfg.ID()))
 
 
 	s := &EtcdServer{
 	s := &EtcdServer{
 		store:      st,
 		store:      st,
@@ -364,32 +361,23 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	}
 	}
 }
 }
 
 
-func (s *EtcdServer) SelfStats() *stats.ServerStats {
-	return s.stats
-}
-
-func (s *EtcdServer) SelfStatsJSON() []byte {
-	stats := *s.stats
-	stats.LeaderInfo.Uptime = time.Now().Sub(stats.LeaderInfo.StartTime).String()
-	stats.SendingPkgRate, stats.SendingBandwidthRate = stats.SendRates()
-	stats.RecvingPkgRate, stats.RecvingBandwidthRate = stats.RecvRates()
-	b, err := json.Marshal(s.stats)
-	// TODO(jonboulle): appropriate error handling?
-	if err != nil {
-		log.Printf("error marshalling self stats: %v", err)
-	}
-	return b
+func (s *EtcdServer) SelfStats() []byte {
+	return s.stats.JSON()
 }
 }
 
 
-func (s *EtcdServer) LeaderStats() *stats.LeaderStats {
+func (s *EtcdServer) LeaderStats() []byte {
 	// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
 	// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
-	return s.lstats
+	return s.lstats.JSON()
 }
 }
 
 
-func (s *EtcdServer) StoreStatsJSON() []byte {
+func (s *EtcdServer) StoreStats() []byte {
 	return s.store.JsonStats()
 	return s.store.JsonStats()
 }
 }
 
 
+func (s *EtcdServer) UpdateRecvApp(from uint64, length int64) {
+	s.stats.RecvAppendReq(idAsHex(from), int(length))
+}
+
 func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
 func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
 	// TODO: move Member to protobuf type
 	// TODO: move Member to protobuf type
 	b, err := json.Marshal(memb)
 	b, err := json.Marshal(memb)
@@ -691,3 +679,7 @@ func containsUint64(a []uint64, x uint64) bool {
 	}
 	}
 	return false
 	return false
 }
 }
+
+func idAsHex(id uint64) string {
+	return strconv.FormatUint(id, 16)
+}

+ 6 - 0
etcdserver/stats/leader.go

@@ -1,6 +1,7 @@
 package stats
 package stats
 
 
 import (
 import (
+	"encoding/json"
 	"math"
 	"math"
 	"sync"
 	"sync"
 	"time"
 	"time"
@@ -24,6 +25,11 @@ func NewLeaderStats(id string) *LeaderStats {
 	}
 	}
 }
 }
 
 
+func (ls *LeaderStats) JSON() []byte {
+	b, _ := json.Marshal(ls)
+	return b
+}
+
 func (ls *LeaderStats) Follower(name string) *FollowerStats {
 func (ls *LeaderStats) Follower(name string) *FollowerStats {
 	ls.Lock()
 	ls.Lock()
 	defer ls.Unlock()
 	defer ls.Unlock()

+ 16 - 0
etcdserver/stats/server.go

@@ -1,6 +1,8 @@
 package stats
 package stats
 
 
 import (
 import (
+	"encoding/json"
+	"log"
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
@@ -36,6 +38,20 @@ type ServerStats struct {
 	sync.Mutex
 	sync.Mutex
 }
 }
 
 
+func (ss *ServerStats) JSON() []byte {
+	ss.Lock()
+	defer ss.Unlock()
+	ss.LeaderInfo.Uptime = time.Now().Sub(ss.LeaderInfo.StartTime).String()
+	ss.SendingPkgRate, ss.SendingBandwidthRate = ss.SendRates()
+	ss.RecvingPkgRate, ss.RecvingBandwidthRate = ss.RecvRates()
+	b, err := json.Marshal(ss)
+	// TODO(jonboulle): appropriate error handling?
+	if err != nil {
+		log.Printf("error marshalling server stats: %v", err)
+	}
+	return b
+}
+
 // Initialize clears the statistics of ServerStats and resets its start time
 // Initialize clears the statistics of ServerStats and resets its start time
 func (ss *ServerStats) Initialize() {
 func (ss *ServerStats) Initialize() {
 	if ss == nil {
 	if ss == nil {