1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- // Copyright 2016 Michal Witkowski. All Rights Reserved.
- // See LICENSE for licensing terms.
- // gRPC Prometheus monitoring interceptors for server-side gRPC.
- package grpc_prometheus
- import (
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- )
- // PreregisterServices takes a gRPC server and pre-initializes all counters to 0.
- // This allows for easier monitoring in Prometheus (no missing metrics), and should be called *after* all services have
- // been registered with the server.
- func Register(server *grpc.Server) {
- serviceInfo := server.GetServiceInfo()
- for serviceName, info := range serviceInfo {
- for _, mInfo := range info.Methods {
- preRegisterMethod(serviceName, &mInfo)
- }
- }
- }
- // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
- func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- monitor := newServerReporter(Unary, info.FullMethod)
- monitor.ReceivedMessage()
- resp, err := handler(ctx, req)
- monitor.Handled(grpc.Code(err))
- if err == nil {
- monitor.SentMessage()
- }
- return resp, err
- }
- // StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
- func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- monitor := newServerReporter(streamRpcType(info), info.FullMethod)
- err := handler(srv, &monitoredServerStream{ss, monitor})
- monitor.Handled(grpc.Code(err))
- return err
- }
- func streamRpcType(info *grpc.StreamServerInfo) grpcType {
- if info.IsClientStream && !info.IsServerStream {
- return ClientStream
- } else if !info.IsClientStream && info.IsServerStream {
- return ServerStream
- }
- return BidiStream
- }
- // monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
- type monitoredServerStream struct {
- grpc.ServerStream
- monitor *serverReporter
- }
- func (s *monitoredServerStream) SendMsg(m interface{}) error {
- err := s.ServerStream.SendMsg(m)
- if err == nil {
- s.monitor.SentMessage()
- }
- return err
- }
- func (s *monitoredServerStream) RecvMsg(m interface{}) error {
- err := s.ServerStream.RecvMsg(m)
- if err == nil {
- s.monitor.ReceivedMessage()
- }
- return err
- }
|