| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package main
- import (
- "bufio"
- "bytes"
- "flag"
- "fmt"
- "math"
- "net/http"
- "os"
- "runtime"
- "sort"
- "strconv"
- "strings"
- "time"
- "github.com/akrennmair/gopcap"
- "github.com/spacejam/loghisto"
- )
- type nameSum struct {
- Name string
- Sum float64
- Rate float64
- }
- type nameSums []nameSum
- func (n nameSums) Len() int {
- return len(n)
- }
- func (n nameSums) Less(i, j int) bool {
- return n[i].Sum > n[j].Sum
- }
- func (n nameSums) Swap(i, j int) {
- n[i], n[j] = n[j], n[i]
- }
- // This function listens for periodic metrics from the loghisto metric system,
- // and upon receipt of a batch of them it will print out the desired topK.
- func statPrinter(metricStream chan *loghisto.ProcessedMetricSet, topK, period uint) {
- for m := range metricStream {
- requestCounter := float64(0)
- nvs := nameSums{}
- for k, v := range m.Metrics {
- // loghisto adds _rate suffixed metrics for counters and histograms
- if strings.HasSuffix(k, "_rate") && !strings.HasSuffix(k, "_rate_rate") {
- continue
- }
- nvs = append(nvs, nameSum{
- Name: k,
- Sum: v,
- Rate: m.Metrics[k+"_rate"],
- })
- requestCounter += m.Metrics[k+"_rate"]
- }
- fmt.Printf("\n%d sniffed %d requests over last %d seconds\n\n", time.Now().Unix(),
- uint(requestCounter), period)
- if len(nvs) == 0 {
- continue
- }
- sort.Sort(nvs)
- fmt.Printf("Top %d most popular http requests:\n", topK)
- fmt.Println("Total Sum Period Sum Verb Path")
- for _, nv := range nvs[0:int(math.Min(float64(len(nvs)), float64(topK)))] {
- fmt.Printf("%9.1d %7.1d %s\n", int(nv.Sum), int(nv.Rate), nv.Name)
- }
- }
- }
- // packetDecoder decodes packets and hands them off to the streamRouter
- func packetDecoder(packetsIn chan *pcap.Packet, packetsOut chan *pcap.Packet) {
- for pkt := range packetsIn {
- pkt.Decode()
- select {
- case packetsOut <- pkt:
- default:
- fmt.Fprint(os.Stderr, "shedding at decoder!")
- }
- }
- }
- // processor tries to parse an http request from each packet, and if
- // successful it records metrics about it in the loghisto metric system.
- func processor(ms *loghisto.MetricSystem, packetsIn chan *pcap.Packet) {
- for pkt := range packetsIn {
- req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(pkt.Payload)))
- if reqErr == nil {
- ms.Counter(req.Method+" "+req.URL.Path, 1)
- }
- }
- }
- // streamRouter takes a decoded packet and routes it to a processor that can deal with all requests
- // and responses for this particular TCP connection. This allows the processor to own a local map
- // of requests so that it can avoid coordinating with other goroutines to perform analysis.
- func streamRouter(ports []uint16, parsedPackets chan *pcap.Packet, processors []chan *pcap.Packet) {
- for pkt := range parsedPackets {
- if pkt.TCP == nil {
- continue
- }
- clientPort := uint16(0)
- for _, p := range ports {
- if pkt.TCP.SrcPort == p {
- clientPort = pkt.TCP.DestPort
- break
- }
- if pkt.TCP.DestPort == p {
- clientPort = pkt.TCP.SrcPort
- break
- }
- }
- if clientPort != 0 {
- // client Port can be assumed to have sufficient entropy for
- // distribution among processors, and we want the same
- // tcp stream to go to the same processor every time
- // so that if we do proper packet reconstruction it will
- // be easier.
- select {
- case processors[int(clientPort)%len(processors)] <- pkt:
- default:
- fmt.Fprint(os.Stderr, "Shedding load at router!")
- }
- }
- }
- }
- // 1. parse args
- // 2. start the loghisto metric system
- // 3. start the processing and printing goroutines
- // 4. open the pcap handler
- // 5. hand off packets from the handler to the decoder
- func main() {
- portsArg := flag.String("ports", "2379", "etcd listening ports")
- iface := flag.String("iface", "eth0", "interface for sniffing traffic on")
- promisc := flag.Bool("promiscuous", true, "promiscuous mode")
- period := flag.Uint("period", 1, "seconds between submissions")
- topK := flag.Uint("topk", 10, "submit stats for the top <K> sniffed paths")
- flag.Parse()
- numCPU := runtime.NumCPU()
- runtime.GOMAXPROCS(numCPU)
- ms := loghisto.NewMetricSystem(time.Duration(*period)*time.Second, false)
- ms.Start()
- metricStream := make(chan *loghisto.ProcessedMetricSet, 2)
- ms.SubscribeToProcessedMetrics(metricStream)
- defer ms.UnsubscribeFromProcessedMetrics(metricStream)
- go statPrinter(metricStream, *topK, *period)
- ports := []uint16{}
- for _, p := range strings.Split(*portsArg, ",") {
- port, err := strconv.Atoi(p)
- if err == nil {
- ports = append(ports, uint16(port))
- } else {
- fmt.Fprintf(os.Stderr, "Failed to parse port \"%s\": %v\n", p, err)
- os.Exit(1)
- }
- }
- if len(ports) == 0 {
- fmt.Fprint(os.Stderr, "No ports given! Exiting.\n")
- os.Exit(1)
- }
- // We choose 1518 for the snaplen because it's the default
- // ethernet MTU at the link layer. We choose 1000 for the
- // timeout based on a measurement for its impact on latency
- // impact, but it is less precise.
- h, err := pcap.Openlive(*iface, 1518, *promisc, 1000)
- if err != nil {
- fmt.Fprintf(os.Stderr, "%v", err)
- os.Exit(1)
- }
- defer h.Close()
- portArray := strings.Split(*portsArg, ",")
- dst := strings.Join(portArray, " or dst port ")
- src := strings.Join(portArray, " or src port ")
- filter := fmt.Sprintf("tcp and (dst port %s or src port %s)", dst, src)
- fmt.Println("using bpf filter: ", filter)
- if err := h.Setfilter(filter); err != nil {
- fmt.Fprintf(os.Stderr, "%v", err)
- os.Exit(1)
- }
- unparsedPackets := make(chan *pcap.Packet, 16384)
- parsedPackets := make(chan *pcap.Packet, 16384)
- for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
- go packetDecoder(unparsedPackets, parsedPackets)
- }
- processors := []chan *pcap.Packet{}
- for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
- p := make(chan *pcap.Packet, 16384)
- processors = append(processors, p)
- go processor(ms, p)
- }
- go streamRouter(ports, parsedPackets, processors)
- for {
- pkt := h.Next()
- if pkt != nil {
- select {
- case unparsedPackets <- pkt:
- default:
- fmt.Fprint(os.Stderr, "SHEDDING IN MAIN")
- }
- }
- }
- }
|