client.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. // Copyright 2016 Michal Witkowski. All Rights Reserved.
  2. // See LICENSE for licensing terms.
  3. // gRPC Prometheus monitoring interceptors for client-side gRPC.
  4. package grpc_prometheus
  5. import (
  6. "io"
  7. "golang.org/x/net/context"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/codes"
  10. )
  11. // UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
  12. func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  13. monitor := newClientReporter(Unary, method)
  14. monitor.SentMessage()
  15. err := invoker(ctx, method, req, reply, cc, opts...)
  16. if err != nil {
  17. monitor.ReceivedMessage()
  18. }
  19. monitor.Handled(grpc.Code(err))
  20. return err
  21. }
  22. // StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
  23. func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  24. monitor := newClientReporter(clientStreamType(desc), method)
  25. clientStream, err := streamer(ctx, desc, cc, method, opts...)
  26. if err != nil {
  27. monitor.Handled(grpc.Code(err))
  28. return nil, err
  29. }
  30. return &monitoredClientStream{clientStream, monitor}, nil
  31. }
  32. func clientStreamType(desc *grpc.StreamDesc) grpcType {
  33. if desc.ClientStreams && !desc.ServerStreams {
  34. return ClientStream
  35. } else if !desc.ClientStreams && desc.ServerStreams {
  36. return ServerStream
  37. }
  38. return BidiStream
  39. }
  40. // monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
  41. type monitoredClientStream struct {
  42. grpc.ClientStream
  43. monitor *clientReporter
  44. }
  45. func (s *monitoredClientStream) SendMsg(m interface{}) error {
  46. err := s.ClientStream.SendMsg(m)
  47. if err == nil {
  48. s.monitor.SentMessage()
  49. }
  50. return err
  51. }
  52. func (s *monitoredClientStream) RecvMsg(m interface{}) error {
  53. err := s.ClientStream.RecvMsg(m)
  54. if err == nil {
  55. s.monitor.ReceivedMessage()
  56. } else if err == io.EOF {
  57. s.monitor.Handled(codes.OK)
  58. } else {
  59. s.monitor.Handled(grpc.Code(err))
  60. }
  61. return err
  62. }