metrics_test.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bufio"
  17. "context"
  18. "io"
  19. "net"
  20. "net/http"
  21. "strconv"
  22. "strings"
  23. "testing"
  24. "time"
  25. "go.etcd.io/etcd/clientv3"
  26. "go.etcd.io/etcd/integration"
  27. "go.etcd.io/etcd/pkg/testutil"
  28. "go.etcd.io/etcd/pkg/transport"
  29. grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
  30. "github.com/prometheus/client_golang/prometheus/promhttp"
  31. "google.golang.org/grpc"
  32. )
  33. func TestV3ClientMetrics(t *testing.T) {
  34. defer testutil.AfterTest(t)
  35. var (
  36. addr = "localhost:27989"
  37. ln net.Listener
  38. )
  39. // listen for all Prometheus metrics
  40. donec := make(chan struct{})
  41. go func() {
  42. var err error
  43. defer close(donec)
  44. srv := &http.Server{Handler: promhttp.Handler()}
  45. srv.SetKeepAlivesEnabled(false)
  46. ln, err = transport.NewUnixListener(addr)
  47. if err != nil {
  48. t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
  49. }
  50. err = srv.Serve(ln)
  51. if err != nil && !transport.IsClosedConnError(err) {
  52. t.Errorf("Err serving http requests: %v", err)
  53. }
  54. }()
  55. url := "unix://" + addr + "/metrics"
  56. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, SkipCreatingClient: true})
  57. defer clus.Terminate(t)
  58. cfg := clientv3.Config{
  59. Endpoints: []string{clus.Members[0].GRPCAddr()},
  60. DialOptions: []grpc.DialOption{
  61. grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
  62. grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
  63. },
  64. }
  65. cli, cerr := clientv3.New(cfg)
  66. if cerr != nil {
  67. t.Fatal(cerr)
  68. }
  69. defer cli.Close()
  70. wc := cli.Watch(context.Background(), "foo")
  71. wBefore := sumCountersForMetricAndLabels(t, url, "grpc_client_msg_received_total", "Watch", "bidi_stream")
  72. pBefore := sumCountersForMetricAndLabels(t, url, "grpc_client_started_total", "Put", "unary")
  73. _, err := cli.Put(context.Background(), "foo", "bar")
  74. if err != nil {
  75. t.Errorf("Error putting value in key store")
  76. }
  77. pAfter := sumCountersForMetricAndLabels(t, url, "grpc_client_started_total", "Put", "unary")
  78. if pBefore+1 != pAfter {
  79. t.Errorf("grpc_client_started_total expected %d, got %d", 1, pAfter-pBefore)
  80. }
  81. // consume watch response
  82. select {
  83. case <-wc:
  84. case <-time.After(10 * time.Second):
  85. t.Error("Timeout occurred for getting watch response")
  86. }
  87. wAfter := sumCountersForMetricAndLabels(t, url, "grpc_client_msg_received_total", "Watch", "bidi_stream")
  88. if wBefore+1 != wAfter {
  89. t.Errorf("grpc_client_msg_received_total expected %d, got %d", 1, wAfter-wBefore)
  90. }
  91. ln.Close()
  92. <-donec
  93. }
  94. func sumCountersForMetricAndLabels(t *testing.T, url string, metricName string, matchingLabelValues ...string) int {
  95. count := 0
  96. for _, line := range getHTTPBodyAsLines(t, url) {
  97. ok := true
  98. if !strings.HasPrefix(line, metricName) {
  99. continue
  100. }
  101. for _, labelValue := range matchingLabelValues {
  102. if !strings.Contains(line, `"`+labelValue+`"`) {
  103. ok = false
  104. break
  105. }
  106. }
  107. if !ok {
  108. continue
  109. }
  110. valueString := line[strings.LastIndex(line, " ")+1 : len(line)-1]
  111. valueFloat, err := strconv.ParseFloat(valueString, 32)
  112. if err != nil {
  113. t.Fatalf("failed parsing value for line: %v and matchingLabelValues: %v", line, matchingLabelValues)
  114. }
  115. count += int(valueFloat)
  116. }
  117. return count
  118. }
  119. func getHTTPBodyAsLines(t *testing.T, url string) []string {
  120. cfgtls := transport.TLSInfo{}
  121. tr, err := transport.NewTransport(cfgtls, time.Second)
  122. if err != nil {
  123. t.Fatalf("Error getting transport: %v", err)
  124. }
  125. tr.MaxIdleConns = -1
  126. tr.DisableKeepAlives = true
  127. cli := &http.Client{Transport: tr}
  128. resp, err := cli.Get(url)
  129. if err != nil {
  130. t.Fatalf("Error fetching: %v", err)
  131. }
  132. reader := bufio.NewReader(resp.Body)
  133. lines := []string{}
  134. for {
  135. line, err := reader.ReadString('\n')
  136. if err != nil {
  137. if err == io.EOF {
  138. break
  139. } else {
  140. t.Fatalf("error reading: %v", err)
  141. }
  142. }
  143. lines = append(lines, line)
  144. }
  145. resp.Body.Close()
  146. return lines
  147. }