server.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. // Copyright 2016 Michal Witkowski. All Rights Reserved.
  2. // See LICENSE for licensing terms.
  3. // gRPC Prometheus monitoring interceptors for server-side gRPC.
  4. package grpc_prometheus
  5. import (
  6. "golang.org/x/net/context"
  7. "google.golang.org/grpc"
  8. )
  9. // PreregisterServices takes a gRPC server and pre-initializes all counters to 0.
  10. // This allows for easier monitoring in Prometheus (no missing metrics), and should be called *after* all services have
  11. // been registered with the server.
  12. func Register(server *grpc.Server) {
  13. serviceInfo := server.GetServiceInfo()
  14. for serviceName, info := range serviceInfo {
  15. for _, mInfo := range info.Methods {
  16. preRegisterMethod(serviceName, &mInfo)
  17. }
  18. }
  19. }
  20. // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
  21. func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  22. monitor := newServerReporter(Unary, info.FullMethod)
  23. monitor.ReceivedMessage()
  24. resp, err := handler(ctx, req)
  25. monitor.Handled(grpc.Code(err))
  26. if err == nil {
  27. monitor.SentMessage()
  28. }
  29. return resp, err
  30. }
  31. // StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
  32. func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  33. monitor := newServerReporter(streamRpcType(info), info.FullMethod)
  34. err := handler(srv, &monitoredServerStream{ss, monitor})
  35. monitor.Handled(grpc.Code(err))
  36. return err
  37. }
  38. func streamRpcType(info *grpc.StreamServerInfo) grpcType {
  39. if info.IsClientStream && !info.IsServerStream {
  40. return ClientStream
  41. } else if !info.IsClientStream && info.IsServerStream {
  42. return ServerStream
  43. }
  44. return BidiStream
  45. }
// monitoredServerStream wraps grpc.ServerStream so that each message
// successfully sent or received through the wrapper is reported to its monitor.
type monitoredServerStream struct {
	grpc.ServerStream
	// monitor is the reporter notified by the SendMsg and RecvMsg overrides.
	monitor *serverReporter
}
  51. func (s *monitoredServerStream) SendMsg(m interface{}) error {
  52. err := s.ServerStream.SendMsg(m)
  53. if err == nil {
  54. s.monitor.SentMessage()
  55. }
  56. return err
  57. }
  58. func (s *monitoredServerStream) RecvMsg(m interface{}) error {
  59. err := s.ServerStream.RecvMsg(m)
  60. if err == nil {
  61. s.monitor.ReceivedMessage()
  62. }
  63. return err
  64. }