Browse Source

feat(metrics): enable some metrics; push to graphite

* -trace flag controls whether or not to enable metrics-gathering
  and the /debug/* HTTP endpoints
* -graphite-host flag controls where metrics should be sent
* timer.ae.handle metric tracks execution time of AppendEntriesRequest
Brian Waldon 12 years ago
parent
commit
7eaad5c8e0
6 changed files with 67 additions and 7 deletions
  1. 23 2
      etcd.go
  2. 16 0
      server/config.go
  3. 7 1
      server/peer_server.go
  4. 4 0
      server/peer_server_handlers.go
  5. 15 2
      server/server.go
  6. 2 2
      tests/server_utils.go

+ 23 - 2
etcd.go

@@ -19,9 +19,11 @@ package main
 import (
 import (
 	"fmt"
 	"fmt"
 	"os"
 	"os"
+	"runtime"
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/metrics"
 	"github.com/coreos/etcd/server"
 	"github.com/coreos/etcd/server"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/raft"
 	"github.com/coreos/raft"
@@ -81,6 +83,21 @@ func main() {
 		log.Fatal("Peer TLS:", err)
 		log.Fatal("Peer TLS:", err)
 	}
 	}
 
 
+	var mbName string
+	if config.Trace() {
+		mbName = config.MetricsBucketName()
+		runtime.SetBlockProfileRate(1)
+	}
+
+	mb := metrics.NewBucket(mbName)
+
+	if config.GraphiteHost != "" {
+		err := mb.Publish(config.GraphiteHost)
+		if err != nil {
+			panic(err)
+		}
+	}
+
 	// Create etcd key-value store and registry.
 	// Create etcd key-value store and registry.
 	store := store.New()
 	store := store.New()
 	registry := server.NewRegistry(store)
 	registry := server.NewRegistry(store)
@@ -88,16 +105,20 @@ func main() {
 	// Create peer server.
 	// Create peer server.
 	heartbeatTimeout := time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond
 	heartbeatTimeout := time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond
 	electionTimeout := time.Duration(config.Peer.ElectionTimeout) * time.Millisecond
 	electionTimeout := time.Duration(config.Peer.ElectionTimeout) * time.Millisecond
-	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, heartbeatTimeout, electionTimeout)
+	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, heartbeatTimeout, electionTimeout, &mb)
 	ps.MaxClusterSize = config.MaxClusterSize
 	ps.MaxClusterSize = config.MaxClusterSize
 	ps.RetryTimes = config.MaxRetryAttempts
 	ps.RetryTimes = config.MaxRetryAttempts
 
 
 	// Create client server.
 	// Create client server.
-	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store)
+	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store, &mb)
 	if err := s.AllowOrigins(config.CorsOrigins); err != nil {
 	if err := s.AllowOrigins(config.CorsOrigins); err != nil {
 		panic(err)
 		panic(err)
 	}
 	}
 
 
+	if config.Trace() {
+		s.EnableTracing()
+	}
+
 	ps.SetServer(s)
 	ps.SetServer(s)
 
 
 	// Run peer server in separate thread while the client server blocks.
 	// Run peer server in separate thread while the client server blocks.

+ 16 - 0
server/config.go

@@ -77,6 +77,8 @@ type Config struct {
 		HeartbeatTimeout int    `toml:"heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
 		HeartbeatTimeout int    `toml:"heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
 		ElectionTimeout  int    `toml:"election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
 		ElectionTimeout  int    `toml:"election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
 	}
 	}
+	strTrace     string `toml:"trace" env:"ETCD_TRACE"`
+	GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"`
 }
 }
 
 
 // NewConfig returns a Config initialized with default values.
 // NewConfig returns a Config initialized with default values.
@@ -247,6 +249,9 @@ func (c *Config) LoadFlags(arguments []string) error {
 	f.IntVar(&c.SnapshotCount, "snapshot-count", c.SnapshotCount, "")
 	f.IntVar(&c.SnapshotCount, "snapshot-count", c.SnapshotCount, "")
 	f.StringVar(&c.CPUProfileFile, "cpuprofile", "", "")
 	f.StringVar(&c.CPUProfileFile, "cpuprofile", "", "")
 
 
+	f.StringVar(&c.strTrace, "trace", "", "")
+	f.StringVar(&c.GraphiteHost, "graphite-host", "", "")
+
 	// BEGIN IGNORED FLAGS
 	// BEGIN IGNORED FLAGS
 	f.StringVar(&path, "config", "", "")
 	f.StringVar(&path, "config", "", "")
 	// BEGIN IGNORED FLAGS
 	// BEGIN IGNORED FLAGS
@@ -453,6 +458,17 @@ func (c *Config) PeerTLSConfig() (TLSConfig, error) {
 	return c.PeerTLSInfo().Config()
 	return c.PeerTLSInfo().Config()
 }
 }
 
 
+// MetricsBucketName generates the name that should be used for a
+// corresponding MetricsBucket object
+func (c *Config) MetricsBucketName() string {
+	return fmt.Sprintf("etcd.%s", c.Name)
+}
+
+// Trace determines if any trace-level information should be emitted
+func (c *Config) Trace() bool {
+	return c.strTrace == "*"
+}
+
 // sanitizeURL will cleanup a host string in the format hostname[:port] and
 // sanitizeURL will cleanup a host string in the format hostname[:port] and
 // attach a schema.
 // attach a schema.
 func sanitizeURL(host string, defaultScheme string) (string, error) {
 func sanitizeURL(host string, defaultScheme string) (string, error) {

+ 7 - 1
server/peer_server.go

@@ -15,6 +15,7 @@ import (
 
 
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/metrics"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/raft"
 	"github.com/coreos/raft"
 	"github.com/gorilla/mux"
 	"github.com/gorilla/mux"
@@ -47,6 +48,8 @@ type PeerServer struct {
 
 
 	closeChan            chan bool
 	closeChan            chan bool
 	timeoutThresholdChan chan interface{}
 	timeoutThresholdChan chan interface{}
+
+	metrics *metrics.Bucket
 }
 }
 
 
 // TODO: find a good policy to do snapshot
 // TODO: find a good policy to do snapshot
@@ -62,7 +65,8 @@ type snapshotConf struct {
 	snapshotThr uint64
 	snapshotThr uint64
 }
 }
 
 
-func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout, electionTimeout time.Duration) *PeerServer {
+func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout, electionTimeout time.Duration, mb *metrics.Bucket) *PeerServer {
+
 	s := &PeerServer{
 	s := &PeerServer{
 		name:     name,
 		name:     name,
 		url:      url,
 		url:      url,
@@ -89,6 +93,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 		ElectionTimeout:  electionTimeout,
 		ElectionTimeout:  electionTimeout,
 
 
 		timeoutThresholdChan: make(chan interface{}, 1),
 		timeoutThresholdChan: make(chan interface{}, 1),
+
+		metrics: mb,
 	}
 	}
 
 
 	// Create transporter for raft
 	// Create transporter for raft

+ 4 - 0
server/peer_server_handlers.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"net/http"
 	"net/http"
 	"strconv"
 	"strconv"
+	"time"
 
 
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/log"
@@ -49,6 +50,7 @@ func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request)
 
 
 // Response to append entries request
 // Response to append entries request
 func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
+	start := time.Now()
 	aereq := &raft.AppendEntriesRequest{}
 	aereq := &raft.AppendEntriesRequest{}
 
 
 	if _, err := aereq.Decode(req.Body); err != nil {
 	if _, err := aereq.Decode(req.Body); err != nil {
@@ -78,6 +80,8 @@ func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.
 		http.Error(w, "", http.StatusInternalServerError)
 		http.Error(w, "", http.StatusInternalServerError)
 		return
 		return
 	}
 	}
+
+	(*ps.metrics).Timer("timer.appendentries.handle").UpdateSince(start)
 }
 }
 
 
 // Response to recover from snapshot request
 // Response to recover from snapshot request

+ 15 - 2
server/server.go

@@ -12,6 +12,7 @@ import (
 
 
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/metrics"
 	"github.com/coreos/etcd/mod"
 	"github.com/coreos/etcd/mod"
 	"github.com/coreos/etcd/server/v1"
 	"github.com/coreos/etcd/server/v1"
 	"github.com/coreos/etcd/server/v2"
 	"github.com/coreos/etcd/server/v2"
@@ -34,10 +35,11 @@ type Server struct {
 	tlsInfo     *TLSInfo
 	tlsInfo     *TLSInfo
 	router      *mux.Router
 	router      *mux.Router
 	corsHandler *corsHandler
 	corsHandler *corsHandler
+	metrics     *metrics.Bucket
 }
 }
 
 
 // Creates a new Server.
 // Creates a new Server.
-func New(name string, urlStr string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store store.Store) *Server {
+func New(name string, urlStr string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server {
 	r := mux.NewRouter()
 	r := mux.NewRouter()
 	cors := &corsHandler{router: r}
 	cors := &corsHandler{router: r}
 
 
@@ -56,6 +58,7 @@ func New(name string, urlStr string, bindAddr string, tlsConf *TLSConfig, tlsInf
 		peerServer:  peerServer,
 		peerServer:  peerServer,
 		router:      r,
 		router:      r,
 		corsHandler: cors,
 		corsHandler: cors,
+		metrics:     mb,
 	}
 	}
 
 
 	// Install the routes.
 	// Install the routes.
@@ -63,11 +66,14 @@ func New(name string, urlStr string, bindAddr string, tlsConf *TLSConfig, tlsInf
 	s.installV1()
 	s.installV1()
 	s.installV2()
 	s.installV2()
 	s.installMod()
 	s.installMod()
-	s.installDebug()
 
 
 	return s
 	return s
 }
 }
 
 
+func (s *Server) EnableTracing() {
+	s.installDebug()
+}
+
 // The current state of the server in the cluster.
 // The current state of the server in the cluster.
 func (s *Server) State() string {
 func (s *Server) State() string {
 	return s.peerServer.RaftServer().State()
 	return s.peerServer.RaftServer().State()
@@ -141,6 +147,7 @@ func (s *Server) installMod() {
 }
 }
 
 
 func (s *Server) installDebug() {
 func (s *Server) installDebug() {
+	s.handleFunc("/debug/metrics", s.GetMetricsHandler).Methods("GET")
 	s.router.HandleFunc("/debug/pprof", pprof.Index)
 	s.router.HandleFunc("/debug/pprof", pprof.Index)
 	s.router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
 	s.router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
 	s.router.HandleFunc("/debug/pprof/profile", pprof.Profile)
 	s.router.HandleFunc("/debug/pprof/profile", pprof.Profile)
@@ -402,3 +409,9 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
 	w.Write([]byte("speed test success"))
 	w.Write([]byte("speed test success"))
 	return nil
 	return nil
 }
 }
+
+// Retrieves metrics from bucket
+func (s *Server) GetMetricsHandler(w http.ResponseWriter, req *http.Request) error {
+	(*s.metrics).Dump(w)
+	return nil
+}

+ 2 - 2
tests/server_utils.go

@@ -26,9 +26,9 @@ func RunServer(f func(*server.Server)) {
 	store := store.New()
 	store := store.New()
 	registry := server.NewRegistry(store)
 	registry := server.NewRegistry(store)
 
 
-	ps := server.NewPeerServer(testName, path, "http://"+testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount, testHeartbeatTimeout, testElectionTimeout)
+	ps := server.NewPeerServer(testName, path, "http://"+testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount, testHeartbeatTimeout, testElectionTimeout, nil)
 	ps.MaxClusterSize = 9
 	ps.MaxClusterSize = 9
-	s := server.New(testName, "http://"+testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
+	s := server.New(testName, "http://"+testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store, nil)
 	ps.SetServer(s)
 	ps.SetServer(s)
 
 
 	// Start up peer server.
 	// Start up peer server.