server_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. // Copyright 2016 Michal Witkowski. All Rights Reserved.
  2. // See LICENSE for licensing terms.
  3. package grpc_prometheus
  4. import (
  5. "bufio"
  6. "io"
  7. "net"
  8. "net/http"
  9. "net/http/httptest"
  10. "strconv"
  11. "strings"
  12. "testing"
  13. "time"
  14. pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto"
  15. "github.com/prometheus/client_golang/prometheus"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/stretchr/testify/require"
  18. "github.com/stretchr/testify/suite"
  19. "golang.org/x/net/context"
  20. "google.golang.org/grpc"
  21. "google.golang.org/grpc/codes"
  22. )
  23. const (
  24. pingDefaultValue = "I like kittens."
  25. countListResponses = 20
  26. )
  27. func TestServerInterceptorSuite(t *testing.T) {
  28. suite.Run(t, &ServerInterceptorTestSuite{})
  29. }
  30. type ServerInterceptorTestSuite struct {
  31. suite.Suite
  32. serverListener net.Listener
  33. server *grpc.Server
  34. clientConn *grpc.ClientConn
  35. testClient pb_testproto.TestServiceClient
  36. ctx context.Context
  37. }
  38. func (s *ServerInterceptorTestSuite) SetupSuite() {
  39. var err error
  40. EnableHandlingTimeHistogram()
  41. s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
  42. require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
  43. // This is the point where we hook up the interceptor
  44. s.server = grpc.NewServer(
  45. grpc.StreamInterceptor(StreamServerInterceptor),
  46. grpc.UnaryInterceptor(UnaryServerInterceptor),
  47. )
  48. pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()})
  49. go func() {
  50. s.server.Serve(s.serverListener)
  51. }()
  52. s.clientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second))
  53. require.NoError(s.T(), err, "must not error on client Dial")
  54. s.testClient = pb_testproto.NewTestServiceClient(s.clientConn)
  55. // Important! Pre-register stuff here.
  56. Register(s.server)
  57. }
  58. func (s *ServerInterceptorTestSuite) SetupTest() {
  59. // Make all RPC calls last at most 2 sec, meaning all async issues or deadlock will not kill tests.
  60. s.ctx, _ = context.WithTimeout(context.TODO(), 2*time.Second)
  61. }
  62. func (s *ServerInterceptorTestSuite) TearDownSuite() {
  63. if s.serverListener != nil {
  64. s.server.Stop()
  65. s.T().Logf("stopped grpc.Server at: %v", s.serverListener.Addr().String())
  66. s.serverListener.Close()
  67. }
  68. if s.clientConn != nil {
  69. s.clientConn.Close()
  70. }
  71. }
  72. func (s *ServerInterceptorTestSuite) TestRegisterPresetsStuff() {
  73. for testId, testCase := range []struct {
  74. metricName string
  75. existingLabels []string
  76. }{
  77. {"grpc_server_started_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary"}},
  78. {"grpc_server_started_total", []string{"mwitkow.testproto.TestService", "PingList", "server_stream"}},
  79. {"grpc_server_msg_received_total", []string{"mwitkow.testproto.TestService", "PingList", "server_stream"}},
  80. {"grpc_server_msg_sent_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary"}},
  81. {"grpc_server_handling_seconds_sum", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary"}},
  82. {"grpc_server_handling_seconds_count", []string{"mwitkow.testproto.TestService", "PingList", "server_stream"}},
  83. {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingList", "server_stream", "OutOfRange"}},
  84. {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingList", "server_stream", "Aborted"}},
  85. {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary", "FailedPrecondition"}},
  86. {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary", "ResourceExhausted"}},
  87. } {
  88. lineCount := len(fetchPrometheusLines(s.T(), testCase.metricName, testCase.existingLabels...))
  89. assert.NotEqual(s.T(), 0, lineCount, "metrics must exist for test case %d", testId)
  90. }
  91. }
  92. func (s *ServerInterceptorTestSuite) TestUnaryIncrementsStarted() {
  93. var before int
  94. var after int
  95. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingEmpty", "unary")
  96. s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{})
  97. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingEmpty", "unary")
  98. assert.EqualValues(s.T(), before+1, after, "grpc_server_started_total should be incremented for PingEmpty")
  99. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingError", "unary")
  100. s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.Unavailable)})
  101. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingError", "unary")
  102. assert.EqualValues(s.T(), before+1, after, "grpc_server_started_total should be incremented for PingError")
  103. }
  104. func (s *ServerInterceptorTestSuite) TestUnaryIncrementsHandled() {
  105. var before int
  106. var after int
  107. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingEmpty", "unary", "OK")
  108. s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK
  109. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingEmpty", "unary", "OK")
  110. assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_count should be incremented for PingEmpty")
  111. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingError", "unary", "FailedPrecondition")
  112. s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
  113. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingError", "unary", "FailedPrecondition")
  114. assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_total should be incremented for PingError")
  115. }
  116. func (s *ServerInterceptorTestSuite) TestUnaryIncrementsHistograms() {
  117. var before int
  118. var after int
  119. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingEmpty", "unary")
  120. s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK
  121. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingEmpty", "unary")
  122. assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_count should be incremented for PingEmpty")
  123. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingError", "unary")
  124. s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
  125. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingError", "unary")
  126. assert.EqualValues(s.T(), before+1, after, "grpc_server_handling_seconds_count should be incremented for PingError")
  127. }
  128. func (s *ServerInterceptorTestSuite) TestStreamingIncrementsStarted() {
  129. var before int
  130. var after int
  131. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingList", "server_stream")
  132. s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{})
  133. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingList", "server_stream")
  134. assert.EqualValues(s.T(), before+1, after, "grpc_server_started_total should be incremented for PingList")
  135. }
  136. func (s *ServerInterceptorTestSuite) TestStreamingIncrementsHistograms() {
  137. var before int
  138. var after int
  139. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream")
  140. ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK
  141. // Do a read, just for kicks.
  142. for {
  143. _, err := ss.Recv()
  144. if err == io.EOF {
  145. break
  146. }
  147. require.NoError(s.T(), err, "reading pingList shouldn't fail")
  148. }
  149. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream")
  150. assert.EqualValues(s.T(), before+1, after, "grpc_server_handling_seconds_count should be incremented for PingList OK")
  151. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream")
  152. _, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
  153. require.NoError(s.T(), err, "PingList must not fail immedietely")
  154. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream")
  155. assert.EqualValues(s.T(), before+1, after, "grpc_server_handling_seconds_count should be incremented for PingList FailedPrecondition")
  156. }
  157. func (s *ServerInterceptorTestSuite) TestStreamingIncrementsHandled() {
  158. var before int
  159. var after int
  160. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "OK")
  161. ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK
  162. // Do a read, just for kicks.
  163. for {
  164. _, err := ss.Recv()
  165. if err == io.EOF {
  166. break
  167. }
  168. require.NoError(s.T(), err, "reading pingList shouldn't fail")
  169. }
  170. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "OK")
  171. assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_total should be incremented for PingList OK")
  172. before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "FailedPrecondition")
  173. _, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
  174. require.NoError(s.T(), err, "PingList must not fail immedietely")
  175. after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "FailedPrecondition")
  176. assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_total should be incremented for PingList FailedPrecondition")
  177. }
  178. func (s *ServerInterceptorTestSuite) TestStreamingIncrementsMessageCounts() {
  179. beforeRecv := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_received_total", "PingList", "server_stream")
  180. beforeSent := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_sent_total", "PingList", "server_stream")
  181. ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK
  182. // Do a read, just for kicks.
  183. count := 0
  184. for {
  185. _, err := ss.Recv()
  186. if err == io.EOF {
  187. break
  188. }
  189. require.NoError(s.T(), err, "reading pingList shouldn't fail")
  190. count += 1
  191. }
  192. require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match")
  193. afterSent := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_sent_total", "PingList", "server_stream")
  194. afterRecv := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_received_total", "PingList", "server_stream")
  195. assert.EqualValues(s.T(), beforeSent+countListResponses, afterSent, "grpc_server_msg_sent_total should be incremented 20 times for PingList")
  196. assert.EqualValues(s.T(), beforeRecv+1, afterRecv, "grpc_server_msg_sent_total should be incremented ones for PingList ")
  197. }
  198. func fetchPrometheusLines(t *testing.T, metricName string, matchingLabelValues ...string) []string {
  199. resp := httptest.NewRecorder()
  200. req, err := http.NewRequest("GET", "/", nil)
  201. require.NoError(t, err, "failed creating request for Prometheus handler")
  202. prometheus.Handler().ServeHTTP(resp, req)
  203. reader := bufio.NewReader(resp.Body)
  204. ret := []string{}
  205. for {
  206. line, err := reader.ReadString('\n')
  207. if err == io.EOF {
  208. break
  209. } else {
  210. require.NoError(t, err, "error reading stuff")
  211. }
  212. if !strings.HasPrefix(line, metricName) {
  213. continue
  214. }
  215. matches := true
  216. for _, labelValue := range matchingLabelValues {
  217. if !strings.Contains(line, `"`+labelValue+`"`) {
  218. matches = false
  219. }
  220. }
  221. if matches {
  222. ret = append(ret, line)
  223. }
  224. }
  225. return ret
  226. }
  227. func sumCountersForMetricAndLabels(t *testing.T, metricName string, matchingLabelValues ...string) int {
  228. count := 0
  229. for _, line := range fetchPrometheusLines(t, metricName, matchingLabelValues...) {
  230. valueString := line[strings.LastIndex(line, " ")+1 : len(line)-1]
  231. valueFloat, err := strconv.ParseFloat(valueString, 32)
  232. require.NoError(t, err, "failed parsing value for line: %v", line)
  233. count += int(valueFloat)
  234. }
  235. return count
  236. }
  237. type testService struct {
  238. t *testing.T
  239. }
  240. func (s *testService) PingEmpty(ctx context.Context, _ *pb_testproto.Empty) (*pb_testproto.PingResponse, error) {
  241. return &pb_testproto.PingResponse{Value: pingDefaultValue, Counter: 42}, nil
  242. }
  243. func (s *testService) Ping(ctx context.Context, ping *pb_testproto.PingRequest) (*pb_testproto.PingResponse, error) {
  244. // Send user trailers and headers.
  245. return &pb_testproto.PingResponse{Value: ping.Value, Counter: 42}, nil
  246. }
  247. func (s *testService) PingError(ctx context.Context, ping *pb_testproto.PingRequest) (*pb_testproto.Empty, error) {
  248. code := codes.Code(ping.ErrorCodeReturned)
  249. return nil, grpc.Errorf(code, "Userspace error.")
  250. }
  251. func (s *testService) PingList(ping *pb_testproto.PingRequest, stream pb_testproto.TestService_PingListServer) error {
  252. if ping.ErrorCodeReturned != 0 {
  253. return grpc.Errorf(codes.Code(ping.ErrorCodeReturned), "foobar")
  254. }
  255. // Send user trailers and headers.
  256. for i := 0; i < countListResponses; i++ {
  257. stream.Send(&pb_testproto.PingResponse{Value: ping.Value, Counter: int32(i)})
  258. }
  259. return nil
  260. }