|
@@ -21,18 +21,20 @@ package grpc
|
|
|
import (
|
|
import (
|
|
|
"bytes"
|
|
"bytes"
|
|
|
"compress/gzip"
|
|
"compress/gzip"
|
|
|
- stdctx "context"
|
|
|
|
|
"encoding/binary"
|
|
"encoding/binary"
|
|
|
|
|
+ "fmt"
|
|
|
"io"
|
|
"io"
|
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
|
"math"
|
|
"math"
|
|
|
- "os"
|
|
|
|
|
|
|
+ "strings"
|
|
|
"sync"
|
|
"sync"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
|
"golang.org/x/net/context"
|
|
"golang.org/x/net/context"
|
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/codes"
|
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/credentials"
|
|
|
|
|
+ "google.golang.org/grpc/encoding"
|
|
|
|
|
+ "google.golang.org/grpc/encoding/proto"
|
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/metadata"
|
|
|
"google.golang.org/grpc/peer"
|
|
"google.golang.org/grpc/peer"
|
|
|
"google.golang.org/grpc/stats"
|
|
"google.golang.org/grpc/stats"
|
|
@@ -54,13 +56,29 @@ type gzipCompressor struct {
|
|
|
|
|
|
|
|
// NewGZIPCompressor creates a Compressor based on GZIP.
|
|
// NewGZIPCompressor creates a Compressor based on GZIP.
|
|
|
func NewGZIPCompressor() Compressor {
|
|
func NewGZIPCompressor() Compressor {
|
|
|
|
|
+ c, _ := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
|
|
|
|
|
+ return c
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// NewGZIPCompressorWithLevel is like NewGZIPCompressor but specifies the gzip compression level instead
|
|
|
|
|
+// of assuming DefaultCompression.
|
|
|
|
|
+//
|
|
|
|
|
+// The error returned will be nil if the level is valid.
|
|
|
|
|
+func NewGZIPCompressorWithLevel(level int) (Compressor, error) {
|
|
|
|
|
+ if level < gzip.DefaultCompression || level > gzip.BestCompression {
|
|
|
|
|
+ return nil, fmt.Errorf("grpc: invalid compression level: %d", level)
|
|
|
|
|
+ }
|
|
|
return &gzipCompressor{
|
|
return &gzipCompressor{
|
|
|
pool: sync.Pool{
|
|
pool: sync.Pool{
|
|
|
New: func() interface{} {
|
|
New: func() interface{} {
|
|
|
- return gzip.NewWriter(ioutil.Discard)
|
|
|
|
|
|
|
+ w, err := gzip.NewWriterLevel(ioutil.Discard, level)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ panic(err)
|
|
|
|
|
+ }
|
|
|
|
|
+ return w
|
|
|
},
|
|
},
|
|
|
},
|
|
},
|
|
|
- }
|
|
|
|
|
|
|
+ }, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
|
|
func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
|
|
@@ -124,14 +142,15 @@ func (d *gzipDecompressor) Type() string {
|
|
|
|
|
|
|
|
// callInfo contains all related configuration and information about an RPC.
|
|
// callInfo contains all related configuration and information about an RPC.
|
|
|
type callInfo struct {
|
|
type callInfo struct {
|
|
|
|
|
+ compressorType string
|
|
|
failFast bool
|
|
failFast bool
|
|
|
- headerMD metadata.MD
|
|
|
|
|
- trailerMD metadata.MD
|
|
|
|
|
- peer *peer.Peer
|
|
|
|
|
|
|
+ stream *clientStream
|
|
|
traceInfo traceInfo // in trace.go
|
|
traceInfo traceInfo // in trace.go
|
|
|
maxReceiveMessageSize *int
|
|
maxReceiveMessageSize *int
|
|
|
maxSendMessageSize *int
|
|
maxSendMessageSize *int
|
|
|
creds credentials.PerRPCCredentials
|
|
creds credentials.PerRPCCredentials
|
|
|
|
|
+ contentSubtype string
|
|
|
|
|
+ codec baseCodec
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func defaultCallInfo() *callInfo {
|
|
func defaultCallInfo() *callInfo {
|
|
@@ -158,80 +177,232 @@ type EmptyCallOption struct{}
|
|
|
func (EmptyCallOption) before(*callInfo) error { return nil }
|
|
func (EmptyCallOption) before(*callInfo) error { return nil }
|
|
|
func (EmptyCallOption) after(*callInfo) {}
|
|
func (EmptyCallOption) after(*callInfo) {}
|
|
|
|
|
|
|
|
-type beforeCall func(c *callInfo) error
|
|
|
|
|
-
|
|
|
|
|
-func (o beforeCall) before(c *callInfo) error { return o(c) }
|
|
|
|
|
-func (o beforeCall) after(c *callInfo) {}
|
|
|
|
|
-
|
|
|
|
|
-type afterCall func(c *callInfo)
|
|
|
|
|
-
|
|
|
|
|
-func (o afterCall) before(c *callInfo) error { return nil }
|
|
|
|
|
-func (o afterCall) after(c *callInfo) { o(c) }
|
|
|
|
|
-
|
|
|
|
|
// Header returns a CallOptions that retrieves the header metadata
|
|
// Header returns a CallOptions that retrieves the header metadata
|
|
|
// for a unary RPC.
|
|
// for a unary RPC.
|
|
|
func Header(md *metadata.MD) CallOption {
|
|
func Header(md *metadata.MD) CallOption {
|
|
|
- return afterCall(func(c *callInfo) {
|
|
|
|
|
- *md = c.headerMD
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ return HeaderCallOption{HeaderAddr: md}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// HeaderCallOption is a CallOption for collecting response header metadata.
|
|
|
|
|
+// The metadata field will be populated *after* the RPC completes.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type HeaderCallOption struct {
|
|
|
|
|
+ HeaderAddr *metadata.MD
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (o HeaderCallOption) before(c *callInfo) error { return nil }
|
|
|
|
|
+func (o HeaderCallOption) after(c *callInfo) {
|
|
|
|
|
+ if c.stream != nil {
|
|
|
|
|
+ *o.HeaderAddr, _ = c.stream.Header()
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Trailer returns a CallOptions that retrieves the trailer metadata
|
|
// Trailer returns a CallOptions that retrieves the trailer metadata
|
|
|
// for a unary RPC.
|
|
// for a unary RPC.
|
|
|
func Trailer(md *metadata.MD) CallOption {
|
|
func Trailer(md *metadata.MD) CallOption {
|
|
|
- return afterCall(func(c *callInfo) {
|
|
|
|
|
- *md = c.trailerMD
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ return TrailerCallOption{TrailerAddr: md}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// TrailerCallOption is a CallOption for collecting response trailer metadata.
|
|
|
|
|
+// The metadata field will be populated *after* the RPC completes.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type TrailerCallOption struct {
|
|
|
|
|
+ TrailerAddr *metadata.MD
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (o TrailerCallOption) before(c *callInfo) error { return nil }
|
|
|
|
|
+func (o TrailerCallOption) after(c *callInfo) {
|
|
|
|
|
+ if c.stream != nil {
|
|
|
|
|
+ *o.TrailerAddr = c.stream.Trailer()
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Peer returns a CallOption that retrieves peer information for a
|
|
// Peer returns a CallOption that retrieves peer information for a
|
|
|
// unary RPC.
|
|
// unary RPC.
|
|
|
-func Peer(peer *peer.Peer) CallOption {
|
|
|
|
|
- return afterCall(func(c *callInfo) {
|
|
|
|
|
- if c.peer != nil {
|
|
|
|
|
- *peer = *c.peer
|
|
|
|
|
|
|
+func Peer(p *peer.Peer) CallOption {
|
|
|
|
|
+ return PeerCallOption{PeerAddr: p}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// PeerCallOption is a CallOption for collecting the identity of the remote
|
|
|
|
|
+// peer. The peer field will be populated *after* the RPC completes.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type PeerCallOption struct {
|
|
|
|
|
+ PeerAddr *peer.Peer
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (o PeerCallOption) before(c *callInfo) error { return nil }
|
|
|
|
|
+func (o PeerCallOption) after(c *callInfo) {
|
|
|
|
|
+ if c.stream != nil {
|
|
|
|
|
+ if x, ok := peer.FromContext(c.stream.Context()); ok {
|
|
|
|
|
+ *o.PeerAddr = *x
|
|
|
}
|
|
}
|
|
|
- })
|
|
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// FailFast configures the action to take when an RPC is attempted on broken
|
|
// FailFast configures the action to take when an RPC is attempted on broken
|
|
|
-// connections or unreachable servers. If failfast is true, the RPC will fail
|
|
|
|
|
|
|
+// connections or unreachable servers. If failFast is true, the RPC will fail
|
|
|
// immediately. Otherwise, the RPC client will block the call until a
|
|
// immediately. Otherwise, the RPC client will block the call until a
|
|
|
-// connection is available (or the call is canceled or times out) and will retry
|
|
|
|
|
-// the call if it fails due to a transient error. Please refer to
|
|
|
|
|
|
|
+// connection is available (or the call is canceled or times out) and will
|
|
|
|
|
+// retry the call if it fails due to a transient error. gRPC will not retry if
|
|
|
|
|
+// data was written to the wire unless the server indicates it did not process
|
|
|
|
|
+// the data. Please refer to
|
|
|
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
|
|
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
|
|
|
-// Note: failFast is default to true.
|
|
|
|
|
|
|
+//
|
|
|
|
|
+// By default, RPCs are "Fail Fast".
|
|
|
func FailFast(failFast bool) CallOption {
|
|
func FailFast(failFast bool) CallOption {
|
|
|
- return beforeCall(func(c *callInfo) error {
|
|
|
|
|
- c.failFast = failFast
|
|
|
|
|
- return nil
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ return FailFastCallOption{FailFast: failFast}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// FailFastCallOption is a CallOption for indicating whether an RPC should fail
|
|
|
|
|
+// fast or not.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type FailFastCallOption struct {
|
|
|
|
|
+ FailFast bool
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (o FailFastCallOption) before(c *callInfo) error {
|
|
|
|
|
+ c.failFast = o.FailFast
|
|
|
|
|
+ return nil
|
|
|
}
|
|
}
|
|
|
|
|
+func (o FailFastCallOption) after(c *callInfo) { return }
|
|
|
|
|
|
|
|
// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive.
|
|
// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive.
|
|
|
func MaxCallRecvMsgSize(s int) CallOption {
|
|
func MaxCallRecvMsgSize(s int) CallOption {
|
|
|
- return beforeCall(func(o *callInfo) error {
|
|
|
|
|
- o.maxReceiveMessageSize = &s
|
|
|
|
|
- return nil
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: s}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message
|
|
|
|
|
+// size the client can receive.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type MaxRecvMsgSizeCallOption struct {
|
|
|
|
|
+ MaxRecvMsgSize int
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
|
|
|
|
|
+ c.maxReceiveMessageSize = &o.MaxRecvMsgSize
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+func (o MaxRecvMsgSizeCallOption) after(c *callInfo) { return }
|
|
|
|
|
+
|
|
|
// MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send.
|
|
// MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send.
|
|
|
func MaxCallSendMsgSize(s int) CallOption {
|
|
func MaxCallSendMsgSize(s int) CallOption {
|
|
|
- return beforeCall(func(o *callInfo) error {
|
|
|
|
|
- o.maxSendMessageSize = &s
|
|
|
|
|
- return nil
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ return MaxSendMsgSizeCallOption{MaxSendMsgSize: s}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message
|
|
|
|
|
+// size the client can send.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type MaxSendMsgSizeCallOption struct {
|
|
|
|
|
+ MaxSendMsgSize int
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {
|
|
|
|
|
+ c.maxSendMessageSize = &o.MaxSendMsgSize
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+func (o MaxSendMsgSizeCallOption) after(c *callInfo) { return }
|
|
|
|
|
+
|
|
|
// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
|
|
// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
|
|
|
// for a call.
|
|
// for a call.
|
|
|
func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
|
|
func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
|
|
|
- return beforeCall(func(c *callInfo) error {
|
|
|
|
|
- c.creds = creds
|
|
|
|
|
- return nil
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ return PerRPCCredsCallOption{Creds: creds}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// PerRPCCredsCallOption is a CallOption that indicates the per-RPC
|
|
|
|
|
+// credentials to use for the call.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type PerRPCCredsCallOption struct {
|
|
|
|
|
+ Creds credentials.PerRPCCredentials
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (o PerRPCCredsCallOption) before(c *callInfo) error {
|
|
|
|
|
+ c.creds = o.Creds
|
|
|
|
|
+ return nil
|
|
|
}
|
|
}
|
|
|
|
|
+func (o PerRPCCredsCallOption) after(c *callInfo) { return }
|
|
|
|
|
+
|
|
|
|
|
+// UseCompressor returns a CallOption which sets the compressor used when
|
|
|
|
|
+// sending the request. If WithCompressor is also set, UseCompressor has
|
|
|
|
|
+// higher priority.
|
|
|
|
|
+//
|
|
|
|
|
+// This API is EXPERIMENTAL.
|
|
|
|
|
+func UseCompressor(name string) CallOption {
|
|
|
|
|
+ return CompressorCallOption{CompressorType: name}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// CompressorCallOption is a CallOption that indicates the compressor to use.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type CompressorCallOption struct {
|
|
|
|
|
+ CompressorType string
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (o CompressorCallOption) before(c *callInfo) error {
|
|
|
|
|
+ c.compressorType = o.CompressorType
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+func (o CompressorCallOption) after(c *callInfo) { return }
|
|
|
|
|
+
|
|
|
|
|
+// CallContentSubtype returns a CallOption that will set the content-subtype
|
|
|
|
|
+// for a call. For example, if content-subtype is "json", the Content-Type over
|
|
|
|
|
+// the wire will be "application/grpc+json". The content-subtype is converted
|
|
|
|
|
+// to lowercase before being included in Content-Type. See Content-Type on
|
|
|
|
|
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
|
|
|
|
|
+// more details.
|
|
|
|
|
+//
|
|
|
|
|
+// If CallCustomCodec is not also used, the content-subtype will be used to
|
|
|
|
|
+// look up the Codec to use in the registry controlled by RegisterCodec. See
|
|
|
|
|
+// the documention on RegisterCodec for details on registration. The lookup
|
|
|
|
|
+// of content-subtype is case-insensitive. If no such Codec is found, the call
|
|
|
|
|
+// will result in an error with code codes.Internal.
|
|
|
|
|
+//
|
|
|
|
|
+// If CallCustomCodec is also used, that Codec will be used for all request and
|
|
|
|
|
+// response messages, with the content-subtype set to the given contentSubtype
|
|
|
|
|
+// here for requests.
|
|
|
|
|
+func CallContentSubtype(contentSubtype string) CallOption {
|
|
|
|
|
+ return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// ContentSubtypeCallOption is a CallOption that indicates the content-subtype
|
|
|
|
|
+// used for marshaling messages.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type ContentSubtypeCallOption struct {
|
|
|
|
|
+ ContentSubtype string
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (o ContentSubtypeCallOption) before(c *callInfo) error {
|
|
|
|
|
+ c.contentSubtype = o.ContentSubtype
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+func (o ContentSubtypeCallOption) after(c *callInfo) { return }
|
|
|
|
|
+
|
|
|
|
|
+// CallCustomCodec returns a CallOption that will set the given Codec to be
|
|
|
|
|
+// used for all request and response messages for a call. The result of calling
|
|
|
|
|
+// String() will be used as the content-subtype in a case-insensitive manner.
|
|
|
|
|
+//
|
|
|
|
|
+// See Content-Type on
|
|
|
|
|
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
|
|
|
|
|
+// more details. Also see the documentation on RegisterCodec and
|
|
|
|
|
+// CallContentSubtype for more details on the interaction between Codec and
|
|
|
|
|
+// content-subtype.
|
|
|
|
|
+//
|
|
|
|
|
+// This function is provided for advanced users; prefer to use only
|
|
|
|
|
+// CallContentSubtype to select a registered codec instead.
|
|
|
|
|
+func CallCustomCodec(codec Codec) CallOption {
|
|
|
|
|
+ return CustomCodecCallOption{Codec: codec}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// CustomCodecCallOption is a CallOption that indicates the codec used for
|
|
|
|
|
+// marshaling messages.
|
|
|
|
|
+// This is an EXPERIMENTAL API.
|
|
|
|
|
+type CustomCodecCallOption struct {
|
|
|
|
|
+ Codec Codec
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (o CustomCodecCallOption) before(c *callInfo) error {
|
|
|
|
|
+ c.codec = o.Codec
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+func (o CustomCodecCallOption) after(c *callInfo) { return }
|
|
|
|
|
|
|
|
// The format of the payload: compressed or not?
|
|
// The format of the payload: compressed or not?
|
|
|
type payloadFormat uint8
|
|
type payloadFormat uint8
|
|
@@ -248,8 +419,8 @@ type parser struct {
|
|
|
// error types.
|
|
// error types.
|
|
|
r io.Reader
|
|
r io.Reader
|
|
|
|
|
|
|
|
- // The header of a gRPC message. Find more detail
|
|
|
|
|
- // at https://grpc.io/docs/guides/wire.html.
|
|
|
|
|
|
|
+ // The header of a gRPC message. Find more detail at
|
|
|
|
|
+ // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
|
|
|
header [5]byte
|
|
header [5]byte
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -277,8 +448,11 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
|
|
|
if length == 0 {
|
|
if length == 0 {
|
|
|
return pf, nil, nil
|
|
return pf, nil, nil
|
|
|
}
|
|
}
|
|
|
- if length > uint32(maxReceiveMessageSize) {
|
|
|
|
|
- return 0, nil, Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
|
|
|
|
|
|
|
+ if int64(length) > int64(maxInt) {
|
|
|
|
|
+ return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
|
|
|
|
|
+ }
|
|
|
|
|
+ if int(length) > maxReceiveMessageSize {
|
|
|
|
|
+ return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
|
|
|
}
|
|
}
|
|
|
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
|
|
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
|
|
|
// of making it for each message:
|
|
// of making it for each message:
|
|
@@ -294,18 +468,21 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
|
|
|
|
|
|
|
|
// encode serializes msg and returns a buffer of message header and a buffer of msg.
|
|
// encode serializes msg and returns a buffer of message header and a buffer of msg.
|
|
|
// If msg is nil, it generates the message header and an empty msg buffer.
|
|
// If msg is nil, it generates the message header and an empty msg buffer.
|
|
|
-func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, []byte, error) {
|
|
|
|
|
- var b []byte
|
|
|
|
|
|
|
+// TODO(ddyihai): eliminate extra Compressor parameter.
|
|
|
|
|
+func encode(c baseCodec, msg interface{}, cp Compressor, outPayload *stats.OutPayload, compressor encoding.Compressor) ([]byte, []byte, error) {
|
|
|
|
|
+ var (
|
|
|
|
|
+ b []byte
|
|
|
|
|
+ cbuf *bytes.Buffer
|
|
|
|
|
+ )
|
|
|
const (
|
|
const (
|
|
|
payloadLen = 1
|
|
payloadLen = 1
|
|
|
sizeLen = 4
|
|
sizeLen = 4
|
|
|
)
|
|
)
|
|
|
-
|
|
|
|
|
if msg != nil {
|
|
if msg != nil {
|
|
|
var err error
|
|
var err error
|
|
|
b, err = c.Marshal(msg)
|
|
b, err = c.Marshal(msg)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return nil, nil, Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
|
|
|
|
|
|
|
+ return nil, nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
|
|
|
}
|
|
}
|
|
|
if outPayload != nil {
|
|
if outPayload != nil {
|
|
|
outPayload.Payload = msg
|
|
outPayload.Payload = msg
|
|
@@ -313,24 +490,35 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl
|
|
|
outPayload.Data = b
|
|
outPayload.Data = b
|
|
|
outPayload.Length = len(b)
|
|
outPayload.Length = len(b)
|
|
|
}
|
|
}
|
|
|
- if cp != nil {
|
|
|
|
|
- if err := cp.Do(cbuf, b); err != nil {
|
|
|
|
|
- return nil, nil, Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
|
|
|
|
|
|
|
+ if compressor != nil || cp != nil {
|
|
|
|
|
+ cbuf = new(bytes.Buffer)
|
|
|
|
|
+ // Has compressor, check Compressor is set by UseCompressor first.
|
|
|
|
|
+ if compressor != nil {
|
|
|
|
|
+ z, _ := compressor.Compress(cbuf)
|
|
|
|
|
+ if _, err := z.Write(b); err != nil {
|
|
|
|
|
+ return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
|
|
|
|
|
+ }
|
|
|
|
|
+ z.Close()
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // If Compressor is not set by UseCompressor, use default Compressor
|
|
|
|
|
+ if err := cp.Do(cbuf, b); err != nil {
|
|
|
|
|
+ return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
b = cbuf.Bytes()
|
|
b = cbuf.Bytes()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
if uint(len(b)) > math.MaxUint32 {
|
|
if uint(len(b)) > math.MaxUint32 {
|
|
|
- return nil, nil, Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
|
|
|
|
|
|
|
+ return nil, nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
bufHeader := make([]byte, payloadLen+sizeLen)
|
|
bufHeader := make([]byte, payloadLen+sizeLen)
|
|
|
- if cp == nil {
|
|
|
|
|
- bufHeader[0] = byte(compressionNone)
|
|
|
|
|
- } else {
|
|
|
|
|
|
|
+ if compressor != nil || cp != nil {
|
|
|
bufHeader[0] = byte(compressionMade)
|
|
bufHeader[0] = byte(compressionMade)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ bufHeader[0] = byte(compressionNone)
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
// Write length of b into buf
|
|
// Write length of b into buf
|
|
|
binary.BigEndian.PutUint32(bufHeader[payloadLen:], uint32(len(b)))
|
|
binary.BigEndian.PutUint32(bufHeader[payloadLen:], uint32(len(b)))
|
|
|
if outPayload != nil {
|
|
if outPayload != nil {
|
|
@@ -339,20 +527,26 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl
|
|
|
return bufHeader, b, nil
|
|
return bufHeader, b, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error {
|
|
|
|
|
|
|
+func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool) *status.Status {
|
|
|
switch pf {
|
|
switch pf {
|
|
|
case compressionNone:
|
|
case compressionNone:
|
|
|
case compressionMade:
|
|
case compressionMade:
|
|
|
- if dc == nil || recvCompress != dc.Type() {
|
|
|
|
|
- return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
|
|
|
|
|
|
|
+ if recvCompress == "" || recvCompress == encoding.Identity {
|
|
|
|
|
+ return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
|
|
|
|
|
+ }
|
|
|
|
|
+ if !haveCompressor {
|
|
|
|
|
+ return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
|
|
|
}
|
|
}
|
|
|
default:
|
|
default:
|
|
|
- return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
|
|
|
|
|
|
|
+ return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
|
|
|
}
|
|
}
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload) error {
|
|
|
|
|
|
|
+// For the two compressor parameters, both should not be set, but if they are,
|
|
|
|
|
+// dc takes precedence over compressor.
|
|
|
|
|
+// TODO(dfawley): wrap the old compressor/decompressor using the new API?
|
|
|
|
|
+func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error {
|
|
|
pf, d, err := p.recvMsg(maxReceiveMessageSize)
|
|
pf, d, err := p.recvMsg(maxReceiveMessageSize)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
@@ -360,22 +554,37 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
|
|
|
if inPayload != nil {
|
|
if inPayload != nil {
|
|
|
inPayload.WireLength = len(d)
|
|
inPayload.WireLength = len(d)
|
|
|
}
|
|
}
|
|
|
- if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
|
|
+
|
|
|
|
|
+ if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
|
|
|
|
|
+ return st.Err()
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
if pf == compressionMade {
|
|
if pf == compressionMade {
|
|
|
- d, err = dc.Do(bytes.NewReader(d))
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
|
|
|
|
|
|
+ // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
|
|
|
|
|
+ // use this decompressor as the default.
|
|
|
|
|
+ if dc != nil {
|
|
|
|
|
+ d, err = dc.Do(bytes.NewReader(d))
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ dcReader, err := compressor.Decompress(bytes.NewReader(d))
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ d, err = ioutil.ReadAll(dcReader)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
if len(d) > maxReceiveMessageSize {
|
|
if len(d) > maxReceiveMessageSize {
|
|
|
// TODO: Revisit the error code. Currently keep it consistent with java
|
|
// TODO: Revisit the error code. Currently keep it consistent with java
|
|
|
// implementation.
|
|
// implementation.
|
|
|
- return Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
|
|
|
|
|
|
|
+ return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
|
|
|
}
|
|
}
|
|
|
if err := c.Unmarshal(d, m); err != nil {
|
|
if err := c.Unmarshal(d, m); err != nil {
|
|
|
- return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
|
|
|
|
|
|
|
+ return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
|
|
|
}
|
|
}
|
|
|
if inPayload != nil {
|
|
if inPayload != nil {
|
|
|
inPayload.RecvTime = time.Now()
|
|
inPayload.RecvTime = time.Now()
|
|
@@ -388,9 +597,7 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type rpcInfo struct {
|
|
type rpcInfo struct {
|
|
|
- failfast bool
|
|
|
|
|
- bytesSent bool
|
|
|
|
|
- bytesReceived bool
|
|
|
|
|
|
|
+ failfast bool
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type rpcInfoContextKey struct{}
|
|
type rpcInfoContextKey struct{}
|
|
@@ -404,69 +611,10 @@ func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func updateRPCInfoInContext(ctx context.Context, s rpcInfo) {
|
|
|
|
|
- if ss, ok := rpcInfoFromContext(ctx); ok {
|
|
|
|
|
- ss.bytesReceived = s.bytesReceived
|
|
|
|
|
- ss.bytesSent = s.bytesSent
|
|
|
|
|
- }
|
|
|
|
|
- return
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// toRPCErr converts an error into an error from the status package.
|
|
|
|
|
-func toRPCErr(err error) error {
|
|
|
|
|
- if _, ok := status.FromError(err); ok {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- switch e := err.(type) {
|
|
|
|
|
- case transport.StreamError:
|
|
|
|
|
- return status.Error(e.Code, e.Desc)
|
|
|
|
|
- case transport.ConnectionError:
|
|
|
|
|
- return status.Error(codes.Unavailable, e.Desc)
|
|
|
|
|
- default:
|
|
|
|
|
- switch err {
|
|
|
|
|
- case context.DeadlineExceeded, stdctx.DeadlineExceeded:
|
|
|
|
|
- return status.Error(codes.DeadlineExceeded, err.Error())
|
|
|
|
|
- case context.Canceled, stdctx.Canceled:
|
|
|
|
|
- return status.Error(codes.Canceled, err.Error())
|
|
|
|
|
- case ErrClientConnClosing:
|
|
|
|
|
- return status.Error(codes.FailedPrecondition, err.Error())
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- return status.Error(codes.Unknown, err.Error())
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// convertCode converts a standard Go error into its canonical code. Note that
|
|
|
|
|
-// this is only used to translate the error returned by the server applications.
|
|
|
|
|
-func convertCode(err error) codes.Code {
|
|
|
|
|
- switch err {
|
|
|
|
|
- case nil:
|
|
|
|
|
- return codes.OK
|
|
|
|
|
- case io.EOF:
|
|
|
|
|
- return codes.OutOfRange
|
|
|
|
|
- case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
|
|
|
|
|
- return codes.FailedPrecondition
|
|
|
|
|
- case os.ErrInvalid:
|
|
|
|
|
- return codes.InvalidArgument
|
|
|
|
|
- case context.Canceled, stdctx.Canceled:
|
|
|
|
|
- return codes.Canceled
|
|
|
|
|
- case context.DeadlineExceeded, stdctx.DeadlineExceeded:
|
|
|
|
|
- return codes.DeadlineExceeded
|
|
|
|
|
- }
|
|
|
|
|
- switch {
|
|
|
|
|
- case os.IsExist(err):
|
|
|
|
|
- return codes.AlreadyExists
|
|
|
|
|
- case os.IsNotExist(err):
|
|
|
|
|
- return codes.NotFound
|
|
|
|
|
- case os.IsPermission(err):
|
|
|
|
|
- return codes.PermissionDenied
|
|
|
|
|
- }
|
|
|
|
|
- return codes.Unknown
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
// Code returns the error code for err if it was produced by the rpc system.
|
|
// Code returns the error code for err if it was produced by the rpc system.
|
|
|
// Otherwise, it returns codes.Unknown.
|
|
// Otherwise, it returns codes.Unknown.
|
|
|
//
|
|
//
|
|
|
-// Deprecated; use status.FromError and Code method instead.
|
|
|
|
|
|
|
+// Deprecated: use status.FromError and Code method instead.
|
|
|
func Code(err error) codes.Code {
|
|
func Code(err error) codes.Code {
|
|
|
if s, ok := status.FromError(err); ok {
|
|
if s, ok := status.FromError(err); ok {
|
|
|
return s.Code()
|
|
return s.Code()
|
|
@@ -477,7 +625,7 @@ func Code(err error) codes.Code {
|
|
|
// ErrorDesc returns the error description of err if it was produced by the rpc system.
|
|
// ErrorDesc returns the error description of err if it was produced by the rpc system.
|
|
|
// Otherwise, it returns err.Error() or empty string when err is nil.
|
|
// Otherwise, it returns err.Error() or empty string when err is nil.
|
|
|
//
|
|
//
|
|
|
-// Deprecated; use status.FromError and Message method instead.
|
|
|
|
|
|
|
+// Deprecated: use status.FromError and Message method instead.
|
|
|
func ErrorDesc(err error) string {
|
|
func ErrorDesc(err error) string {
|
|
|
if s, ok := status.FromError(err); ok {
|
|
if s, ok := status.FromError(err); ok {
|
|
|
return s.Message()
|
|
return s.Message()
|
|
@@ -488,85 +636,47 @@ func ErrorDesc(err error) string {
|
|
|
// Errorf returns an error containing an error code and a description;
|
|
// Errorf returns an error containing an error code and a description;
|
|
|
// Errorf returns nil if c is OK.
|
|
// Errorf returns nil if c is OK.
|
|
|
//
|
|
//
|
|
|
-// Deprecated; use status.Errorf instead.
|
|
|
|
|
|
|
+// Deprecated: use status.Errorf instead.
|
|
|
func Errorf(c codes.Code, format string, a ...interface{}) error {
|
|
func Errorf(c codes.Code, format string, a ...interface{}) error {
|
|
|
return status.Errorf(c, format, a...)
|
|
return status.Errorf(c, format, a...)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// MethodConfig defines the configuration recommended by the service providers for a
|
|
|
|
|
-// particular method.
|
|
|
|
|
-// This is EXPERIMENTAL and subject to change.
|
|
|
|
|
-type MethodConfig struct {
|
|
|
|
|
- // WaitForReady indicates whether RPCs sent to this method should wait until
|
|
|
|
|
- // the connection is ready by default (!failfast). The value specified via the
|
|
|
|
|
- // gRPC client API will override the value set here.
|
|
|
|
|
- WaitForReady *bool
|
|
|
|
|
- // Timeout is the default timeout for RPCs sent to this method. The actual
|
|
|
|
|
- // deadline used will be the minimum of the value specified here and the value
|
|
|
|
|
- // set by the application via the gRPC client API. If either one is not set,
|
|
|
|
|
- // then the other will be used. If neither is set, then the RPC has no deadline.
|
|
|
|
|
- Timeout *time.Duration
|
|
|
|
|
- // MaxReqSize is the maximum allowed payload size for an individual request in a
|
|
|
|
|
- // stream (client->server) in bytes. The size which is measured is the serialized
|
|
|
|
|
- // payload after per-message compression (but before stream compression) in bytes.
|
|
|
|
|
- // The actual value used is the minimum of the value specified here and the value set
|
|
|
|
|
- // by the application via the gRPC client API. If either one is not set, then the other
|
|
|
|
|
- // will be used. If neither is set, then the built-in default is used.
|
|
|
|
|
- MaxReqSize *int
|
|
|
|
|
- // MaxRespSize is the maximum allowed payload size for an individual response in a
|
|
|
|
|
- // stream (server->client) in bytes.
|
|
|
|
|
- MaxRespSize *int
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// ServiceConfig is provided by the service provider and contains parameters for how
|
|
|
|
|
-// clients that connect to the service should behave.
|
|
|
|
|
-// This is EXPERIMENTAL and subject to change.
|
|
|
|
|
-type ServiceConfig struct {
|
|
|
|
|
- // LB is the load balancer the service providers recommends. The balancer specified
|
|
|
|
|
- // via grpc.WithBalancer will override this.
|
|
|
|
|
- LB Balancer
|
|
|
|
|
- // Methods contains a map for the methods in this service.
|
|
|
|
|
- // If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
|
|
|
|
|
- // If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
|
|
|
|
|
- // Otherwise, the method has no MethodConfig to use.
|
|
|
|
|
- Methods map[string]MethodConfig
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func min(a, b *int) *int {
|
|
|
|
|
- if *a < *b {
|
|
|
|
|
- return a
|
|
|
|
|
|
|
+// setCallInfoCodec should only be called after CallOptions have been applied.
|
|
|
|
|
+func setCallInfoCodec(c *callInfo) error {
|
|
|
|
|
+ if c.codec != nil {
|
|
|
|
|
+ // codec was already set by a CallOption; use it.
|
|
|
|
|
+ return nil
|
|
|
}
|
|
}
|
|
|
- return b
|
|
|
|
|
-}
|
|
|
|
|
|
|
|
|
|
-func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
|
|
|
|
|
- if mcMax == nil && doptMax == nil {
|
|
|
|
|
- return &defaultVal
|
|
|
|
|
- }
|
|
|
|
|
- if mcMax != nil && doptMax != nil {
|
|
|
|
|
- return min(mcMax, doptMax)
|
|
|
|
|
|
|
+ if c.contentSubtype == "" {
|
|
|
|
|
+ // No codec specified in CallOptions; use proto by default.
|
|
|
|
|
+ c.codec = encoding.GetCodec(proto.Name)
|
|
|
|
|
+ return nil
|
|
|
}
|
|
}
|
|
|
- if mcMax != nil {
|
|
|
|
|
- return mcMax
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // c.contentSubtype is already lowercased in CallContentSubtype
|
|
|
|
|
+ c.codec = encoding.GetCodec(c.contentSubtype)
|
|
|
|
|
+ if c.codec == nil {
|
|
|
|
|
+ return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype)
|
|
|
}
|
|
}
|
|
|
- return doptMax
|
|
|
|
|
|
|
+ return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// SupportPackageIsVersion3 is referenced from generated protocol buffer files.
|
|
|
|
|
-// The latest support package version is 4.
|
|
|
|
|
-// SupportPackageIsVersion3 is kept for compatibility. It will be removed in the
|
|
|
|
|
-// next support package version update.
|
|
|
|
|
-const SupportPackageIsVersion3 = true
|
|
|
|
|
-
|
|
|
|
|
-// SupportPackageIsVersion4 is referenced from generated protocol buffer files
|
|
|
|
|
-// to assert that that code is compatible with this version of the grpc package.
|
|
|
|
|
|
|
+// The SupportPackageIsVersion variables are referenced from generated protocol
|
|
|
|
|
+// buffer files to ensure compatibility with the gRPC version used. The latest
|
|
|
|
|
+// support package version is 5.
|
|
|
//
|
|
//
|
|
|
-// This constant may be renamed in the future if a change in the generated code
|
|
|
|
|
-// requires a synchronised update of grpc-go and protoc-gen-go. This constant
|
|
|
|
|
-// should not be referenced from any other code.
|
|
|
|
|
-const SupportPackageIsVersion4 = true
|
|
|
|
|
|
|
+// Older versions are kept for compatibility. They may be removed if
|
|
|
|
|
+// compatibility cannot be maintained.
|
|
|
|
|
+//
|
|
|
|
|
+// These constants should not be referenced from any other code.
|
|
|
|
|
+const (
|
|
|
|
|
+ SupportPackageIsVersion3 = true
|
|
|
|
|
+ SupportPackageIsVersion4 = true
|
|
|
|
|
+ SupportPackageIsVersion5 = true
|
|
|
|
|
+)
|
|
|
|
|
|
|
|
// Version is the current grpc version.
|
|
// Version is the current grpc version.
|
|
|
-const Version = "1.7.5"
|
|
|
|
|
|
|
+const Version = "1.11.1"
|
|
|
|
|
|
|
|
const grpcUA = "grpc-go/" + Version
|
|
const grpcUA = "grpc-go/" + Version
|