
tools: add etcd-top

Tyler Neely · 10 years ago · commit 00557e96af
2 changed files with 238 additions and 0 deletions
  1. tools/etcd-top/README.md (+23, -0)
  2. tools/etcd-top/etcd-top.go (+215, -0)

tools/etcd-top/README.md  (+23, -0)

@@ -0,0 +1,23 @@
+# etcd-top
+etcd realtime workload analyzer.  Useful for rapid diagnosis of production usage issues and analysis of production request distributions.
+
+usage:
+```
+  -iface="eth0": interface for sniffing traffic on
+  -period=1: seconds between submissions
+  -ports="2379,4001": etcd listening ports
+  -promiscuous=true: whether to perform promiscuous sniffing
+  -topk=10: submit stats for the top <K> sniffed paths
+```
+
+result:
+```
+go run etcd-top.go --period=1 -topk=3
+1440035702 sniffed 1074 requests over last 1 seconds
+
+Top 3 most popular http requests:
+Total Sum  Period Sum Verb Path
+     1305      22 GET /v2/keys/c
+     1302       8 GET /v2/keys/S
+     1297      10 GET /v2/keys/h
+```

tools/etcd-top/etcd-top.go  (+215, -0)

@@ -0,0 +1,215 @@
+package main
+
+import (
+	"bufio"
+	"bytes"
+	"flag"
+	"fmt"
+	"math"
+	"net/http"
+	"os"
+	"runtime"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spacejam/loghisto"
+)
+
+type nameSum struct {
+	Name string
+	Sum  float64
+	Rate float64
+}
+
+type nameSums []nameSum
+
+func (n nameSums) Len() int {
+	return len(n)
+}
+func (n nameSums) Less(i, j int) bool {
+	return n[i].Sum > n[j].Sum
+}
+func (n nameSums) Swap(i, j int) {
+	n[i], n[j] = n[j], n[i]
+}
+
+// This function listens for periodic metrics from the loghisto metric system,
+// and upon receipt of a batch of them it will print out the desired topK.
+func statPrinter(metricStream chan *loghisto.ProcessedMetricSet, topK, period uint) {
+	for m := range metricStream {
+		requestCounter := float64(0)
+		nvs := nameSums{}
+		for k, v := range m.Metrics {
+			// loghisto adds _rate suffixed metrics for counters and histograms
+			if strings.HasSuffix(k, "_rate") && !strings.HasSuffix(k, "_rate_rate") {
+				continue
+			}
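+			// pair each base counter with the "<name>_rate" metric that loghisto generates for it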
+			nvs = append(nvs, nameSum{
+				Name: k,
+				Sum:  v,
+				Rate: m.Metrics[k+"_rate"],
+			})
+			requestCounter += m.Metrics[k+"_rate"]
+		}
+
+		fmt.Printf("\n%d sniffed %d requests over last %d seconds\n\n", time.Now().Unix(),
+			uint(requestCounter), period)
+		if len(nvs) == 0 {
+			continue
+		}
+		sort.Sort(nvs)
+		fmt.Printf("Top %d most popular http requests:\n", topK)
+		fmt.Println("Total Sum  Period Sum Verb Path")
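+		// nv.Name is "METHOD /path", so the single %s below renders both the Verb and Path columns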
+		for _, nv := range nvs[0:int(math.Min(float64(len(nvs)), float64(topK)))] {
+			fmt.Printf("%9.1d %7.1d %s\n", int(nv.Sum), int(nv.Rate), nv.Name)
+		}
+	}
+}
+
+// packetDecoder decodes packets and hands them off to the streamRouter
+func packetDecoder(packetsIn chan *pcap.Packet, packetsOut chan *pcap.Packet) {
+	for pkt := range packetsIn {
+		pkt.Decode()
+		select {
+		case packetsOut <- pkt:
+		default:
+			fmt.Fprint(os.Stderr, "shedding at decoder!")
+		}
+	}
+}
+
+// processor tries to parse an http request from each packet, and if
+// successful it records metrics about it in the loghisto metric system.
+func processor(ms *loghisto.MetricSystem, packetsIn chan *pcap.Packet) {
+	for pkt := range packetsIn {
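+		// payloads that don't start with a complete request line and headers fail
+		// to parse and are simply skipped (no TCP stream reassembly is done here)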
+		req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(pkt.Payload)))
+		if reqErr == nil {
+			ms.Counter(req.Method+" "+req.URL.Path, 1)
+		}
+	}
+}
+
+// streamRouter takes a decoded packet and routes it to a processor that can deal with all requests
+// and responses for this particular TCP connection.  This allows the processor to own a local map
+// of requests so that it can avoid coordinating with other goroutines to perform analysis.
+func streamRouter(ports []uint16, parsedPackets chan *pcap.Packet, processors []chan *pcap.Packet) {
+	for pkt := range parsedPackets {
+		if pkt.TCP == nil {
+			continue
+		}
+		clientPort := uint16(0)
+		for _, p := range ports {
+			if pkt.TCP.SrcPort == p {
+				clientPort = pkt.TCP.DestPort
+				break
+			}
+			if pkt.TCP.DestPort == p {
+				clientPort = pkt.TCP.SrcPort
+				break
+			}
+		}
+		if clientPort != 0 {
+			// client Port can be assumed to have sufficient entropy for
+			// distribution among processors, and we want the same
+			// tcp stream to go to the same processor every time
+			// so that if we do proper packet reconstruction it will
+			// be easier.
+			select {
+			case processors[int(clientPort)%len(processors)] <- pkt:
+			default:
+				fmt.Fprint(os.Stderr, "Shedding load at router!")
+			}
+		}
+	}
+}
+
+// 1. parse args
+// 2. start the loghisto metric system
+// 3. start the processing and printing goroutines
+// 4. open the pcap handler
+// 5. hand off packets from the handler to the decoder
+func main() {
+	portsArg := flag.String("ports", "2379,4001", "etcd listening ports")
+	iface := flag.String("iface", "eth0", "interface for sniffing traffic on")
+	promisc := flag.Bool("promiscuous", true, "promiscuous mode")
+	period := flag.Uint("period", 1, "seconds between submissions")
+	topK := flag.Uint("topk", 10, "submit stats for the top <K> sniffed paths")
+	flag.Parse()
+
+	numCPU := runtime.NumCPU()
+	runtime.GOMAXPROCS(numCPU)
+
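+	// aggregate and emit metrics once per -period seconds; statPrinter consumes them below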
+	ms := loghisto.NewMetricSystem(time.Duration(*period)*time.Second, false)
+	ms.Start()
+	metricStream := make(chan *loghisto.ProcessedMetricSet, 2)
+	ms.SubscribeToProcessedMetrics(metricStream)
+	defer ms.UnsubscribeFromProcessedMetrics(metricStream)
+
+	go statPrinter(metricStream, *topK, *period)
+
+	ports := []uint16{}
+	for _, s := range strings.Split(*portsArg, ",") {
+		p, err := strconv.Atoi(s)
+		if err == nil {
+			ports = append(ports, uint16(p))
+		} else {
+			fmt.Fprintf(os.Stderr, "Failed to parse port \"%s\": %v\n", s, err)
+			os.Exit(1)
+		}
+	}
+
+	if len(ports) == 0 {
+		fmt.Fprint(os.Stderr, "No ports given!  Exiting.\n")
+		os.Exit(1)
+	}
+
+	// We choose 1518 for the snaplen because it's the maximum
+	// standard ethernet frame size at the link layer.  We choose
+	// 1000 for the timeout based on measurements of its impact on
+	// latency, though that choice is less precisely tuned.
+	h, err := pcap.Openlive(*iface, 1518, *promisc, 1000)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "%v", err)
+		os.Exit(1)
+	}
+	defer h.Close()
+
+	portArray := strings.Split(*portsArg, ",")
+	dst := strings.Join(portArray, " or dst port ")
+	src := strings.Join(portArray, " or src port ")
+	filter := fmt.Sprintf("tcp and (dst port %s or src port %s)", dst, src)
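+	// with the default ports this yields:
+	// "tcp and (dst port 2379 or dst port 4001 or src port 2379 or src port 4001)"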
+	fmt.Println("using bpf filter: ", filter)
+	if err := h.Setfilter(filter); err != nil {
+		fmt.Fprintf(os.Stderr, "%v", err)
+		os.Exit(1)
+	}
+
+	unparsedPackets := make(chan *pcap.Packet, 16384)
+	parsedPackets := make(chan *pcap.Packet, 16384)
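+	// spawn at least two packet decoders, scaling with the number of CPUs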
+	for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
+		go packetDecoder(unparsedPackets, parsedPackets)
+	}
+
+	processors := []chan *pcap.Packet{}
+	for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
+		p := make(chan *pcap.Packet, 16384)
+		processors = append(processors, p)
+		go processor(ms, p)
+	}
+
+	go streamRouter(ports, parsedPackets, processors)
+
+	for {
+		pkt := h.Next()
+		if pkt != nil {
+			select {
+			case unparsedPackets <- pkt:
+			default:
+				fmt.Fprint(os.Stderr, "SHEDDING IN MAIN")
+			}
+		}
+	}
+}