123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package grpc_prometheus
- import (
- "io"
- prom "github.com/prometheus/client_golang/prometheus"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- )
- // ClientMetrics represents a collection of metrics to be registered on a
- // Prometheus metrics registry for a gRPC client.
- type ClientMetrics struct {
- clientStartedCounter *prom.CounterVec
- clientHandledCounter *prom.CounterVec
- clientStreamMsgReceived *prom.CounterVec
- clientStreamMsgSent *prom.CounterVec
- clientHandledHistogramEnabled bool
- clientHandledHistogramOpts prom.HistogramOpts
- clientHandledHistogram *prom.HistogramVec
- }
- // NewClientMetrics returns a ClientMetrics object. Use a new instance of
- // ClientMetrics when not using the default Prometheus metrics registry, for
- // example when wanting to control which metrics are added to a registry as
- // opposed to automatically adding metrics via init functions.
- func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
- opts := counterOptions(counterOpts)
- return &ClientMetrics{
- clientStartedCounter: prom.NewCounterVec(
- opts.apply(prom.CounterOpts{
- Name: "grpc_client_started_total",
- Help: "Total number of RPCs started on the client.",
- }), []string{"grpc_type", "grpc_service", "grpc_method"}),
- clientHandledCounter: prom.NewCounterVec(
- opts.apply(prom.CounterOpts{
- Name: "grpc_client_handled_total",
- Help: "Total number of RPCs completed by the client, regardless of success or failure.",
- }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
- clientStreamMsgReceived: prom.NewCounterVec(
- opts.apply(prom.CounterOpts{
- Name: "grpc_client_msg_received_total",
- Help: "Total number of RPC stream messages received by the client.",
- }), []string{"grpc_type", "grpc_service", "grpc_method"}),
- clientStreamMsgSent: prom.NewCounterVec(
- opts.apply(prom.CounterOpts{
- Name: "grpc_client_msg_sent_total",
- Help: "Total number of gRPC stream messages sent by the client.",
- }), []string{"grpc_type", "grpc_service", "grpc_method"}),
- clientHandledHistogramEnabled: false,
- clientHandledHistogramOpts: prom.HistogramOpts{
- Name: "grpc_client_handling_seconds",
- Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
- Buckets: prom.DefBuckets,
- },
- clientHandledHistogram: nil,
- }
- }
- // Describe sends the super-set of all possible descriptors of metrics
- // collected by this Collector to the provided channel and returns once
- // the last descriptor has been sent.
- func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
- m.clientStartedCounter.Describe(ch)
- m.clientHandledCounter.Describe(ch)
- m.clientStreamMsgReceived.Describe(ch)
- m.clientStreamMsgSent.Describe(ch)
- if m.clientHandledHistogramEnabled {
- m.clientHandledHistogram.Describe(ch)
- }
- }
- // Collect is called by the Prometheus registry when collecting
- // metrics. The implementation sends each collected metric via the
- // provided channel and returns once the last metric has been sent.
- func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
- m.clientStartedCounter.Collect(ch)
- m.clientHandledCounter.Collect(ch)
- m.clientStreamMsgReceived.Collect(ch)
- m.clientStreamMsgSent.Collect(ch)
- if m.clientHandledHistogramEnabled {
- m.clientHandledHistogram.Collect(ch)
- }
- }
- // EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
- // Histogram metrics can be very expensive for Prometheus to retain and query.
- func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
- for _, o := range opts {
- o(&m.clientHandledHistogramOpts)
- }
- if !m.clientHandledHistogramEnabled {
- m.clientHandledHistogram = prom.NewHistogramVec(
- m.clientHandledHistogramOpts,
- []string{"grpc_type", "grpc_service", "grpc_method"},
- )
- }
- m.clientHandledHistogramEnabled = true
- }
- // UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
- func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
- return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
- monitor := newClientReporter(m, Unary, method)
- monitor.SentMessage()
- err := invoker(ctx, method, req, reply, cc, opts...)
- if err != nil {
- monitor.ReceivedMessage()
- }
- st, _ := status.FromError(err)
- monitor.Handled(st.Code())
- return err
- }
- }
- // StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
- func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
- return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
- monitor := newClientReporter(m, clientStreamType(desc), method)
- clientStream, err := streamer(ctx, desc, cc, method, opts...)
- if err != nil {
- st, _ := status.FromError(err)
- monitor.Handled(st.Code())
- return nil, err
- }
- return &monitoredClientStream{clientStream, monitor}, nil
- }
- }
- func clientStreamType(desc *grpc.StreamDesc) grpcType {
- if desc.ClientStreams && !desc.ServerStreams {
- return ClientStream
- } else if !desc.ClientStreams && desc.ServerStreams {
- return ServerStream
- }
- return BidiStream
- }
- // monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
- type monitoredClientStream struct {
- grpc.ClientStream
- monitor *clientReporter
- }
- func (s *monitoredClientStream) SendMsg(m interface{}) error {
- err := s.ClientStream.SendMsg(m)
- if err == nil {
- s.monitor.SentMessage()
- }
- return err
- }
- func (s *monitoredClientStream) RecvMsg(m interface{}) error {
- err := s.ClientStream.RecvMsg(m)
- if err == nil {
- s.monitor.ReceivedMessage()
- } else if err == io.EOF {
- s.monitor.Handled(codes.OK)
- } else {
- st, _ := status.FromError(err)
- s.monitor.Handled(st.Code())
- }
- return err
- }
|