server_metrics.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package grpc_prometheus
  2. import (
  3. prom "github.com/prometheus/client_golang/prometheus"
  4. "golang.org/x/net/context"
  5. "google.golang.org/grpc"
  6. "google.golang.org/grpc/status"
  7. )
  8. // ServerMetrics represents a collection of metrics to be registered on a
  9. // Prometheus metrics registry for a gRPC server.
  10. type ServerMetrics struct {
  11. serverStartedCounter *prom.CounterVec
  12. serverHandledCounter *prom.CounterVec
  13. serverStreamMsgReceived *prom.CounterVec
  14. serverStreamMsgSent *prom.CounterVec
  15. serverHandledHistogramEnabled bool
  16. serverHandledHistogramOpts prom.HistogramOpts
  17. serverHandledHistogram *prom.HistogramVec
  18. }
  19. // NewServerMetrics returns a ServerMetrics object. Use a new instance of
  20. // ServerMetrics when not using the default Prometheus metrics registry, for
  21. // example when wanting to control which metrics are added to a registry as
  22. // opposed to automatically adding metrics via init functions.
  23. func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
  24. opts := counterOptions(counterOpts)
  25. return &ServerMetrics{
  26. serverStartedCounter: prom.NewCounterVec(
  27. opts.apply(prom.CounterOpts{
  28. Name: "grpc_server_started_total",
  29. Help: "Total number of RPCs started on the server.",
  30. }), []string{"grpc_type", "grpc_service", "grpc_method"}),
  31. serverHandledCounter: prom.NewCounterVec(
  32. opts.apply(prom.CounterOpts{
  33. Name: "grpc_server_handled_total",
  34. Help: "Total number of RPCs completed on the server, regardless of success or failure.",
  35. }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
  36. serverStreamMsgReceived: prom.NewCounterVec(
  37. opts.apply(prom.CounterOpts{
  38. Name: "grpc_server_msg_received_total",
  39. Help: "Total number of RPC stream messages received on the server.",
  40. }), []string{"grpc_type", "grpc_service", "grpc_method"}),
  41. serverStreamMsgSent: prom.NewCounterVec(
  42. opts.apply(prom.CounterOpts{
  43. Name: "grpc_server_msg_sent_total",
  44. Help: "Total number of gRPC stream messages sent by the server.",
  45. }), []string{"grpc_type", "grpc_service", "grpc_method"}),
  46. serverHandledHistogramEnabled: false,
  47. serverHandledHistogramOpts: prom.HistogramOpts{
  48. Name: "grpc_server_handling_seconds",
  49. Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
  50. Buckets: prom.DefBuckets,
  51. },
  52. serverHandledHistogram: nil,
  53. }
  54. }
  55. // EnableHandlingTimeHistogram enables histograms being registered when
  56. // registering the ServerMetrics on a Prometheus registry. Histograms can be
  57. // expensive on Prometheus servers. It takes options to configure histogram
  58. // options such as the defined buckets.
  59. func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
  60. for _, o := range opts {
  61. o(&m.serverHandledHistogramOpts)
  62. }
  63. if !m.serverHandledHistogramEnabled {
  64. m.serverHandledHistogram = prom.NewHistogramVec(
  65. m.serverHandledHistogramOpts,
  66. []string{"grpc_type", "grpc_service", "grpc_method"},
  67. )
  68. }
  69. m.serverHandledHistogramEnabled = true
  70. }
  71. // Describe sends the super-set of all possible descriptors of metrics
  72. // collected by this Collector to the provided channel and returns once
  73. // the last descriptor has been sent.
  74. func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
  75. m.serverStartedCounter.Describe(ch)
  76. m.serverHandledCounter.Describe(ch)
  77. m.serverStreamMsgReceived.Describe(ch)
  78. m.serverStreamMsgSent.Describe(ch)
  79. if m.serverHandledHistogramEnabled {
  80. m.serverHandledHistogram.Describe(ch)
  81. }
  82. }
  83. // Collect is called by the Prometheus registry when collecting
  84. // metrics. The implementation sends each collected metric via the
  85. // provided channel and returns once the last metric has been sent.
  86. func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
  87. m.serverStartedCounter.Collect(ch)
  88. m.serverHandledCounter.Collect(ch)
  89. m.serverStreamMsgReceived.Collect(ch)
  90. m.serverStreamMsgSent.Collect(ch)
  91. if m.serverHandledHistogramEnabled {
  92. m.serverHandledHistogram.Collect(ch)
  93. }
  94. }
  95. // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
  96. func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  97. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  98. monitor := newServerReporter(m, Unary, info.FullMethod)
  99. monitor.ReceivedMessage()
  100. resp, err := handler(ctx, req)
  101. st, _ := status.FromError(err)
  102. monitor.Handled(st.Code())
  103. if err == nil {
  104. monitor.SentMessage()
  105. }
  106. return resp, err
  107. }
  108. }
  109. // StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
  110. func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  111. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  112. monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
  113. err := handler(srv, &monitoredServerStream{ss, monitor})
  114. st, _ := status.FromError(err)
  115. monitor.Handled(st.Code())
  116. return err
  117. }
  118. }
  119. // InitializeMetrics initializes all metrics, with their appropriate null
  120. // value, for all gRPC methods registered on a gRPC server. This is useful, to
  121. // ensure that all metrics exist when collecting and querying.
  122. func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
  123. serviceInfo := server.GetServiceInfo()
  124. for serviceName, info := range serviceInfo {
  125. for _, mInfo := range info.Methods {
  126. preRegisterMethod(m, serviceName, &mInfo)
  127. }
  128. }
  129. }
  130. func streamRPCType(info *grpc.StreamServerInfo) grpcType {
  131. if info.IsClientStream && !info.IsServerStream {
  132. return ClientStream
  133. } else if !info.IsClientStream && info.IsServerStream {
  134. return ServerStream
  135. }
  136. return BidiStream
  137. }
  138. // monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
  139. type monitoredServerStream struct {
  140. grpc.ServerStream
  141. monitor *serverReporter
  142. }
  143. func (s *monitoredServerStream) SendMsg(m interface{}) error {
  144. err := s.ServerStream.SendMsg(m)
  145. if err == nil {
  146. s.monitor.SentMessage()
  147. }
  148. return err
  149. }
  150. func (s *monitoredServerStream) RecvMsg(m interface{}) error {
  151. err := s.ServerStream.RecvMsg(m)
  152. if err == nil {
  153. s.monitor.ReceivedMessage()
  154. }
  155. return err
  156. }
  157. // preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
  158. func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
  159. methodName := mInfo.Name
  160. methodType := string(typeFromMethodInfo(mInfo))
  161. // These are just references (no increments), as just referencing will create the labels but not set values.
  162. metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
  163. metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
  164. metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
  165. if metrics.serverHandledHistogramEnabled {
  166. metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
  167. }
  168. for _, code := range allCodes {
  169. metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
  170. }
  171. }