etcd-top.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "bytes"
  18. "flag"
  19. "fmt"
  20. "math"
  21. "net/http"
  22. "os"
  23. "runtime"
  24. "sort"
  25. "strconv"
  26. "strings"
  27. "time"
  28. "github.com/akrennmair/gopcap"
  29. "github.com/spacejam/loghisto"
  30. )
  // nameSum is one row of the table printed by statPrinter: a metric name,
  // its total value (Sum) and its value for the last period (Rate).
  type nameSum struct {
  	Name string
  	Sum  float64
  	Rate float64
  }

  // nameSums implements sort.Interface, ordering rows by descending Sum.
  // Rows with an equal Sum are ordered by ascending Name. The slice is
  // typically filled by ranging over a map (random order) and sort.Sort is not
  // stable, so without this tie-break equal rows would print in a different
  // order on every run.
  type nameSums []nameSum

  // Len returns the number of rows.
  func (n nameSums) Len() int { return len(n) }

  // Less reports whether row i should be printed before row j.
  func (n nameSums) Less(i, j int) bool {
  	if n[i].Sum != n[j].Sum {
  		return n[i].Sum > n[j].Sum
  	}
  	return n[i].Name < n[j].Name
  }

  // Swap exchanges rows i and j.
  func (n nameSums) Swap(i, j int) { n[i], n[j] = n[j], n[i] }
  46. // This function listens for periodic metrics from the loghisto metric system,
  47. // and upon receipt of a batch of them it will print out the desired topK.
  48. func statPrinter(metricStream chan *loghisto.ProcessedMetricSet, topK, period uint) {
  49. for m := range metricStream {
  50. requestCounter := float64(0)
  51. nvs := nameSums{}
  52. for k, v := range m.Metrics {
  53. // loghisto adds _rate suffixed metrics for counters and histograms
  54. if strings.HasSuffix(k, "_rate") && !strings.HasSuffix(k, "_rate_rate") {
  55. continue
  56. }
  57. nvs = append(nvs, nameSum{
  58. Name: k,
  59. Sum: v,
  60. Rate: m.Metrics[k+"_rate"],
  61. })
  62. requestCounter += m.Metrics[k+"_rate"]
  63. }
  64. fmt.Printf("\n%d sniffed %d requests over last %d seconds\n\n", time.Now().Unix(),
  65. uint(requestCounter), period)
  66. if len(nvs) == 0 {
  67. continue
  68. }
  69. sort.Sort(nvs)
  70. fmt.Printf("Top %d most popular http requests:\n", topK)
  71. fmt.Println("Total Sum Period Sum Verb Path")
  72. for _, nv := range nvs[0:int(math.Min(float64(len(nvs)), float64(topK)))] {
  73. fmt.Printf("%9.1d %7.1d %s\n", int(nv.Sum), int(nv.Rate), nv.Name)
  74. }
  75. }
  76. }
  77. // packetDecoder decodes packets and hands them off to the streamRouter
  78. func packetDecoder(packetsIn chan *pcap.Packet, packetsOut chan *pcap.Packet) {
  79. for pkt := range packetsIn {
  80. pkt.Decode()
  81. select {
  82. case packetsOut <- pkt:
  83. default:
  84. fmt.Fprint(os.Stderr, "shedding at decoder!")
  85. }
  86. }
  87. }
  88. // processor tries to parse an http request from each packet, and if
  89. // successful it records metrics about it in the loghisto metric system.
  90. func processor(ms *loghisto.MetricSystem, packetsIn chan *pcap.Packet) {
  91. for pkt := range packetsIn {
  92. req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(pkt.Payload)))
  93. if reqErr == nil {
  94. ms.Counter(req.Method+" "+req.URL.Path, 1)
  95. }
  96. }
  97. }
  98. // streamRouter takes a decoded packet and routes it to a processor that can deal with all requests
  99. // and responses for this particular TCP connection. This allows the processor to own a local map
  100. // of requests so that it can avoid coordinating with other goroutines to perform analysis.
  101. func streamRouter(ports []uint16, parsedPackets chan *pcap.Packet, processors []chan *pcap.Packet) {
  102. for pkt := range parsedPackets {
  103. if pkt.TCP == nil {
  104. continue
  105. }
  106. clientPort := uint16(0)
  107. for _, p := range ports {
  108. if pkt.TCP.SrcPort == p {
  109. clientPort = pkt.TCP.DestPort
  110. break
  111. }
  112. if pkt.TCP.DestPort == p {
  113. clientPort = pkt.TCP.SrcPort
  114. break
  115. }
  116. }
  117. if clientPort != 0 {
  118. // client Port can be assumed to have sufficient entropy for
  119. // distribution among processors, and we want the same
  120. // tcp stream to go to the same processor every time
  121. // so that if we do proper packet reconstruction it will
  122. // be easier.
  123. select {
  124. case processors[int(clientPort)%len(processors)] <- pkt:
  125. default:
  126. fmt.Fprint(os.Stderr, "Shedding load at router!")
  127. }
  128. }
  129. }
  130. }
  131. // 1. parse args
  132. // 2. start the loghisto metric system
  133. // 3. start the processing and printing goroutines
  134. // 4. open the pcap handler
  135. // 5. hand off packets from the handler to the decoder
  136. func main() {
  137. portsArg := flag.String("ports", "2379", "etcd listening ports")
  138. iface := flag.String("iface", "eth0", "interface for sniffing traffic on")
  139. promisc := flag.Bool("promiscuous", true, "promiscuous mode")
  140. period := flag.Uint("period", 1, "seconds between submissions")
  141. topK := flag.Uint("topk", 10, "submit stats for the top <K> sniffed paths")
  142. flag.Parse()
  143. numCPU := runtime.NumCPU()
  144. runtime.GOMAXPROCS(numCPU)
  145. ms := loghisto.NewMetricSystem(time.Duration(*period)*time.Second, false)
  146. ms.Start()
  147. metricStream := make(chan *loghisto.ProcessedMetricSet, 2)
  148. ms.SubscribeToProcessedMetrics(metricStream)
  149. defer ms.UnsubscribeFromProcessedMetrics(metricStream)
  150. go statPrinter(metricStream, *topK, *period)
  151. ports := []uint16{}
  152. for _, p := range strings.Split(*portsArg, ",") {
  153. port, err := strconv.Atoi(p)
  154. if err == nil {
  155. ports = append(ports, uint16(port))
  156. } else {
  157. fmt.Fprintf(os.Stderr, "Failed to parse port \"%s\": %v\n", p, err)
  158. os.Exit(1)
  159. }
  160. }
  161. if len(ports) == 0 {
  162. fmt.Fprint(os.Stderr, "No ports given! Exiting.\n")
  163. os.Exit(1)
  164. }
  165. // We choose 1518 for the snaplen because it's the default
  166. // ethernet MTU at the link layer. We choose 1000 for the
  167. // timeout based on a measurement for its impact on latency
  168. // impact, but it is less precise.
  169. h, err := pcap.Openlive(*iface, 1518, *promisc, 1000)
  170. if err != nil {
  171. fmt.Fprintf(os.Stderr, "%v", err)
  172. os.Exit(1)
  173. }
  174. defer h.Close()
  175. portArray := strings.Split(*portsArg, ",")
  176. dst := strings.Join(portArray, " or dst port ")
  177. src := strings.Join(portArray, " or src port ")
  178. filter := fmt.Sprintf("tcp and (dst port %s or src port %s)", dst, src)
  179. fmt.Println("using bpf filter: ", filter)
  180. if err := h.Setfilter(filter); err != nil {
  181. fmt.Fprintf(os.Stderr, "%v", err)
  182. os.Exit(1)
  183. }
  184. unparsedPackets := make(chan *pcap.Packet, 16384)
  185. parsedPackets := make(chan *pcap.Packet, 16384)
  186. for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
  187. go packetDecoder(unparsedPackets, parsedPackets)
  188. }
  189. processors := []chan *pcap.Packet{}
  190. for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
  191. p := make(chan *pcap.Packet, 16384)
  192. processors = append(processors, p)
  193. go processor(ms, p)
  194. }
  195. go streamRouter(ports, parsedPackets, processors)
  196. for {
  197. pkt := h.Next()
  198. if pkt != nil {
  199. select {
  200. case unparsedPackets <- pkt:
  201. default:
  202. fmt.Fprint(os.Stderr, "SHEDDING IN MAIN")
  203. }
  204. }
  205. }
  206. }