Просмотр исходного кода

Merge pull request #10004 from jingyih/automated-cherry-pick-of-#9990-origin-release-3.3

Automated cherry pick of #9990
Gyuho Lee 7 лет назад
Родитель
Сommit
f8fc923fc0

+ 27 - 0
bill-of-materials.json

@@ -336,6 +336,33 @@
 			}
 		]
 	},
+	{
+		"project": "go.uber.org/atomic",
+		"licenses": [
+			{
+				"type": "MIT License",
+				"confidence": 0.9891304347826086
+			}
+		]
+	},
+	{
+		"project": "go.uber.org/multierr",
+		"licenses": [
+			{
+				"type": "MIT License",
+				"confidence": 0.9891304347826086
+			}
+		]
+	},
+	{
+		"project": "go.uber.org/zap",
+		"licenses": [
+			{
+				"type": "MIT License",
+				"confidence": 0.9891304347826086
+			}
+		]
+	},
 	{
 		"project": "golang.org/x/crypto",
 		"licenses": [

+ 10 - 2
etcdserver/api/v3rpc/grpc.go

@@ -24,6 +24,7 @@ import (
 	"github.com/coreos/etcd/etcdserver"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
+	"github.com/grpc-ecosystem/go-grpc-middleware"
 	"github.com/grpc-ecosystem/go-grpc-prometheus"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
@@ -47,8 +48,15 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
 	if tls != nil {
 		opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
 	}
-	opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
-	opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
+	opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+		newLogUnaryInterceptor(s),
+		newUnaryInterceptor(s),
+		grpc_prometheus.UnaryServerInterceptor,
+	)))
+	opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
+		newStreamInterceptor(s),
+		grpc_prometheus.StreamServerInterceptor,
+	)))
 	opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
 	opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
 	opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))

+ 123 - 4
etcdserver/api/v3rpc/interceptor.go

@@ -25,9 +25,11 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 
-	prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/peer"
 )
 
 const (
@@ -40,7 +42,7 @@ type streamsMap struct {
 }
 
 func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
-	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
 		if !api.IsCapabilityEnabled(api.V3rpcCapability) {
 			return nil, rpctypes.ErrGRPCNotCapable
 		}
@@ -54,7 +56,124 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
 			}
 		}
 
-		return prometheus.UnaryServerInterceptor(ctx, req, info, handler)
+		return handler(ctx, req)
+	}
+}
+
+func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+		startTime := time.Now()
+		resp, err := handler(ctx, req)
+		defer logUnaryRequestStats(ctx, nil, info, startTime, req, resp)
+		return resp, err
+	}
+}
+
+func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
+	duration := time.Since(startTime)
+	remote := "No remote client info."
+	peerInfo, ok := peer.FromContext(ctx)
+	if ok {
+		remote = peerInfo.Addr.String()
+	}
+	var responseType string = info.FullMethod
+	var reqCount, respCount int64
+	var reqSize, respSize int
+	var reqContent string
+	switch _resp := resp.(type) {
+	case *pb.RangeResponse:
+		_req, ok := req.(*pb.RangeRequest)
+		if ok {
+			reqCount = 0
+			reqSize = _req.Size()
+			reqContent = _req.String()
+		}
+		if _resp != nil {
+			respCount = _resp.GetCount()
+			respSize = _resp.Size()
+		}
+	case *pb.PutResponse:
+		_req, ok := req.(*pb.PutRequest)
+		if ok {
+			reqCount = 1
+			reqSize = _req.Size()
+			reqContent = pb.NewLoggablePutRequest(_req).String()
+			// redact value field from request content, see PR #9821
+		}
+		if _resp != nil {
+			respCount = 0
+			respSize = _resp.Size()
+		}
+	case *pb.DeleteRangeResponse:
+		_req, ok := req.(*pb.DeleteRangeRequest)
+		if ok {
+			reqCount = 0
+			reqSize = _req.Size()
+			reqContent = _req.String()
+		}
+		if _resp != nil {
+			respCount = _resp.GetDeleted()
+			respSize = _resp.Size()
+		}
+	case *pb.TxnResponse:
+		_req, ok := req.(*pb.TxnRequest)
+		if ok && _resp != nil {
+			if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
+				reqCount = int64(len(_req.GetSuccess()))
+				reqSize = 0
+				for _, r := range _req.GetSuccess() {
+					reqSize += r.Size()
+				}
+			} else {
+				reqCount = int64(len(_req.GetFailure()))
+				reqSize = 0
+				for _, r := range _req.GetFailure() {
+					reqSize += r.Size()
+				}
+			}
+			reqContent = pb.NewLoggableTxnRequest(_req).String()
+			// redact value field from request content, see PR #9821
+		}
+		if _resp != nil {
+			respCount = 0
+			respSize = _resp.Size()
+		}
+	default:
+		reqCount = -1
+		reqSize = -1
+		respCount = -1
+		respSize = -1
+	}
+
+	logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
+}
+
+func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
+	reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
+	if lg == nil {
+		plog.Debugf("start time = %v, "+
+			"time spent = %v, "+
+			"remote = %s, "+
+			"response type = %s, "+
+			"request count = %d, "+
+			"request size = %d, "+
+			"response count = %d, "+
+			"response size = %d, "+
+			"request content = %s",
+			startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
+		)
+	} else {
+		lg.Debug("request stats",
+			zap.Time("start time", startTime),
+			zap.Duration("time spent", duration),
+			zap.String("remote", remote),
+			zap.String("response type", responseType),
+			zap.Int64("request count", reqCount),
+			zap.Int("request size", reqSize),
+			zap.Int64("response count", respCount),
+			zap.Int("response size", respSize),
+			zap.String("request content", reqContent),
+		)
 	}
 }
 
@@ -90,7 +209,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
 			}
 		}
 
-		return prometheus.StreamServerInterceptor(srv, ss, info, handler)
+		return handler(srv, ss)
 	}
 }
 

+ 3 - 3
etcdserver/etcdserverpb/raft_internal_stringer.go

@@ -59,7 +59,7 @@ func (as *InternalRaftStringer) String() string {
 	case as.Request.Put != nil:
 		return fmt.Sprintf("header:<%s> put:<%s>",
 			as.Request.Header.String(),
-			newLoggablePutRequest(as.Request.Put).String(),
+			NewLoggablePutRequest(as.Request.Put).String(),
 		)
 	case as.Request.Txn != nil:
 		return fmt.Sprintf("header:<%s> txn:<%s>",
@@ -121,7 +121,7 @@ func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
 func (as *requestOpStringer) String() string {
 	switch op := as.Op.Request.(type) {
 	case *RequestOp_RequestPut:
-		return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String())
+		return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String())
 	case *RequestOp_RequestTxn:
 		return fmt.Sprintf("request_txn:<%s>", NewLoggableTxnRequest(op.RequestTxn).String())
 	default:
@@ -167,7 +167,7 @@ type loggablePutRequest struct {
 	IgnoreLease bool   `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
 }
 
-func newLoggablePutRequest(request *PutRequest) *loggablePutRequest {
+func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
 	return &loggablePutRequest{
 		request.Key,
 		len(request.Value),

+ 183 - 0
vendor/github.com/grpc-ecosystem/go-grpc-middleware/chain.go

@@ -0,0 +1,183 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+// gRPC Server Interceptor chaining middleware.
+
+package grpc_middleware
+
+import (
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+// ChainUnaryServer creates a single interceptor out of a chain of many interceptors.
+//
+// Execution is done in left-to-right order, including passing of context.
+// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
+// will see context changes of one and two.
+func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
+	n := len(interceptors)
+
+	if n > 1 {
+		lastI := n - 1
+		return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+			var (
+				chainHandler grpc.UnaryHandler
+				curI         int
+			)
+
+			chainHandler = func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
+				if curI == lastI {
+					return handler(currentCtx, currentReq)
+				}
+				curI++
+				resp, err := interceptors[curI](currentCtx, currentReq, info, chainHandler)
+				curI--
+				return resp, err
+			}
+
+			return interceptors[0](ctx, req, info, chainHandler)
+		}
+	}
+
+	if n == 1 {
+		return interceptors[0]
+	}
+
+	// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
+	return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+		return handler(ctx, req)
+	}
+}
+
+// ChainStreamServer creates a single interceptor out of a chain of many interceptors.
+//
+// Execution is done in left-to-right order, including passing of context.
+// For example ChainUnaryServer(one, two, three) will execute one before two before three.
+// If you want to pass context between interceptors, use WrapServerStream.
+func ChainStreamServer(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
+	n := len(interceptors)
+
+	if n > 1 {
+		lastI := n - 1
+		return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+			var (
+				chainHandler grpc.StreamHandler
+				curI         int
+			)
+
+			chainHandler = func(currentSrv interface{}, currentStream grpc.ServerStream) error {
+				if curI == lastI {
+					return handler(currentSrv, currentStream)
+				}
+				curI++
+				err := interceptors[curI](currentSrv, currentStream, info, chainHandler)
+				curI--
+				return err
+			}
+
+			return interceptors[0](srv, stream, info, chainHandler)
+		}
+	}
+
+	if n == 1 {
+		return interceptors[0]
+	}
+
+	// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
+	return func(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		return handler(srv, stream)
+	}
+}
+
+// ChainUnaryClient creates a single interceptor out of a chain of many interceptors.
+//
+// Execution is done in left-to-right order, including passing of context.
+// For example ChainUnaryClient(one, two, three) will execute one before two before three.
+func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
+	n := len(interceptors)
+
+	if n > 1 {
+		lastI := n - 1
+		return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+			var (
+				chainHandler grpc.UnaryInvoker
+				curI         int
+			)
+
+			chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
+				if curI == lastI {
+					return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
+				}
+				curI++
+				err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
+				curI--
+				return err
+			}
+
+			return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
+		}
+	}
+
+	if n == 1 {
+		return interceptors[0]
+	}
+
+	// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
+	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+		return invoker(ctx, method, req, reply, cc, opts...)
+	}
+}
+
+// ChainStreamClient creates a single interceptor out of a chain of many interceptors.
+//
+// Execution is done in left-to-right order, including passing of context.
+// For example ChainStreamClient(one, two, three) will execute one before two before three.
+func ChainStreamClient(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor {
+	n := len(interceptors)
+
+	if n > 1 {
+		lastI := n - 1
+		return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+			var (
+				chainHandler grpc.Streamer
+				curI         int
+			)
+
+			chainHandler = func(currentCtx context.Context, currentDesc *grpc.StreamDesc, currentConn *grpc.ClientConn, currentMethod string, currentOpts ...grpc.CallOption) (grpc.ClientStream, error) {
+				if curI == lastI {
+					return streamer(currentCtx, currentDesc, currentConn, currentMethod, currentOpts...)
+				}
+				curI++
+				stream, err := interceptors[curI](currentCtx, currentDesc, currentConn, currentMethod, chainHandler, currentOpts...)
+				curI--
+				return stream, err
+			}
+
+			return interceptors[0](ctx, desc, cc, method, chainHandler, opts...)
+		}
+	}
+
+	if n == 1 {
+		return interceptors[0]
+	}
+
+	// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
+	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+		return streamer(ctx, desc, cc, method, opts...)
+	}
+}
+
+// Chain creates a single interceptor out of a chain of many interceptors.
+//
+// WithUnaryServerChain is a grpc.Server config option that accepts multiple unary interceptors.
+// Basically syntactic sugar.
+func WithUnaryServerChain(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption {
+	return grpc.UnaryInterceptor(ChainUnaryServer(interceptors...))
+}
+
+// WithStreamServerChain is a grpc.Server config option that accepts multiple stream interceptors.
+// Basically syntactic sugar.
+func WithStreamServerChain(interceptors ...grpc.StreamServerInterceptor) grpc.ServerOption {
+	return grpc.StreamInterceptor(ChainStreamServer(interceptors...))
+}

+ 69 - 0
vendor/github.com/grpc-ecosystem/go-grpc-middleware/doc.go

@@ -0,0 +1,69 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+/*
+`grpc_middleware` is a collection of gRPC middleware packages: interceptors, helpers and tools.
+
+Middleware
+
+gRPC is a fantastic RPC middleware, which sees a lot of adoption in the Golang world. However, the
+upstream gRPC codebase is relatively bare bones.
+
+This package, and most of its child packages provides commonly needed middleware for gRPC:
+client-side interceptors for retires, server-side interceptors for input validation and auth,
+functions for chaining said interceptors, metadata convenience methods and more.
+
+Chaining
+
+By default, gRPC doesn't allow one to have more than one interceptor either on the client nor on
+the server side. `grpc_middleware` provides convenient chaining methods
+
+Simple way of turning a multiple interceptors into a single interceptor. Here's an example for
+server chaining:
+
+	myServer := grpc.NewServer(
+	    grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(loggingStream, monitoringStream, authStream)),
+	    grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(loggingUnary, monitoringUnary, authUnary),
+	)
+
+These interceptors will be executed from left to right: logging, monitoring and auth.
+
+Here's an example for client side chaining:
+
+	clientConn, err = grpc.Dial(
+	    address,
+	        grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(monitoringClientUnary, retryUnary)),
+	        grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(monitoringClientStream, retryStream)),
+	)
+	client = pb_testproto.NewTestServiceClient(clientConn)
+	resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"})
+
+These interceptors will be executed from left to right: monitoring and then retry logic.
+
+The retry interceptor will call every interceptor that follows it whenever when a retry happens.
+
+Writing Your Own
+
+Implementing your own interceptor is pretty trivial: there are interfaces for that. But the interesting
+bit exposing common data to handlers (and other middleware), similarly to HTTP Middleware design.
+For example, you may want to pass the identity of the caller from the auth interceptor all the way
+to the handling function.
+
+For example, a client side interceptor example for auth looks like:
+
+	func FakeAuthUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+	   newCtx := context.WithValue(ctx, "user_id", "john@example.com")
+	   return handler(newCtx, req)
+	}
+
+Unfortunately, it's not as easy for streaming RPCs. These have the `context.Context` embedded within
+the `grpc.ServerStream` object. To pass values through context, a wrapper (`WrappedServerStream`) is
+needed. For example:
+
+	func FakeAuthStreamingInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+	   newStream := grpc_middleware.WrapServerStream(stream)
+	   newStream.WrappedContext = context.WithValue(ctx, "user_id", "john@example.com")
+	   return handler(srv, stream)
+	}
+*/
+package grpc_middleware

+ 29 - 0
vendor/github.com/grpc-ecosystem/go-grpc-middleware/wrappers.go

@@ -0,0 +1,29 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_middleware
+
+import (
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+// WrappedServerStream is a thin wrapper around grpc.ServerStream that allows modifying context.
+type WrappedServerStream struct {
+	grpc.ServerStream
+	// WrappedContext is the wrapper's own Context. You can assign it.
+	WrappedContext context.Context
+}
+
+// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context()
+func (w *WrappedServerStream) Context() context.Context {
+	return w.WrappedContext
+}
+
+// WrapServerStream returns a ServerStream that has the ability to overwrite context.
+func WrapServerStream(stream grpc.ServerStream) *WrappedServerStream {
+	if existing, ok := stream.(*WrappedServerStream); ok {
+		return existing
+	}
+	return &WrappedServerStream{ServerStream: stream, WrappedContext: stream.Context()}
+}