client_metrics.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package grpc_prometheus
  2. import (
  3. "io"
  4. prom "github.com/prometheus/client_golang/prometheus"
  5. "golang.org/x/net/context"
  6. "google.golang.org/grpc"
  7. "google.golang.org/grpc/codes"
  8. )
  9. // ClientMetrics represents a collection of metrics to be registered on a
  10. // Prometheus metrics registry for a gRPC client.
  11. type ClientMetrics struct {
  12. clientStartedCounter *prom.CounterVec
  13. clientHandledCounter *prom.CounterVec
  14. clientStreamMsgReceived *prom.CounterVec
  15. clientStreamMsgSent *prom.CounterVec
  16. clientHandledHistogramEnabled bool
  17. clientHandledHistogramOpts prom.HistogramOpts
  18. clientHandledHistogram *prom.HistogramVec
  19. }
  20. // NewClientMetrics returns a ClientMetrics object. Use a new instance of
  21. // ClientMetrics when not using the default Prometheus metrics registry, for
  22. // example when wanting to control which metrics are added to a registry as
  23. // opposed to automatically adding metrics via init functions.
  24. func NewClientMetrics() *ClientMetrics {
  25. return &ClientMetrics{
  26. clientStartedCounter: prom.NewCounterVec(
  27. prom.CounterOpts{
  28. Name: "grpc_client_started_total",
  29. Help: "Total number of RPCs started on the client.",
  30. }, []string{"grpc_type", "grpc_service", "grpc_method"}),
  31. clientHandledCounter: prom.NewCounterVec(
  32. prom.CounterOpts{
  33. Name: "grpc_client_handled_total",
  34. Help: "Total number of RPCs completed by the client, regardless of success or failure.",
  35. }, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
  36. clientStreamMsgReceived: prom.NewCounterVec(
  37. prom.CounterOpts{
  38. Name: "grpc_client_msg_received_total",
  39. Help: "Total number of RPC stream messages received by the client.",
  40. }, []string{"grpc_type", "grpc_service", "grpc_method"}),
  41. clientStreamMsgSent: prom.NewCounterVec(
  42. prom.CounterOpts{
  43. Name: "grpc_client_msg_sent_total",
  44. Help: "Total number of gRPC stream messages sent by the client.",
  45. }, []string{"grpc_type", "grpc_service", "grpc_method"}),
  46. clientHandledHistogramEnabled: false,
  47. clientHandledHistogramOpts: prom.HistogramOpts{
  48. Name: "grpc_client_handling_seconds",
  49. Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
  50. Buckets: prom.DefBuckets,
  51. },
  52. clientHandledHistogram: nil,
  53. }
  54. }
  55. // Describe sends the super-set of all possible descriptors of metrics
  56. // collected by this Collector to the provided channel and returns once
  57. // the last descriptor has been sent.
  58. func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
  59. m.clientStartedCounter.Describe(ch)
  60. m.clientHandledCounter.Describe(ch)
  61. m.clientStreamMsgReceived.Describe(ch)
  62. m.clientStreamMsgSent.Describe(ch)
  63. if m.clientHandledHistogramEnabled {
  64. m.clientHandledHistogram.Describe(ch)
  65. }
  66. }
  67. // Collect is called by the Prometheus registry when collecting
  68. // metrics. The implementation sends each collected metric via the
  69. // provided channel and returns once the last metric has been sent.
  70. func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
  71. m.clientStartedCounter.Collect(ch)
  72. m.clientHandledCounter.Collect(ch)
  73. m.clientStreamMsgReceived.Collect(ch)
  74. m.clientStreamMsgSent.Collect(ch)
  75. if m.clientHandledHistogramEnabled {
  76. m.clientHandledHistogram.Collect(ch)
  77. }
  78. }
  79. // EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
  80. // Histogram metrics can be very expensive for Prometheus to retain and query.
  81. func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
  82. for _, o := range opts {
  83. o(&m.clientHandledHistogramOpts)
  84. }
  85. if !m.clientHandledHistogramEnabled {
  86. m.clientHandledHistogram = prom.NewHistogramVec(
  87. m.clientHandledHistogramOpts,
  88. []string{"grpc_type", "grpc_service", "grpc_method"},
  89. )
  90. }
  91. m.clientHandledHistogramEnabled = true
  92. }
  93. // UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
  94. func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  95. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  96. monitor := newClientReporter(m, Unary, method)
  97. monitor.SentMessage()
  98. err := invoker(ctx, method, req, reply, cc, opts...)
  99. if err != nil {
  100. monitor.ReceivedMessage()
  101. }
  102. monitor.Handled(grpc.Code(err))
  103. return err
  104. }
  105. }
  106. // StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
  107. func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  108. return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  109. monitor := newClientReporter(m, clientStreamType(desc), method)
  110. clientStream, err := streamer(ctx, desc, cc, method, opts...)
  111. if err != nil {
  112. monitor.Handled(grpc.Code(err))
  113. return nil, err
  114. }
  115. return &monitoredClientStream{clientStream, monitor}, nil
  116. }
  117. }
  118. func clientStreamType(desc *grpc.StreamDesc) grpcType {
  119. if desc.ClientStreams && !desc.ServerStreams {
  120. return ClientStream
  121. } else if !desc.ClientStreams && desc.ServerStreams {
  122. return ServerStream
  123. }
  124. return BidiStream
  125. }
  126. // monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
  127. type monitoredClientStream struct {
  128. grpc.ClientStream
  129. monitor *clientReporter
  130. }
  131. func (s *monitoredClientStream) SendMsg(m interface{}) error {
  132. err := s.ClientStream.SendMsg(m)
  133. if err == nil {
  134. s.monitor.SentMessage()
  135. }
  136. return err
  137. }
  138. func (s *monitoredClientStream) RecvMsg(m interface{}) error {
  139. err := s.ClientStream.RecvMsg(m)
  140. if err == nil {
  141. s.monitor.ReceivedMessage()
  142. } else if err == io.EOF {
  143. s.monitor.Handled(codes.OK)
  144. } else {
  145. s.monitor.Handled(grpc.Code(err))
  146. }
  147. return err
  148. }