123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- package grpc_prometheus
- import (
- prom "github.com/prometheus/client_golang/prometheus"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- "google.golang.org/grpc/status"
- )
- // ServerMetrics represents a collection of metrics to be registered on a
- // Prometheus metrics registry for a gRPC server.
- type ServerMetrics struct {
- serverStartedCounter *prom.CounterVec
- serverHandledCounter *prom.CounterVec
- serverStreamMsgReceived *prom.CounterVec
- serverStreamMsgSent *prom.CounterVec
- serverHandledHistogramEnabled bool
- serverHandledHistogramOpts prom.HistogramOpts
- serverHandledHistogram *prom.HistogramVec
- }
- // NewServerMetrics returns a ServerMetrics object. Use a new instance of
- // ServerMetrics when not using the default Prometheus metrics registry, for
- // example when wanting to control which metrics are added to a registry as
- // opposed to automatically adding metrics via init functions.
- func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
- opts := counterOptions(counterOpts)
- return &ServerMetrics{
- serverStartedCounter: prom.NewCounterVec(
- opts.apply(prom.CounterOpts{
- Name: "grpc_server_started_total",
- Help: "Total number of RPCs started on the server.",
- }), []string{"grpc_type", "grpc_service", "grpc_method"}),
- serverHandledCounter: prom.NewCounterVec(
- opts.apply(prom.CounterOpts{
- Name: "grpc_server_handled_total",
- Help: "Total number of RPCs completed on the server, regardless of success or failure.",
- }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
- serverStreamMsgReceived: prom.NewCounterVec(
- opts.apply(prom.CounterOpts{
- Name: "grpc_server_msg_received_total",
- Help: "Total number of RPC stream messages received on the server.",
- }), []string{"grpc_type", "grpc_service", "grpc_method"}),
- serverStreamMsgSent: prom.NewCounterVec(
- opts.apply(prom.CounterOpts{
- Name: "grpc_server_msg_sent_total",
- Help: "Total number of gRPC stream messages sent by the server.",
- }), []string{"grpc_type", "grpc_service", "grpc_method"}),
- serverHandledHistogramEnabled: false,
- serverHandledHistogramOpts: prom.HistogramOpts{
- Name: "grpc_server_handling_seconds",
- Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
- Buckets: prom.DefBuckets,
- },
- serverHandledHistogram: nil,
- }
- }
- // EnableHandlingTimeHistogram enables histograms being registered when
- // registering the ServerMetrics on a Prometheus registry. Histograms can be
- // expensive on Prometheus servers. It takes options to configure histogram
- // options such as the defined buckets.
- func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
- for _, o := range opts {
- o(&m.serverHandledHistogramOpts)
- }
- if !m.serverHandledHistogramEnabled {
- m.serverHandledHistogram = prom.NewHistogramVec(
- m.serverHandledHistogramOpts,
- []string{"grpc_type", "grpc_service", "grpc_method"},
- )
- }
- m.serverHandledHistogramEnabled = true
- }
- // Describe sends the super-set of all possible descriptors of metrics
- // collected by this Collector to the provided channel and returns once
- // the last descriptor has been sent.
- func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
- m.serverStartedCounter.Describe(ch)
- m.serverHandledCounter.Describe(ch)
- m.serverStreamMsgReceived.Describe(ch)
- m.serverStreamMsgSent.Describe(ch)
- if m.serverHandledHistogramEnabled {
- m.serverHandledHistogram.Describe(ch)
- }
- }
- // Collect is called by the Prometheus registry when collecting
- // metrics. The implementation sends each collected metric via the
- // provided channel and returns once the last metric has been sent.
- func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
- m.serverStartedCounter.Collect(ch)
- m.serverHandledCounter.Collect(ch)
- m.serverStreamMsgReceived.Collect(ch)
- m.serverStreamMsgSent.Collect(ch)
- if m.serverHandledHistogramEnabled {
- m.serverHandledHistogram.Collect(ch)
- }
- }
- // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
- func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- monitor := newServerReporter(m, Unary, info.FullMethod)
- monitor.ReceivedMessage()
- resp, err := handler(ctx, req)
- st, _ := status.FromError(err)
- monitor.Handled(st.Code())
- if err == nil {
- monitor.SentMessage()
- }
- return resp, err
- }
- }
- // StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
- func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
- err := handler(srv, &monitoredServerStream{ss, monitor})
- st, _ := status.FromError(err)
- monitor.Handled(st.Code())
- return err
- }
- }
- // InitializeMetrics initializes all metrics, with their appropriate null
- // value, for all gRPC methods registered on a gRPC server. This is useful, to
- // ensure that all metrics exist when collecting and querying.
- func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
- serviceInfo := server.GetServiceInfo()
- for serviceName, info := range serviceInfo {
- for _, mInfo := range info.Methods {
- preRegisterMethod(m, serviceName, &mInfo)
- }
- }
- }
- func streamRPCType(info *grpc.StreamServerInfo) grpcType {
- if info.IsClientStream && !info.IsServerStream {
- return ClientStream
- } else if !info.IsClientStream && info.IsServerStream {
- return ServerStream
- }
- return BidiStream
- }
- // monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
- type monitoredServerStream struct {
- grpc.ServerStream
- monitor *serverReporter
- }
- func (s *monitoredServerStream) SendMsg(m interface{}) error {
- err := s.ServerStream.SendMsg(m)
- if err == nil {
- s.monitor.SentMessage()
- }
- return err
- }
- func (s *monitoredServerStream) RecvMsg(m interface{}) error {
- err := s.ServerStream.RecvMsg(m)
- if err == nil {
- s.monitor.ReceivedMessage()
- }
- return err
- }
- // preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
- func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
- methodName := mInfo.Name
- methodType := string(typeFromMethodInfo(mInfo))
- // These are just references (no increments), as just referencing will create the labels but not set values.
- metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
- metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
- metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
- if metrics.serverHandledHistogramEnabled {
- metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
- }
- for _, code := range allCodes {
- metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
- }
- }
|