فهرست منبع

Merge pull request #10025 from jingyih/automated-cherry-pick-of-#9990-origin-release-3.2-1534373481

etcdserver: cherry pick of #9990 to release-3.2
Joe Betz 7 سال پیش
والد
کامیت
73b1a2b8db
2فایلهای تغییر یافته به همراه114 افزوده شده و 5 حذف شده
  1. 111 2
      etcdserver/api/v3rpc/interceptor.go
  2. 3 3
      etcdserver/etcdserverpb/raft_internal_stringer.go

+ 111 - 2
etcdserver/api/v3rpc/interceptor.go

@@ -24,10 +24,13 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
 
 
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/pkg/capnslog"
 	prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/peer"
 )
 )
 
 
 const (
 const (
@@ -40,7 +43,7 @@ type streamsMap struct {
 }
 }
 
 
 func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
 func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
-	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
 		if !api.IsCapabilityEnabled(api.V3rpcCapability) {
 		if !api.IsCapabilityEnabled(api.V3rpcCapability) {
 			return nil, rpctypes.ErrGRPCNotCapable
 			return nil, rpctypes.ErrGRPCNotCapable
 		}
 		}
@@ -54,10 +57,116 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
 			}
 			}
 		}
 		}
 
 
-		return prometheus.UnaryServerInterceptor(ctx, req, info, handler)
+		return logUnaryInterceptor(ctx, req, info, handler)
+		// interceptors are chained manually during backporting, for better readability refer to PR #9990
 	}
 	}
 }
 }
 
 
+// logUnaryInterceptor is a gRPC server-side interceptor that prints info on incoming requests for debugging purpose
+func logUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+	startTime := time.Now()
+	resp, err := prometheus.UnaryServerInterceptor(ctx, req, info, handler)
+	// interceptors are chained manually during backporting, for better readability refer to PR #9990
+	if plog.LevelAt(capnslog.DEBUG) {
+		defer logUnaryRequestStats(ctx, info, startTime, req, resp)
+	}
+	return resp, err
+}
+
+func logUnaryRequestStats(ctx context.Context, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
+	duration := time.Since(startTime)
+	remote := "No remote client info."
+	peerInfo, ok := peer.FromContext(ctx)
+	if ok {
+		remote = peerInfo.Addr.String()
+	}
+	var responseType string = info.FullMethod
+	var reqCount, respCount int64
+	var reqSize, respSize int
+	var reqContent string
+	switch _resp := resp.(type) {
+	case *pb.RangeResponse:
+		_req, ok := req.(*pb.RangeRequest)
+		if ok {
+			reqCount = 0
+			reqSize = _req.Size()
+			reqContent = _req.String()
+		}
+		if _resp != nil {
+			respCount = _resp.GetCount()
+			respSize = _resp.Size()
+		}
+	case *pb.PutResponse:
+		_req, ok := req.(*pb.PutRequest)
+		if ok {
+			reqCount = 1
+			reqSize = _req.Size()
+			reqContent = pb.NewLoggablePutRequest(_req).String()
+			// redact value field from request content, see PR #9821
+		}
+		if _resp != nil {
+			respCount = 0
+			respSize = _resp.Size()
+		}
+	case *pb.DeleteRangeResponse:
+		_req, ok := req.(*pb.DeleteRangeRequest)
+		if ok {
+			reqCount = 0
+			reqSize = _req.Size()
+			reqContent = _req.String()
+		}
+		if _resp != nil {
+			respCount = _resp.GetDeleted()
+			respSize = _resp.Size()
+		}
+	case *pb.TxnResponse:
+		_req, ok := req.(*pb.TxnRequest)
+		if ok && _resp != nil {
+			if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
+				reqCount = int64(len(_req.GetSuccess()))
+				reqSize = 0
+				for _, r := range _req.GetSuccess() {
+					reqSize += r.Size()
+				}
+			} else {
+				reqCount = int64(len(_req.GetFailure()))
+				reqSize = 0
+				for _, r := range _req.GetFailure() {
+					reqSize += r.Size()
+				}
+			}
+			reqContent = pb.NewLoggableTxnRequest(_req).String()
+			// redact value field from request content, see PR #9821
+		}
+		if _resp != nil {
+			respCount = 0
+			respSize = _resp.Size()
+		}
+	default:
+		reqCount = -1
+		reqSize = -1
+		respCount = -1
+		respSize = -1
+	}
+
+	logGenericRequestStats(startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
+}
+
+func logGenericRequestStats(startTime time.Time, duration time.Duration, remote string, responseType string,
+	reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
+	plog.Debugf("start time = %v, "+
+		"time spent = %v, "+
+		"remote = %s, "+
+		"response type = %s, "+
+		"request count = %d, "+
+		"request size = %d, "+
+		"response count = %d, "+
+		"response size = %d, "+
+		"request content = %s",
+		startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
+	)
+}
+
 func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
 func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
 	smap := monitorLeader(s)
 	smap := monitorLeader(s)
 
 

+ 3 - 3
etcdserver/etcdserverpb/raft_internal_stringer.go

@@ -59,7 +59,7 @@ func (as *InternalRaftStringer) String() string {
 	case as.Request.Put != nil:
 	case as.Request.Put != nil:
 		return fmt.Sprintf("header:<%s> put:<%s>",
 		return fmt.Sprintf("header:<%s> put:<%s>",
 			as.Request.Header.String(),
 			as.Request.Header.String(),
-			newLoggablePutRequest(as.Request.Put).String(),
+			NewLoggablePutRequest(as.Request.Put).String(),
 		)
 		)
 	case as.Request.Txn != nil:
 	case as.Request.Txn != nil:
 		return fmt.Sprintf("header:<%s> txn:<%s>",
 		return fmt.Sprintf("header:<%s> txn:<%s>",
@@ -121,7 +121,7 @@ func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
 func (as *requestOpStringer) String() string {
 func (as *requestOpStringer) String() string {
 	switch op := as.Op.Request.(type) {
 	switch op := as.Op.Request.(type) {
 	case *RequestOp_RequestPut:
 	case *RequestOp_RequestPut:
-		return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String())
+		return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String())
 	default:
 	default:
 		// nothing to redact
 		// nothing to redact
 	}
 	}
@@ -163,7 +163,7 @@ type loggablePutRequest struct {
 	IgnoreLease bool   `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
 	IgnoreLease bool   `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
 }
 }
 
 
-func newLoggablePutRequest(request *PutRequest) *loggablePutRequest {
+func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
 	return &loggablePutRequest{
 	return &loggablePutRequest{
 		request.Key,
 		request.Key,
 		len(request.Value),
 		len(request.Value),