// main.go (9.6 KB)
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "context"
  17. "flag"
  18. "fmt"
  19. "io/ioutil"
  20. "net/http"
  21. "net/url"
  22. "os"
  23. "sort"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/clientv3"
  27. "github.com/coreos/etcd/embed"
  28. "github.com/coreos/etcd/pkg/transport"
  29. "go.uber.org/zap"
  30. )
  31. var lg *zap.Logger
  32. func init() {
  33. var err error
  34. lg, err = zap.NewProduction()
  35. if err != nil {
  36. panic(err)
  37. }
  38. }
  39. func main() {
  40. addr := flag.String("addr", "", "etcd metrics URL to fetch from (empty to use current git branch)")
  41. enableLog := flag.Bool("server-log", false, "true to enable embedded etcd server logs")
  42. debug := flag.Bool("debug", false, "true to enable debug logging")
  43. flag.Parse()
  44. if *debug {
  45. lg = zap.NewExample()
  46. }
  47. ep := *addr
  48. if ep == "" {
  49. uss := newEmbedURLs(4)
  50. ep = uss[0].String() + "/metrics"
  51. cfgs := []*embed.Config{embed.NewConfig(), embed.NewConfig()}
  52. cfgs[0].Name, cfgs[1].Name = "0", "1"
  53. setupEmbedCfg(cfgs[0], *enableLog, []url.URL{uss[0]}, []url.URL{uss[1]}, []url.URL{uss[1], uss[3]})
  54. setupEmbedCfg(cfgs[1], *enableLog, []url.URL{uss[2]}, []url.URL{uss[3]}, []url.URL{uss[1], uss[3]})
  55. type embedAndError struct {
  56. ec *embed.Etcd
  57. err error
  58. }
  59. ech := make(chan embedAndError)
  60. for _, cfg := range cfgs {
  61. go func(c *embed.Config) {
  62. e, err := embed.StartEtcd(c)
  63. if err != nil {
  64. ech <- embedAndError{err: err}
  65. return
  66. }
  67. <-e.Server.ReadyNotify()
  68. ech <- embedAndError{ec: e}
  69. }(cfg)
  70. }
  71. for range cfgs {
  72. ev := <-ech
  73. if ev.err != nil {
  74. lg.Panic("failed to start embedded etcd", zap.Error(ev.err))
  75. }
  76. defer ev.ec.Close()
  77. }
  78. // give enough time for peer-to-peer metrics
  79. time.Sleep(7 * time.Second)
  80. lg.Debug("started 2-node embedded etcd cluster")
  81. }
  82. lg.Debug("starting etcd-dump-metrics", zap.String("endpoint", ep))
  83. // send client requests to populate gRPC client-side metrics
  84. // TODO: enable default metrics initialization in v3.1 and v3.2
  85. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{strings.Replace(ep, "/metrics", "", 1)}})
  86. if err != nil {
  87. lg.Panic("failed to create client", zap.Error(err))
  88. }
  89. defer cli.Close()
  90. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  91. defer cancel()
  92. _, err = cli.Put(ctx, "____test", "")
  93. if err != nil {
  94. lg.Panic("failed to write test key", zap.Error(err))
  95. }
  96. _, err = cli.Get(ctx, "____test")
  97. if err != nil {
  98. lg.Panic("failed to read test key", zap.Error(err))
  99. }
  100. _, err = cli.Delete(ctx, "____test")
  101. if err != nil {
  102. lg.Panic("failed to delete test key", zap.Error(err))
  103. }
  104. cli.Watch(ctx, "____test", clientv3.WithCreatedNotify())
  105. fmt.Println(getMetrics(ep))
  106. }
  107. func getMetrics(ep string) (m metricSlice) {
  108. lines, err := fetchMetrics(ep)
  109. if err != nil {
  110. lg.Panic("failed to fetch metrics", zap.Error(err))
  111. }
  112. mss := parse(lines)
  113. sort.Sort(metricSlice(mss))
  114. return mss
  115. }
  116. func (mss metricSlice) String() (s string) {
  117. ver := "unknown"
  118. for i, v := range mss {
  119. if strings.HasPrefix(v.name, "etcd_server_version") {
  120. ver = v.metrics[0]
  121. }
  122. s += v.String()
  123. if i != len(mss)-1 {
  124. s += "\n\n"
  125. }
  126. }
  127. return "# server version: " + ver + "\n\n" + s
  128. }
  129. type metricSlice []metric
  130. func (mss metricSlice) Len() int {
  131. return len(mss)
  132. }
  133. func (mss metricSlice) Less(i, j int) bool {
  134. return mss[i].name < mss[j].name
  135. }
  136. func (mss metricSlice) Swap(i, j int) {
  137. mss[i], mss[j] = mss[j], mss[i]
  138. }
  139. type metric struct {
  140. // raw data for debugging purposes
  141. raw []string
  142. // metrics name
  143. name string
  144. // metrics description
  145. desc string
  146. // metrics type
  147. tp string
  148. // aggregates of "grpc_server_handled_total"
  149. grpcCodes []string
  150. // keep fist 1 and last 4 if histogram or summary
  151. // otherwise, keep only 1
  152. metrics []string
  153. }
  154. func (m metric) String() (s string) {
  155. s += fmt.Sprintf("# name: %q\n", m.name)
  156. s += fmt.Sprintf("# description: %q\n", m.desc)
  157. s += fmt.Sprintf("# type: %q\n", m.tp)
  158. if len(m.grpcCodes) > 0 {
  159. s += "# gRPC codes: \n"
  160. for _, c := range m.grpcCodes {
  161. s += fmt.Sprintf("# - %q\n", c)
  162. }
  163. }
  164. s += strings.Join(m.metrics, "\n")
  165. return s
  166. }
  167. func parse(lines []string) (mss []metric) {
  168. m := metric{raw: make([]string, 0), metrics: make([]string, 0)}
  169. for _, line := range lines {
  170. if strings.HasPrefix(line, "# HELP ") {
  171. // add previous metric and initialize
  172. if m.name != "" {
  173. mss = append(mss, m)
  174. }
  175. m = metric{raw: make([]string, 0), metrics: make([]string, 0)}
  176. m.raw = append(m.raw, line)
  177. ss := strings.Split(strings.Replace(line, "# HELP ", "", 1), " ")
  178. m.name, m.desc = ss[0], strings.Join(ss[1:], " ")
  179. continue
  180. }
  181. if strings.HasPrefix(line, "# TYPE ") {
  182. m.raw = append(m.raw, line)
  183. m.tp = strings.Split(strings.Replace(line, "# TYPE "+m.tp, "", 1), " ")[1]
  184. continue
  185. }
  186. m.raw = append(m.raw, line)
  187. m.metrics = append(m.metrics, strings.Split(line, " ")[0])
  188. }
  189. if m.name != "" {
  190. mss = append(mss, m)
  191. }
  192. // aggregate
  193. for i := range mss {
  194. /*
  195. munge data for:
  196. etcd_network_active_peers{Local="c6c9b5143b47d146",Remote="fbdddd08d7e1608b"}
  197. etcd_network_peer_sent_bytes_total{To="c6c9b5143b47d146"}
  198. etcd_network_peer_received_bytes_total{From="0"}
  199. etcd_network_peer_received_bytes_total{From="fd422379fda50e48"}
  200. etcd_network_peer_round_trip_time_seconds_bucket{To="91bc3c398fb3c146",le="0.0001"}
  201. etcd_network_peer_round_trip_time_seconds_bucket{To="fd422379fda50e48",le="0.8192"}
  202. etcd_network_peer_round_trip_time_seconds_bucket{To="fd422379fda50e48",le="+Inf"}
  203. etcd_network_peer_round_trip_time_seconds_sum{To="fd422379fda50e48"}
  204. etcd_network_peer_round_trip_time_seconds_count{To="fd422379fda50e48"}
  205. */
  206. if mss[i].name == "etcd_network_active_peers" {
  207. mss[i].metrics = []string{`etcd_network_active_peers{Local="LOCAL_NODE_ID",Remote="REMOTE_PEER_NODE_ID"}`}
  208. }
  209. if mss[i].name == "etcd_network_peer_sent_bytes_total" {
  210. mss[i].metrics = []string{`etcd_network_peer_sent_bytes_total{To="REMOTE_PEER_NODE_ID"}`}
  211. }
  212. if mss[i].name == "etcd_network_peer_received_bytes_total" {
  213. mss[i].metrics = []string{`etcd_network_peer_received_bytes_total{From="REMOTE_PEER_NODE_ID"}`}
  214. }
  215. if mss[i].tp == "histogram" || mss[i].tp == "summary" {
  216. if mss[i].name == "etcd_network_peer_round_trip_time_seconds" {
  217. for j := range mss[i].metrics {
  218. l := mss[i].metrics[j]
  219. if strings.Contains(l, `To="`) && strings.Contains(l, `le="`) {
  220. k1 := strings.Index(l, `To="`)
  221. k2 := strings.Index(l, `",le="`)
  222. mss[i].metrics[j] = l[:k1+4] + "REMOTE_PEER_NODE_ID" + l[k2:]
  223. }
  224. if strings.HasPrefix(l, "etcd_network_peer_round_trip_time_seconds_sum") {
  225. mss[i].metrics[j] = `etcd_network_peer_round_trip_time_seconds_sum{To="REMOTE_PEER_NODE_ID"}`
  226. }
  227. if strings.HasPrefix(l, "etcd_network_peer_round_trip_time_seconds_count") {
  228. mss[i].metrics[j] = `etcd_network_peer_round_trip_time_seconds_count{To="REMOTE_PEER_NODE_ID"}`
  229. }
  230. }
  231. mss[i].metrics = aggSort(mss[i].metrics)
  232. }
  233. }
  234. // aggregate gRPC RPC metrics
  235. if mss[i].name == "grpc_server_handled_total" {
  236. pfx := `grpc_server_handled_total{grpc_code="`
  237. codes, metrics := make(map[string]struct{}), make(map[string]struct{})
  238. for _, v := range mss[i].metrics {
  239. v2 := strings.Replace(v, pfx, "", 1)
  240. idx := strings.Index(v2, `",grpc_method="`)
  241. code := v2[:idx]
  242. v2 = v2[idx:]
  243. codes[code] = struct{}{}
  244. v2 = pfx + "CODE" + v2
  245. metrics[v2] = struct{}{}
  246. }
  247. mss[i].grpcCodes = sortMap(codes)
  248. mss[i].metrics = sortMap(metrics)
  249. }
  250. }
  251. return mss
  252. }
  253. func fetchMetrics(ep string) (lines []string, err error) {
  254. tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, time.Second, time.Second, time.Second)
  255. if err != nil {
  256. return nil, err
  257. }
  258. cli := &http.Client{Transport: tr}
  259. resp, err := cli.Get(ep)
  260. if err != nil {
  261. return nil, err
  262. }
  263. defer resp.Body.Close()
  264. b, rerr := ioutil.ReadAll(resp.Body)
  265. if rerr != nil {
  266. return nil, rerr
  267. }
  268. lines = strings.Split(string(b), "\n")
  269. return lines, nil
  270. }
  271. func newEmbedURLs(n int) (urls []url.URL) {
  272. urls = make([]url.URL, n)
  273. for i := 0; i < n; i++ {
  274. u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d%06d", os.Getpid(), i))
  275. urls[i] = *u
  276. }
  277. return urls
  278. }
  279. func setupEmbedCfg(cfg *embed.Config, enableLog bool, curls, purls, ics []url.URL) {
  280. cfg.Logger = "zap"
  281. cfg.LogOutputs = []string{"/dev/null"}
  282. if enableLog {
  283. cfg.LogOutputs = []string{"stderr"}
  284. }
  285. cfg.Debug = false
  286. var err error
  287. cfg.Dir, err = ioutil.TempDir(os.TempDir(), fmt.Sprintf("%016X", time.Now().UnixNano()))
  288. if err != nil {
  289. panic(err)
  290. }
  291. os.RemoveAll(cfg.Dir)
  292. cfg.ClusterState = "new"
  293. cfg.LCUrls, cfg.ACUrls = curls, curls
  294. cfg.LPUrls, cfg.APUrls = purls, purls
  295. cfg.InitialCluster = ""
  296. for i := range ics {
  297. cfg.InitialCluster += fmt.Sprintf(",%d=%s", i, ics[i].String())
  298. }
  299. cfg.InitialCluster = cfg.InitialCluster[1:]
  300. }
  301. func aggSort(ss []string) (sorted []string) {
  302. set := make(map[string]struct{})
  303. for _, s := range ss {
  304. set[s] = struct{}{}
  305. }
  306. sorted = make([]string, 0, len(set))
  307. for k := range set {
  308. sorted = append(sorted, k)
  309. }
  310. sort.Strings(sorted)
  311. return sorted
  312. }
  313. func sortMap(set map[string]struct{}) (sorted []string) {
  314. sorted = make([]string, 0, len(set))
  315. for k := range set {
  316. sorted = append(sorted, k)
  317. }
  318. sort.Strings(sorted)
  319. return sorted
  320. }