|
@@ -25,9 +25,11 @@ import (
|
|
|
"github.com/coreos/etcd/pkg/types"
|
|
"github.com/coreos/etcd/pkg/types"
|
|
|
"github.com/coreos/etcd/raft"
|
|
"github.com/coreos/etcd/raft"
|
|
|
|
|
|
|
|
- prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
|
|
|
|
|
|
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
|
|
|
+ "go.uber.org/zap"
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc"
|
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
|
+ "google.golang.org/grpc/peer"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
const (
|
|
@@ -40,7 +42,7 @@ type streamsMap struct {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
|
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
|
|
- return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
|
|
|
|
|
|
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
|
|
if !api.IsCapabilityEnabled(api.V3rpcCapability) {
|
|
if !api.IsCapabilityEnabled(api.V3rpcCapability) {
|
|
|
return nil, rpctypes.ErrGRPCNotCapable
|
|
return nil, rpctypes.ErrGRPCNotCapable
|
|
|
}
|
|
}
|
|
@@ -54,7 +56,126 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- return prometheus.UnaryServerInterceptor(ctx, req, info, handler)
|
|
|
|
|
|
|
+ resp, err := handler(ctx, req)
|
|
|
|
|
+ return resp, err
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
|
|
|
|
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
|
|
|
|
+ startTime := time.Now()
|
|
|
|
|
+ resp, err := handler(ctx, req)
|
|
|
|
|
+ defer logUnaryRequestStats(ctx, s.Logger(), info, startTime, req, resp)
|
|
|
|
|
+ return resp, err
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
|
|
|
|
|
+ duration := time.Since(startTime)
|
|
|
|
|
+ remote := "No remote client info."
|
|
|
|
|
+ peerInfo, ok := peer.FromContext(ctx)
|
|
|
|
|
+ if ok {
|
|
|
|
|
+ remote = peerInfo.Addr.String()
|
|
|
|
|
+ }
|
|
|
|
|
+ var responseType string = info.FullMethod
|
|
|
|
|
+ var reqCount, respCount int64 = 0, 0
|
|
|
|
|
+ var reqSize, respSize int = 0, 0
|
|
|
|
|
+ var reqContent string
|
|
|
|
|
+ switch _resp := resp.(type) {
|
|
|
|
|
+ case *pb.RangeResponse:
|
|
|
|
|
+ _req, ok := req.(*pb.RangeRequest)
|
|
|
|
|
+ if ok {
|
|
|
|
|
+ reqCount = 0
|
|
|
|
|
+ reqSize = _req.Size()
|
|
|
|
|
+ reqContent = _req.String()
|
|
|
|
|
+ }
|
|
|
|
|
+ if _resp != nil {
|
|
|
|
|
+ respCount = _resp.GetCount()
|
|
|
|
|
+ respSize = _resp.Size()
|
|
|
|
|
+ }
|
|
|
|
|
+ case *pb.PutResponse:
|
|
|
|
|
+ _req, ok := req.(*pb.PutRequest)
|
|
|
|
|
+ if ok {
|
|
|
|
|
+ reqCount = 1
|
|
|
|
|
+ reqSize = _req.Size()
|
|
|
|
|
+ reqContent = pb.NewLoggablePutRequest(_req).String()
|
|
|
|
|
+ // redact value field from request content, see PR #9821
|
|
|
|
|
+ }
|
|
|
|
|
+ if _resp != nil {
|
|
|
|
|
+ respCount = 0
|
|
|
|
|
+ respSize = _resp.Size()
|
|
|
|
|
+ }
|
|
|
|
|
+ case *pb.DeleteRangeResponse:
|
|
|
|
|
+ _req, ok := req.(*pb.DeleteRangeRequest)
|
|
|
|
|
+ if ok {
|
|
|
|
|
+ reqCount = 0
|
|
|
|
|
+ reqSize = _req.Size()
|
|
|
|
|
+ reqContent = _req.String()
|
|
|
|
|
+ }
|
|
|
|
|
+ if _resp != nil {
|
|
|
|
|
+ respCount = _resp.GetDeleted()
|
|
|
|
|
+ respSize = _resp.Size()
|
|
|
|
|
+ }
|
|
|
|
|
+ case *pb.TxnResponse:
|
|
|
|
|
+ _req, ok := req.(*pb.TxnRequest)
|
|
|
|
|
+ if ok && _resp != nil {
|
|
|
|
|
+ if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
|
|
|
|
|
+ reqCount = int64(len(_req.GetSuccess()))
|
|
|
|
|
+ reqSize = 0
|
|
|
|
|
+ for _, r := range _req.GetSuccess() {
|
|
|
|
|
+ reqSize += r.Size()
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ reqCount = int64(len(_req.GetFailure()))
|
|
|
|
|
+ reqSize = 0
|
|
|
|
|
+ for _, r := range _req.GetFailure() {
|
|
|
|
|
+ reqSize += r.Size()
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ reqContent = pb.NewLoggableTxnRequest(_req).String()
|
|
|
|
|
+ // redact value field from request content, see PR #9821
|
|
|
|
|
+ }
|
|
|
|
|
+ if _resp != nil {
|
|
|
|
|
+ respCount = 0
|
|
|
|
|
+ respSize = _resp.Size()
|
|
|
|
|
+ }
|
|
|
|
|
+ default:
|
|
|
|
|
+ reqCount = -1
|
|
|
|
|
+ reqSize = -1
|
|
|
|
|
+ respCount = -1
|
|
|
|
|
+ respSize = -1
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
|
|
|
|
|
+ reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
|
|
|
|
|
+ if lg == nil {
|
|
|
|
|
+ plog.Debugf(
|
|
|
|
|
+ "start time = %v, " +
|
|
|
|
|
+ "time spent = %v, " +
|
|
|
|
|
+ "remote = %s, " +
|
|
|
|
|
+ "response type = %s, " +
|
|
|
|
|
+ "request count = %d, " +
|
|
|
|
|
+ "request size = %d, " +
|
|
|
|
|
+ "response count = %d, " +
|
|
|
|
|
+ "response size = %d, " +
|
|
|
|
|
+ "request content = %s",
|
|
|
|
|
+ startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
|
|
|
|
|
+ )
|
|
|
|
|
+ } else {
|
|
|
|
|
+ lg.Debug("request stats",
|
|
|
|
|
+ zap.Time("start time", startTime),
|
|
|
|
|
+ zap.Duration("time spent", duration),
|
|
|
|
|
+ zap.String("remote", remote),
|
|
|
|
|
+ zap.String("response type", responseType),
|
|
|
|
|
+ zap.Int64("request count", reqCount),
|
|
|
|
|
+ zap.Int("request size", reqSize),
|
|
|
|
|
+ zap.Int64("response count", respCount),
|
|
|
|
|
+ zap.Int("response size", respSize),
|
|
|
|
|
+ zap.String("request content", reqContent),
|
|
|
|
|
+ )
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -90,7 +211,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- return prometheus.StreamServerInterceptor(srv, ss, info, handler)
|
|
|
|
|
|
|
+ err := handler(srv, ss)
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|