server_metrics.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package grpc_prometheus
  2. import (
  3. prom "github.com/prometheus/client_golang/prometheus"
  4. "golang.org/x/net/context"
  5. "google.golang.org/grpc"
  6. )
  7. // ServerMetrics represents a collection of metrics to be registered on a
  8. // Prometheus metrics registry for a gRPC server.
  9. type ServerMetrics struct {
  10. serverStartedCounter *prom.CounterVec
  11. serverHandledCounter *prom.CounterVec
  12. serverStreamMsgReceived *prom.CounterVec
  13. serverStreamMsgSent *prom.CounterVec
  14. serverHandledHistogramEnabled bool
  15. serverHandledHistogramOpts prom.HistogramOpts
  16. serverHandledHistogram *prom.HistogramVec
  17. }
  18. // NewServerMetrics returns a ServerMetrics object. Use a new instance of
  19. // ServerMetrics when not using the default Prometheus metrics registry, for
  20. // example when wanting to control which metrics are added to a registry as
  21. // opposed to automatically adding metrics via init functions.
  22. func NewServerMetrics() *ServerMetrics {
  23. return &ServerMetrics{
  24. serverStartedCounter: prom.NewCounterVec(
  25. prom.CounterOpts{
  26. Name: "grpc_server_started_total",
  27. Help: "Total number of RPCs started on the server.",
  28. }, []string{"grpc_type", "grpc_service", "grpc_method"}),
  29. serverHandledCounter: prom.NewCounterVec(
  30. prom.CounterOpts{
  31. Name: "grpc_server_handled_total",
  32. Help: "Total number of RPCs completed on the server, regardless of success or failure.",
  33. }, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
  34. serverStreamMsgReceived: prom.NewCounterVec(
  35. prom.CounterOpts{
  36. Name: "grpc_server_msg_received_total",
  37. Help: "Total number of RPC stream messages received on the server.",
  38. }, []string{"grpc_type", "grpc_service", "grpc_method"}),
  39. serverStreamMsgSent: prom.NewCounterVec(
  40. prom.CounterOpts{
  41. Name: "grpc_server_msg_sent_total",
  42. Help: "Total number of gRPC stream messages sent by the server.",
  43. }, []string{"grpc_type", "grpc_service", "grpc_method"}),
  44. serverHandledHistogramEnabled: false,
  45. serverHandledHistogramOpts: prom.HistogramOpts{
  46. Name: "grpc_server_handling_seconds",
  47. Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
  48. Buckets: prom.DefBuckets,
  49. },
  50. serverHandledHistogram: nil,
  51. }
  52. }
  53. type HistogramOption func(*prom.HistogramOpts)
  54. // WithHistogramBuckets allows you to specify custom bucket ranges for histograms if EnableHandlingTimeHistogram is on.
  55. func WithHistogramBuckets(buckets []float64) HistogramOption {
  56. return func(o *prom.HistogramOpts) { o.Buckets = buckets }
  57. }
  58. // EnableHandlingTimeHistogram enables histograms being registered when
  59. // registering the ServerMetrics on a Prometheus registry. Histograms can be
  60. // expensive on Prometheus servers. It takes options to configure histogram
  61. // options such as the defined buckets.
  62. func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
  63. for _, o := range opts {
  64. o(&m.serverHandledHistogramOpts)
  65. }
  66. if !m.serverHandledHistogramEnabled {
  67. m.serverHandledHistogram = prom.NewHistogramVec(
  68. m.serverHandledHistogramOpts,
  69. []string{"grpc_type", "grpc_service", "grpc_method"},
  70. )
  71. }
  72. m.serverHandledHistogramEnabled = true
  73. }
  74. // Describe sends the super-set of all possible descriptors of metrics
  75. // collected by this Collector to the provided channel and returns once
  76. // the last descriptor has been sent.
  77. func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
  78. m.serverStartedCounter.Describe(ch)
  79. m.serverHandledCounter.Describe(ch)
  80. m.serverStreamMsgReceived.Describe(ch)
  81. m.serverStreamMsgSent.Describe(ch)
  82. if m.serverHandledHistogramEnabled {
  83. m.serverHandledHistogram.Describe(ch)
  84. }
  85. }
  86. // Collect is called by the Prometheus registry when collecting
  87. // metrics. The implementation sends each collected metric via the
  88. // provided channel and returns once the last metric has been sent.
  89. func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
  90. m.serverStartedCounter.Collect(ch)
  91. m.serverHandledCounter.Collect(ch)
  92. m.serverStreamMsgReceived.Collect(ch)
  93. m.serverStreamMsgSent.Collect(ch)
  94. if m.serverHandledHistogramEnabled {
  95. m.serverHandledHistogram.Collect(ch)
  96. }
  97. }
  98. // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
  99. func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  100. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  101. monitor := newServerReporter(m, Unary, info.FullMethod)
  102. monitor.ReceivedMessage()
  103. resp, err := handler(ctx, req)
  104. monitor.Handled(grpc.Code(err))
  105. if err == nil {
  106. monitor.SentMessage()
  107. }
  108. return resp, err
  109. }
  110. }
  111. // StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
  112. func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  113. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  114. monitor := newServerReporter(m, streamRpcType(info), info.FullMethod)
  115. err := handler(srv, &monitoredServerStream{ss, monitor})
  116. monitor.Handled(grpc.Code(err))
  117. return err
  118. }
  119. }
  120. // InitializeMetrics initializes all metrics, with their appropriate null
  121. // value, for all gRPC methods registered on a gRPC server. This is useful, to
  122. // ensure that all metrics exist when collecting and querying.
  123. func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
  124. serviceInfo := server.GetServiceInfo()
  125. for serviceName, info := range serviceInfo {
  126. for _, mInfo := range info.Methods {
  127. preRegisterMethod(m, serviceName, &mInfo)
  128. }
  129. }
  130. }
  131. // Register registers all server metrics in a given metrics registry. Depending
  132. // on histogram options and whether they are enabled, histogram metrics are
  133. // also registered.
  134. //
  135. // Deprecated: ServerMetrics implements Prometheus Collector interface. You can
  136. // register an instance of ServerMetrics directly by using
  137. // prometheus.Register(m).
  138. func (m *ServerMetrics) Register(r prom.Registerer) error {
  139. return r.Register(m)
  140. }
  141. // MustRegister tries to register all server metrics and panics on an error.
  142. //
  143. // Deprecated: ServerMetrics implements Prometheus Collector interface. You can
  144. // register an instance of ServerMetrics directly by using
  145. // prometheus.MustRegister(m).
  146. func (m *ServerMetrics) MustRegister(r prom.Registerer) {
  147. r.MustRegister(m)
  148. }
  149. func streamRpcType(info *grpc.StreamServerInfo) grpcType {
  150. if info.IsClientStream && !info.IsServerStream {
  151. return ClientStream
  152. } else if !info.IsClientStream && info.IsServerStream {
  153. return ServerStream
  154. }
  155. return BidiStream
  156. }
  157. // monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
  158. type monitoredServerStream struct {
  159. grpc.ServerStream
  160. monitor *serverReporter
  161. }
  162. func (s *monitoredServerStream) SendMsg(m interface{}) error {
  163. err := s.ServerStream.SendMsg(m)
  164. if err == nil {
  165. s.monitor.SentMessage()
  166. }
  167. return err
  168. }
  169. func (s *monitoredServerStream) RecvMsg(m interface{}) error {
  170. err := s.ServerStream.RecvMsg(m)
  171. if err == nil {
  172. s.monitor.ReceivedMessage()
  173. }
  174. return err
  175. }
  176. // preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
  177. func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
  178. methodName := mInfo.Name
  179. methodType := string(typeFromMethodInfo(mInfo))
  180. // These are just references (no increments), as just referencing will create the labels but not set values.
  181. metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
  182. metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
  183. metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
  184. if metrics.serverHandledHistogramEnabled {
  185. metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
  186. }
  187. for _, code := range allCodes {
  188. metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
  189. }
  190. }