123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- // Copyright 2016 Michal Witkowski. All Rights Reserved.
- // See LICENSE for licensing terms.
- // gRPC Prometheus monitoring interceptors for client-side gRPC.
- package grpc_prometheus
- import (
- "io"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- )
- // UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
- func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
- monitor := newClientReporter(Unary, method)
- monitor.SentMessage()
- err := invoker(ctx, method, req, reply, cc, opts...)
- if err != nil {
- monitor.ReceivedMessage()
- }
- monitor.Handled(grpc.Code(err))
- return err
- }
- // StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
- func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
- monitor := newClientReporter(clientStreamType(desc), method)
- clientStream, err := streamer(ctx, desc, cc, method, opts...)
- if err != nil {
- monitor.Handled(grpc.Code(err))
- return nil, err
- }
- return &monitoredClientStream{clientStream, monitor}, nil
- }
- func clientStreamType(desc *grpc.StreamDesc) grpcType {
- if desc.ClientStreams && !desc.ServerStreams {
- return ClientStream
- } else if !desc.ClientStreams && desc.ServerStreams {
- return ServerStream
- }
- return BidiStream
- }
- // monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
- type monitoredClientStream struct {
- grpc.ClientStream
- monitor *clientReporter
- }
- func (s *monitoredClientStream) SendMsg(m interface{}) error {
- err := s.ClientStream.SendMsg(m)
- if err == nil {
- s.monitor.SentMessage()
- }
- return err
- }
- func (s *monitoredClientStream) RecvMsg(m interface{}) error {
- err := s.ClientStream.RecvMsg(m)
- if err == nil {
- s.monitor.ReceivedMessage()
- } else if err == io.EOF {
- s.monitor.Handled(codes.OK)
- } else {
- s.monitor.Handled(grpc.Code(err))
- }
- return err
- }
|