Browse Source

metrics: add /rafthttp/stream metrics

Yicheng Qin 11 years ago
parent
commit
99821579bf

+ 18 - 0
etcdserver/etcdhttp/client.go

@@ -19,6 +19,7 @@ package etcdhttp
 import (
 	"encoding/json"
 	"errors"
+	"expvar"
 	"fmt"
 	"io/ioutil"
 	"log"
@@ -47,6 +48,7 @@ const (
 	deprecatedMachinesPrefix = "/v2/machines"
 	membersPrefix            = "/v2/members"
 	statsPrefix              = "/v2/stats"
+	statsPath                = "/stats"
 	healthPath               = "/health"
 	versionPath              = "/version"
 )
@@ -83,6 +85,7 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
 	mux.HandleFunc(statsPrefix+"/store", sh.serveStore)
 	mux.HandleFunc(statsPrefix+"/self", sh.serveSelf)
 	mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader)
+	mux.HandleFunc(statsPath, serveStats)
 	mux.Handle(membersPrefix, mh)
 	mux.Handle(membersPrefix+"/", mh)
 	mux.Handle(deprecatedMachinesPrefix, dmh)
@@ -284,6 +287,21 @@ func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
 	w.Write(stats)
 }
 
+func serveStats(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "application/json; charset=utf-8")
+	// TODO: getting one key or a prefix of keys based on path
+	fmt.Fprintf(w, "{\n")
+	first := true
+	expvar.Do(func(kv expvar.KeyValue) {
+		if !first {
+			fmt.Fprintf(w, ",\n")
+		}
+		first = false
+		fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
+	})
+	fmt.Fprintf(w, "\n}\n")
+}
+
 // TODO: change etcdserver to raft interface when we have it.
 //       add test for healthHeadler when we have the interface ready.
 func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {

+ 111 - 0
pkg/metrics/metrics.go

@@ -0,0 +1,111 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+// Package metrics provides metrics view of variables which is exposed through
+// expvar package.
+//
+// Naming conventions:
+// 1. volatile path components should be kept as deep into the hierarchy as possible
+// 2. each path component should have a clear and well-defined purpose
+// 3. components.separated.with.dot, and put package prefix at the head
+// 4. words_separated_with_underscore, and put clarifiers last, e.g., requests_total
+package metrics
+
+import (
+	"bytes"
+	"expvar"
+	"fmt"
+)
+
+// Counter is a number that increases over time monotonically.
+type Counter struct{ i *expvar.Int }
+
+func NewCounter(name string) *Counter {
+	return &Counter{i: expvar.NewInt(name)}
+}
+
+func (c *Counter) Add() { c.i.Add(1) }
+
+func (c *Counter) AddBy(delta int64) { c.i.Add(delta) }
+
+func (c *Counter) String() string { return c.i.String() }
+
+// Gauge returns instantaneous value that is expected to fluctuate over time.
+type Gauge struct{ i *expvar.Int }
+
+func NewGauge(name string) *Gauge {
+	return &Gauge{i: expvar.NewInt(name)}
+}
+
+func (g *Gauge) Set(value int64) { g.i.Set(value) }
+
+func (g *Gauge) String() string { return g.i.String() }
+
+type nilVar struct{}
+
+func (v *nilVar) String() string { return "nil" }
+
+// Map aggregates Counters and Gauges.
+type Map struct{ *expvar.Map }
+
+func NewMap(name string) *Map {
+	return &Map{Map: expvar.NewMap(name)}
+}
+
+// GetMap returns the map if it exists, or inits the given name map if it does
+// not exist.
+func GetMap(name string) *Map {
+	if m, ok := expvar.Get(name).(*expvar.Map); ok {
+		return &Map{Map: m}
+	}
+	return NewMap(name)
+}
+
+func (m *Map) NewCounter(key string) *Counter {
+	c := &Counter{i: new(expvar.Int)}
+	m.Set(key, c)
+	return c
+}
+
+func (m *Map) NewGauge(key string) *Gauge {
+	g := &Gauge{i: new(expvar.Int)}
+	m.Set(key, g)
+	return g
+}
+
+// TODO: remove the var from the map to avoid memory boom
+func (m *Map) Delete(key string) { m.Set(key, &nilVar{}) }
+
+// String returns JSON format string that represents the group.
+// It does not print out nilVar.
+func (m *Map) String() string {
+	var b bytes.Buffer
+	fmt.Fprintf(&b, "{")
+	first := true
+	m.Do(func(kv expvar.KeyValue) {
+		v := kv.Value.String()
+		if v == "nil" {
+			return
+		}
+		if !first {
+			fmt.Fprintf(&b, ", ")
+		}
+		fmt.Fprintf(&b, "%q: %v", kv.Key, v)
+		first = false
+	})
+	fmt.Fprintf(&b, "}")
+	return b.String()
+}

+ 48 - 0
pkg/metrics/metrics_test.go

@@ -0,0 +1,48 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package metrics
+
+import (
+	"expvar"
+	"testing"
+)
+
+// TestMetrics tests the basic usage of metrics.
+func TestMetrics(t *testing.T) {
+	m := &Map{Map: new(expvar.Map).Init()}
+
+	c := m.NewCounter("number")
+	c.Add()
+	c.AddBy(10)
+	if w := "11"; c.String() != w {
+		t.Errorf("counter = %s, want %s", c, w)
+	}
+
+	g := m.NewGauge("price")
+	g.Set(100)
+	if w := "100"; g.String() != w {
+		t.Errorf("gauge = %s, want %s", g, w)
+	}
+
+	if w := `{"number": 11, "price": 100}`; m.String() != w {
+		t.Errorf("map = %s, want %s", m, w)
+	}
+	m.Delete("price")
+	if w := `{"number": 11}`; m.String() != w {
+		t.Errorf("map after deletion = %s, want %s", m, w)
+	}
+}

+ 27 - 1
rafthttp/entry_reader.go

@@ -20,11 +20,27 @@ import (
 	"encoding/binary"
 	"io"
 
+	"github.com/coreos/etcd/pkg/metrics"
+	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
 type entryReader struct {
-	r io.Reader
+	r         io.Reader
+	id        types.ID
+	ents      *metrics.Counter
+	bytes     *metrics.Counter
+	lastIndex *metrics.Gauge
+}
+
+func newEntryReader(r io.Reader, id types.ID) *entryReader {
+	return &entryReader{
+		r:         r,
+		id:        id,
+		ents:      metrics.GetMap("rafthttp.stream.entries_received").NewCounter(id.String()),
+		bytes:     metrics.GetMap("rafthttp.stream.bytes_received").NewCounter(id.String()),
+		lastIndex: metrics.GetMap("rafthttp.stream.last_index_received").NewGauge(id.String()),
+	}
 }
 
 func (er *entryReader) readEntries() ([]raftpb.Entry, error) {
@@ -32,12 +48,15 @@ func (er *entryReader) readEntries() ([]raftpb.Entry, error) {
 	if err := binary.Read(er.r, binary.BigEndian, &l); err != nil {
 		return nil, err
 	}
+	er.bytes.AddBy(8)
 	ents := make([]raftpb.Entry, int(l))
 	for i := 0; i < int(l); i++ {
 		if err := er.readEntry(&ents[i]); err != nil {
 			return nil, err
 		}
+		er.ents.Add()
 	}
+	er.lastIndex.Set(int64(ents[l-1].Index))
 	return ents, nil
 }
 
@@ -50,5 +69,12 @@ func (er *entryReader) readEntry(ent *raftpb.Entry) error {
 	if _, err := io.ReadFull(er.r, buf); err != nil {
 		return err
 	}
+	er.bytes.AddBy(8 + int64(l))
 	return ent.Unmarshal(buf)
 }
+
+func (er *entryReader) stop() {
+	metrics.GetMap("rafthttp.stream.entries_received").Delete(er.id.String())
+	metrics.GetMap("rafthttp.stream.bytes_received").Delete(er.id.String())
+	metrics.GetMap("rafthttp.stream.last_index_received").Delete(er.id.String())
+}

+ 3 - 2
rafthttp/entry_test.go

@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"testing"
 
+	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
@@ -45,12 +46,12 @@ func TestEntsWriteAndRead(t *testing.T) {
 	}
 	for i, tt := range tests {
 		b := &bytes.Buffer{}
-		ew := &entryWriter{w: b}
+		ew := newEntryWriter(b, types.ID(1))
 		if err := ew.writeEntries(tt); err != nil {
 			t.Errorf("#%d: unexpected write ents error: %v", i, err)
 			continue
 		}
-		er := &entryReader{r: b}
+		er := newEntryReader(b, types.ID(1))
 		ents, err := er.readEntries()
 		if err != nil {
 			t.Errorf("#%d: unexpected read ents error: %v", i, err)

+ 31 - 1
rafthttp/entry_writer.go

@@ -20,23 +20,46 @@ import (
 	"encoding/binary"
 	"io"
 
+	"github.com/coreos/etcd/pkg/metrics"
+	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
 type entryWriter struct {
-	w io.Writer
+	w         io.Writer
+	id        types.ID
+	ents      *metrics.Counter
+	bytes     *metrics.Counter
+	lastIndex *metrics.Gauge
+}
+
+func newEntryWriter(w io.Writer, id types.ID) *entryWriter {
+	ew := &entryWriter{
+		w:         w,
+		id:        id,
+		ents:      metrics.GetMap("rafthttp.stream.entries_sent").NewCounter(id.String()),
+		bytes:     metrics.GetMap("rafthttp.stream.bytes_sent").NewCounter(id.String()),
+		lastIndex: metrics.GetMap("rafthttp.stream.last_index_sent").NewGauge(id.String()),
+	}
+	return ew
 }
 
 func (ew *entryWriter) writeEntries(ents []raftpb.Entry) error {
 	l := len(ents)
+	if l == 0 {
+		return nil
+	}
 	if err := binary.Write(ew.w, binary.BigEndian, uint64(l)); err != nil {
 		return err
 	}
+	ew.bytes.AddBy(8)
 	for i := 0; i < l; i++ {
 		if err := ew.writeEntry(&ents[i]); err != nil {
 			return err
 		}
+		ew.ents.Add()
 	}
+	ew.lastIndex.Set(int64(ents[l-1].Index))
 	return nil
 }
 
@@ -50,5 +73,12 @@ func (ew *entryWriter) writeEntry(ent *raftpb.Entry) error {
 		return err
 	}
 	_, err = ew.w.Write(b)
+	ew.bytes.AddBy(8 + int64(size))
 	return err
 }
+
+func (ew *entryWriter) stop() {
+	metrics.GetMap("rafthttp.stream.entries_sent").Delete(ew.id.String())
+	metrics.GetMap("rafthttp.stream.bytes_sent").Delete(ew.id.String())
+	metrics.GetMap("rafthttp.stream.last_index_sent").Delete(ew.id.String())
+}

+ 5 - 3
rafthttp/streamer.go

@@ -160,7 +160,7 @@ type streamWriter struct {
 	done chan struct{}
 }
 
-// newStreamServer starts and returns a new started stream server.
+// newStreamWriter starts and returns a new unstarted stream writer.
 // The caller should call stop when finished, to shut it down.
 func newStreamWriter(to types.ID, term uint64) *streamWriter {
 	s := &streamWriter{
@@ -193,7 +193,8 @@ func (s *streamWriter) handle(w WriteFlusher) {
 		log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
 	}()
 
-	ew := &entryWriter{w: w}
+	ew := newEntryWriter(w, s.to)
+	defer ew.stop()
 	for ents := range s.q {
 		start := time.Now()
 		if err := ew.writeEntries(ents); err != nil {
@@ -280,7 +281,8 @@ func (s *streamReader) handle(r io.Reader) {
 		log.Printf("rafthttp: client streaming to %s at term %d has been stopped", s.to, s.term)
 	}()
 
-	er := &entryReader{r: r}
+	er := newEntryReader(r, s.to)
+	defer er.stop()
 	for {
 		ents, err := er.readEntries()
 		if err != nil {