client_metrics.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package grpc_prometheus
  2. import (
  3. "io"
  4. prom "github.com/prometheus/client_golang/prometheus"
  5. "golang.org/x/net/context"
  6. "google.golang.org/grpc"
  7. "google.golang.org/grpc/codes"
  8. "google.golang.org/grpc/status"
  9. )
  10. // ClientMetrics represents a collection of metrics to be registered on a
  11. // Prometheus metrics registry for a gRPC client.
  12. type ClientMetrics struct {
  13. clientStartedCounter *prom.CounterVec
  14. clientHandledCounter *prom.CounterVec
  15. clientStreamMsgReceived *prom.CounterVec
  16. clientStreamMsgSent *prom.CounterVec
  17. clientHandledHistogramEnabled bool
  18. clientHandledHistogramOpts prom.HistogramOpts
  19. clientHandledHistogram *prom.HistogramVec
  20. }
  21. // NewClientMetrics returns a ClientMetrics object. Use a new instance of
  22. // ClientMetrics when not using the default Prometheus metrics registry, for
  23. // example when wanting to control which metrics are added to a registry as
  24. // opposed to automatically adding metrics via init functions.
  25. func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
  26. opts := counterOptions(counterOpts)
  27. return &ClientMetrics{
  28. clientStartedCounter: prom.NewCounterVec(
  29. opts.apply(prom.CounterOpts{
  30. Name: "grpc_client_started_total",
  31. Help: "Total number of RPCs started on the client.",
  32. }), []string{"grpc_type", "grpc_service", "grpc_method"}),
  33. clientHandledCounter: prom.NewCounterVec(
  34. opts.apply(prom.CounterOpts{
  35. Name: "grpc_client_handled_total",
  36. Help: "Total number of RPCs completed by the client, regardless of success or failure.",
  37. }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
  38. clientStreamMsgReceived: prom.NewCounterVec(
  39. opts.apply(prom.CounterOpts{
  40. Name: "grpc_client_msg_received_total",
  41. Help: "Total number of RPC stream messages received by the client.",
  42. }), []string{"grpc_type", "grpc_service", "grpc_method"}),
  43. clientStreamMsgSent: prom.NewCounterVec(
  44. opts.apply(prom.CounterOpts{
  45. Name: "grpc_client_msg_sent_total",
  46. Help: "Total number of gRPC stream messages sent by the client.",
  47. }), []string{"grpc_type", "grpc_service", "grpc_method"}),
  48. clientHandledHistogramEnabled: false,
  49. clientHandledHistogramOpts: prom.HistogramOpts{
  50. Name: "grpc_client_handling_seconds",
  51. Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
  52. Buckets: prom.DefBuckets,
  53. },
  54. clientHandledHistogram: nil,
  55. }
  56. }
  57. // Describe sends the super-set of all possible descriptors of metrics
  58. // collected by this Collector to the provided channel and returns once
  59. // the last descriptor has been sent.
  60. func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
  61. m.clientStartedCounter.Describe(ch)
  62. m.clientHandledCounter.Describe(ch)
  63. m.clientStreamMsgReceived.Describe(ch)
  64. m.clientStreamMsgSent.Describe(ch)
  65. if m.clientHandledHistogramEnabled {
  66. m.clientHandledHistogram.Describe(ch)
  67. }
  68. }
  69. // Collect is called by the Prometheus registry when collecting
  70. // metrics. The implementation sends each collected metric via the
  71. // provided channel and returns once the last metric has been sent.
  72. func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
  73. m.clientStartedCounter.Collect(ch)
  74. m.clientHandledCounter.Collect(ch)
  75. m.clientStreamMsgReceived.Collect(ch)
  76. m.clientStreamMsgSent.Collect(ch)
  77. if m.clientHandledHistogramEnabled {
  78. m.clientHandledHistogram.Collect(ch)
  79. }
  80. }
  81. // EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
  82. // Histogram metrics can be very expensive for Prometheus to retain and query.
  83. func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
  84. for _, o := range opts {
  85. o(&m.clientHandledHistogramOpts)
  86. }
  87. if !m.clientHandledHistogramEnabled {
  88. m.clientHandledHistogram = prom.NewHistogramVec(
  89. m.clientHandledHistogramOpts,
  90. []string{"grpc_type", "grpc_service", "grpc_method"},
  91. )
  92. }
  93. m.clientHandledHistogramEnabled = true
  94. }
  95. // UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
  96. func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  97. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  98. monitor := newClientReporter(m, Unary, method)
  99. monitor.SentMessage()
  100. err := invoker(ctx, method, req, reply, cc, opts...)
  101. if err != nil {
  102. monitor.ReceivedMessage()
  103. }
  104. st, _ := status.FromError(err)
  105. monitor.Handled(st.Code())
  106. return err
  107. }
  108. }
  109. // StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
  110. func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  111. return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  112. monitor := newClientReporter(m, clientStreamType(desc), method)
  113. clientStream, err := streamer(ctx, desc, cc, method, opts...)
  114. if err != nil {
  115. st, _ := status.FromError(err)
  116. monitor.Handled(st.Code())
  117. return nil, err
  118. }
  119. return &monitoredClientStream{clientStream, monitor}, nil
  120. }
  121. }
  122. func clientStreamType(desc *grpc.StreamDesc) grpcType {
  123. if desc.ClientStreams && !desc.ServerStreams {
  124. return ClientStream
  125. } else if !desc.ClientStreams && desc.ServerStreams {
  126. return ServerStream
  127. }
  128. return BidiStream
  129. }
  130. // monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
  131. type monitoredClientStream struct {
  132. grpc.ClientStream
  133. monitor *clientReporter
  134. }
  135. func (s *monitoredClientStream) SendMsg(m interface{}) error {
  136. err := s.ClientStream.SendMsg(m)
  137. if err == nil {
  138. s.monitor.SentMessage()
  139. }
  140. return err
  141. }
  142. func (s *monitoredClientStream) RecvMsg(m interface{}) error {
  143. err := s.ClientStream.RecvMsg(m)
  144. if err == nil {
  145. s.monitor.ReceivedMessage()
  146. } else if err == io.EOF {
  147. s.monitor.Handled(codes.OK)
  148. } else {
  149. st, _ := status.FromError(err)
  150. s.monitor.Handled(st.Code())
  151. }
  152. return err
  153. }