metrics_test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bufio"
  17. "io"
  18. "net"
  19. "net/http"
  20. "strconv"
  21. "strings"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/integration"
  26. "github.com/coreos/etcd/pkg/testutil"
  27. "github.com/coreos/etcd/pkg/transport"
  28. "github.com/prometheus/client_golang/prometheus"
  29. "golang.org/x/net/context"
  30. )
  31. func TestV3ClientMetrics(t *testing.T) {
  32. defer testutil.AfterTest(t)
  33. var (
  34. addr string = "localhost:27989"
  35. ln net.Listener
  36. err error
  37. )
  38. // listen for all prometheus metrics
  39. donec := make(chan struct{})
  40. go func() {
  41. defer close(donec)
  42. srv := &http.Server{Handler: prometheus.Handler()}
  43. srv.SetKeepAlivesEnabled(false)
  44. ln, err = transport.NewUnixListener(addr)
  45. if err != nil {
  46. t.Fatalf("Error : %v occured while listening on addr : %v", err, addr)
  47. }
  48. err = srv.Serve(ln)
  49. if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
  50. t.Fatalf("Err serving http requests: %v", err)
  51. }
  52. }()
  53. url := "unix://" + addr + "/metrics"
  54. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  55. defer clus.Terminate(t)
  56. client := clus.Client(0)
  57. w := clientv3.NewWatcher(client)
  58. defer w.Close()
  59. kv := clientv3.NewKV(client)
  60. wc := w.Watch(context.Background(), "foo")
  61. wBefore := sumCountersForMetricAndLabels(t, url, "grpc_client_msg_received_total", "Watch", "bidi_stream")
  62. pBefore := sumCountersForMetricAndLabels(t, url, "grpc_client_started_total", "Put", "unary")
  63. _, err = kv.Put(context.Background(), "foo", "bar")
  64. if err != nil {
  65. t.Errorf("Error putting value in key store")
  66. }
  67. pAfter := sumCountersForMetricAndLabels(t, url, "grpc_client_started_total", "Put", "unary")
  68. if pBefore+1 != pAfter {
  69. t.Errorf("grpc_client_started_total expected : %d, got : %d", 1, pAfter-pBefore)
  70. }
  71. // consume watch response
  72. select {
  73. case <-wc:
  74. case <-time.After(10 * time.Second):
  75. t.Error("Timeout occured for getting watch response")
  76. }
  77. wAfter := sumCountersForMetricAndLabels(t, url, "grpc_client_msg_received_total", "Watch", "bidi_stream")
  78. if wBefore+1 != wAfter {
  79. t.Errorf("grpc_client_msg_received_total expected : %d, got %d", 1, wAfter-wBefore)
  80. }
  81. ln.Close()
  82. <-donec
  83. }
  84. func sumCountersForMetricAndLabels(t *testing.T, url string, metricName string, matchingLabelValues ...string) int {
  85. count := 0
  86. for _, line := range getHTTPBodyAsLines(t, url) {
  87. ok := true
  88. if !strings.HasPrefix(line, metricName) {
  89. continue
  90. }
  91. for _, labelValue := range matchingLabelValues {
  92. if !strings.Contains(line, `"`+labelValue+`"`) {
  93. ok = false
  94. break
  95. }
  96. }
  97. if !ok {
  98. continue
  99. }
  100. valueString := line[strings.LastIndex(line, " ")+1 : len(line)-1]
  101. valueFloat, err := strconv.ParseFloat(valueString, 32)
  102. if err != nil {
  103. t.Fatalf("failed parsing value for line: %v and matchingLabelValues : %v \n", line, matchingLabelValues)
  104. }
  105. count += int(valueFloat)
  106. }
  107. return count
  108. }
  109. func getHTTPBodyAsLines(t *testing.T, url string) []string {
  110. cfgtls := transport.TLSInfo{}
  111. tr, err := transport.NewTransport(cfgtls, time.Second)
  112. if err != nil {
  113. t.Fatalf("Error getting transport : %v", err)
  114. }
  115. tr.MaxIdleConns = -1
  116. tr.DisableKeepAlives = true
  117. cli := &http.Client{Transport: tr}
  118. resp, err := cli.Get(url)
  119. if err != nil {
  120. t.Fatalf("Error fetching: %v", err)
  121. }
  122. reader := bufio.NewReader(resp.Body)
  123. lines := []string{}
  124. for {
  125. line, err := reader.ReadString('\n')
  126. if err != nil {
  127. if err == io.EOF {
  128. break
  129. } else {
  130. t.Fatalf("error reading: %v", err)
  131. }
  132. }
  133. lines = append(lines, line)
  134. }
  135. resp.Body.Close()
  136. return lines
  137. }