Переглянути джерело

Merge pull request #8282 from gyuho/metrics-port

*: serve '/metrics' in insecure port
Gyu-Ho Lee 8 роки тому
батько
коміт
ff7a021c8f

+ 10 - 0
e2e/cluster_proxy_test.go

@@ -54,6 +54,9 @@ func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc
 
 func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
 func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }
+func (p *proxyEtcdProcess) EndpointsMetrics() []string {
+	panic("not implemented; proxy doesn't provide health information")
+}
 
 func (p *proxyEtcdProcess) Start() error {
 	if err := p.etcdProc.Start(); err != nil {
@@ -113,6 +116,7 @@ type proxyProc struct {
 	execPath string
 	args     []string
 	ep       string
+	murl     string
 	donec    chan struct{}
 
 	proc *expect.ExpectProcess
@@ -232,6 +236,11 @@ func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc {
 		// pass-through member RPCs
 		"--advertise-client-url", "",
 	}
+	murl := ""
+	if cfg.murl != "" {
+		murl = proxyListenURL(cfg, 4)
+		args = append(args, "--metrics-addr", murl)
+	}
 	tlsArgs := []string{}
 	for i := 0; i < len(cfg.tlsArgs); i++ {
 		switch cfg.tlsArgs[i] {
@@ -258,6 +267,7 @@ func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc {
 			execPath: cfg.execPath,
 			args:     append(args, tlsArgs...),
 			ep:       listenAddr,
+			murl:     murl,
 			donec:    make(chan struct{}),
 		},
 	}

+ 12 - 1
e2e/cluster_test.go

@@ -101,6 +101,8 @@ type etcdProcessClusterConfig struct {
 	baseScheme string
 	basePort   int
 
+	metricsURLScheme string
+
 	snapCount int // default is 10000
 
 	clientTLS             clientConnType
@@ -175,7 +177,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 	for i := 0; i < cfg.clusterSize; i++ {
 		var curls []string
 		var curl, curltls string
-		port := cfg.basePort + 4*i
+		port := cfg.basePort + 5*i
 		curlHost := fmt.Sprintf("localhost:%d", port)
 
 		switch cfg.clientTLS {
@@ -221,6 +223,14 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 		if cfg.noStrictReconfig {
 			args = append(args, "--strict-reconfig-check=false")
 		}
+		var murl string
+		if cfg.metricsURLScheme != "" {
+			murl = (&url.URL{
+				Scheme: cfg.metricsURLScheme,
+				Host:   fmt.Sprintf("localhost:%d", port+2),
+			}).String()
+			args = append(args, "--listen-metrics-urls", murl)
+		}
 
 		args = append(args, cfg.tlsArgs()...)
 		etcdCfgs[i] = &etcdServerProcessConfig{
@@ -232,6 +242,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 			name:         name,
 			purl:         purl,
 			acurl:        curl,
+			murl:         murl,
 			initialToken: cfg.initialToken,
 		}
 	}

+ 5 - 2
e2e/etcd_process.go

@@ -29,6 +29,7 @@ var etcdServerReadyLines = []string{"enabled capabilities for version", "publish
 type etcdProcess interface {
 	EndpointsV2() []string
 	EndpointsV3() []string
+	EndpointsMetrics() []string
 
 	Start() error
 	Restart() error
@@ -57,6 +58,7 @@ type etcdServerProcessConfig struct {
 	purl url.URL
 
 	acurl string
+	murl  string
 
 	initialToken   string
 	initialCluster string
@@ -74,8 +76,9 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, err
 	return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
 }
 
-func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} }
-func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
+func (ep *etcdServerProcess) EndpointsV2() []string      { return []string{ep.cfg.acurl} }
+func (ep *etcdServerProcess) EndpointsV3() []string      { return ep.EndpointsV2() }
+func (ep *etcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.murl} }
 
 func (ep *etcdServerProcess) Start() error {
 	if ep.proc != nil {

+ 47 - 0
e2e/metrics_test.go

@@ -0,0 +1,47 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !cluster_proxy
+
+package e2e
+
+import (
+	"testing"
+)
+
+func TestV3MetricsSecure(t *testing.T) {
+	cfg := configTLS
+	cfg.clusterSize = 1
+	cfg.metricsURLScheme = "https"
+	testCtl(t, metricsTest)
+}
+
+func TestV3MetricsInsecure(t *testing.T) {
+	cfg := configTLS
+	cfg.clusterSize = 1
+	cfg.metricsURLScheme = "http"
+	testCtl(t, metricsTest)
+}
+
+func metricsTest(cx ctlCtx) {
+	if err := ctlV3Put(cx, "k", "v", ""); err != nil {
+		cx.t.Fatal(err)
+	}
+	if err := cURLGet(cx.epc, cURLReq{endpoint: "/metrics", expected: `etcd_debugging_mvcc_keys_total 1`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
+		cx.t.Fatalf("failed get with curl (%v)", err)
+	}
+	if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health": "true"}`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
+		cx.t.Fatalf("failed get with curl (%v)", err)
+	}
+}

+ 14 - 7
e2e/v2_curl_test.go

@@ -125,6 +125,8 @@ type cURLReq struct {
 	value    string
 	expected string
 	header   string
+
+	metricsURLScheme string
 }
 
 // cURLPrefixArgs builds the beginning of a curl command for a given key
@@ -134,14 +136,19 @@ func cURLPrefixArgs(clus *etcdProcessCluster, method string, req cURLReq) []stri
 		cmdArgs = []string{"curl"}
 		acurl   = clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl
 	)
-	if req.isTLS {
-		if clus.cfg.clientTLS != clientTLSAndNonTLS {
-			panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
+	if req.metricsURLScheme != "https" {
+		if req.isTLS {
+			if clus.cfg.clientTLS != clientTLSAndNonTLS {
+				panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
+			}
+			cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
+			acurl = toTLS(clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl)
+		} else if clus.cfg.clientTLS == clientTLS {
+			cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
 		}
-		cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
-		acurl = toTLS(clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl)
-	} else if clus.cfg.clientTLS == clientTLS {
-		cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
+	}
+	if req.metricsURLScheme != "" {
+		acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].EndpointsMetrics()[0]
 	}
 	ep := acurl + req.endpoint
 

+ 6 - 4
embed/etcd.go

@@ -35,7 +35,6 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/pkg/capnslog"
-	"github.com/prometheus/client_golang/prometheus"
 )
 
 var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
@@ -405,12 +404,15 @@ func (e *Etcd) serve() (err error) {
 	}
 
 	if len(e.cfg.ListenMetricsUrls) > 0 {
-		// TODO: maybe etcdhttp.MetricsPath or get the path from the user-provided URL
 		metricsMux := http.NewServeMux()
-		metricsMux.Handle("/metrics", prometheus.Handler())
+		etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
 
 		for _, murl := range e.cfg.ListenMetricsUrls {
-			ml, err := transport.NewListener(murl.Host, murl.Scheme, &e.cfg.ClientTLSInfo)
+			tlsInfo := &e.cfg.ClientTLSInfo
+			if murl.Scheme == "http" {
+				tlsInfo = nil
+			}
+			ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsInfo)
 			if err != nil {
 				return err
 			}

+ 2 - 2
etcdmain/etcd.go

@@ -30,6 +30,7 @@ import (
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/embed"
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/etcdhttp"
 	"github.com/coreos/etcd/pkg/cors"
 	"github.com/coreos/etcd/pkg/fileutil"
 	pkgioutil "github.com/coreos/etcd/pkg/ioutil"
@@ -40,7 +41,6 @@ import (
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/pkg/capnslog"
 	"github.com/grpc-ecosystem/go-grpc-prometheus"
-	"github.com/prometheus/client_golang/prometheus"
 	"google.golang.org/grpc"
 )
 
@@ -344,7 +344,7 @@ func startProxy(cfg *config) error {
 		go func() {
 			plog.Info("proxy: listening for client requests on ", host)
 			mux := http.NewServeMux()
-			mux.Handle("/metrics", prometheus.Handler()) // v2 proxy just uses the same port
+			etcdhttp.HandlePrometheus(mux) // v2 proxy just uses the same port
 			mux.Handle("/", ph)
 			plog.Fatal(http.Serve(l, mux))
 		}()

+ 3 - 3
etcdmain/grpc_proxy.go

@@ -26,6 +26,7 @@ import (
 
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/clientv3/namespace"
+	"github.com/coreos/etcd/etcdserver/api/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
 	"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -35,7 +36,6 @@ import (
 
 	"github.com/cockroachdb/cmux"
 	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/cobra"
 	"google.golang.org/grpc"
 )
@@ -157,7 +157,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 		mhttpl := mustMetricsListener(tlsinfo)
 		go func() {
 			mux := http.NewServeMux()
-			mux.Handle("/metrics", prometheus.Handler())
+			etcdhttp.HandlePrometheus(mux)
 			plog.Fatal(http.Serve(mhttpl, mux))
 		}()
 	}
@@ -293,7 +293,7 @@ func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
 func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo) (*http.Server, net.Listener) {
 	httpmux := http.NewServeMux()
 	httpmux.HandleFunc("/", http.NotFound)
-	httpmux.Handle("/metrics", prometheus.Handler())
+	etcdhttp.HandlePrometheus(httpmux)
 	if grpcProxyEnablePprof {
 		for p, h := range debugutil.PProfHandlers() {
 			httpmux.Handle(p, h)

+ 1 - 33
etcdserver/api/etcdhttp/base.go

@@ -20,19 +20,14 @@ import (
 	"fmt"
 	"net/http"
 	"strings"
-	"time"
 
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api"
 	"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
-	"github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/logutil"
-	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/pkg/capnslog"
-	"github.com/prometheus/client_golang/prometheus"
-	"golang.org/x/net/context"
 )
 
 var (
@@ -42,8 +37,6 @@ var (
 
 const (
 	configPath  = "/config"
-	metricsPath = "/metrics"
-	healthPath  = "/health"
 	varsPath    = "/debug/vars"
 	versionPath = "/version"
 )
@@ -53,35 +46,10 @@ const (
 func HandleBasic(mux *http.ServeMux, server *etcdserver.EtcdServer) {
 	mux.HandleFunc(varsPath, serveVars)
 	mux.HandleFunc(configPath+"/local/log", logHandleFunc)
-	mux.Handle(metricsPath, prometheus.Handler())
-	mux.Handle(healthPath, healthHandler(server))
+	HandleMetricsHealth(mux, server)
 	mux.HandleFunc(versionPath, versionHandler(server.Cluster(), serveVersion))
 }
 
-func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
-	return func(w http.ResponseWriter, r *http.Request) {
-		if !allowMethod(w, r, "GET") {
-			return
-		}
-		if uint64(server.Leader()) == raft.None {
-			http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
-			return
-		}
-		if len(server.Alarms()) > 0 {
-			w.Write([]byte(`{"health": "false"}`))
-			return
-		}
-		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-		defer cancel()
-		if _, err := server.Do(ctx, etcdserverpb.Request{Method: "QGET"}); err != nil {
-			http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
-			return
-		}
-		w.WriteHeader(http.StatusOK)
-		w.Write([]byte(`{"health": "true"}`))
-	}
-}
-
 func versionHandler(c api.Cluster, fn func(http.ResponseWriter, *http.Request, string)) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		v := c.Version()

+ 91 - 0
etcdserver/api/etcdhttp/metrics.go

@@ -0,0 +1,91 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdhttp
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/raft"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	pathMetrics = "/metrics"
+	pathHealth  = "/health"
+)
+
+// HandleMetricsHealth registers metrics and health handlers.
+func HandleMetricsHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) {
+	mux.Handle(pathMetrics, prometheus.Handler())
+	mux.Handle(pathHealth, newHealthHandler(srv))
+}
+
+// HandlePrometheus registers prometheus handler on '/metrics'.
+func HandlePrometheus(mux *http.ServeMux) {
+	mux.Handle(pathMetrics, prometheus.Handler())
+}
+
+// HandleHealth registers health handler on '/health'.
+func HandleHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) {
+	mux.Handle(pathHealth, newHealthHandler(srv))
+}
+
+// newHealthHandler handles '/health' requests.
+func newHealthHandler(srv *etcdserver.EtcdServer) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			w.Header().Set("Allow", http.MethodGet)
+			http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+			return
+		}
+		h := checkHealth(srv)
+		d := []byte(fmt.Sprintf(`{"health": "%v"}`, h.Health))
+		if !h.Health {
+			http.Error(w, string(d), http.StatusServiceUnavailable)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		w.Write(d)
+	}
+}
+
+type health struct {
+	Health bool `json:"health"`
+}
+
+func checkHealth(srv *etcdserver.EtcdServer) health {
+	h := health{Health: false}
+	if len(srv.Alarms()) > 0 {
+		// TODO: provide alarm lists
+		return h
+	}
+
+	if uint64(srv.Leader()) == raft.None {
+		return h
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	_, err := srv.Do(ctx, etcdserverpb.Request{Method: "QGET"})
+	cancel()
+
+	h.Health = err == nil
+	return h
+}