Bläddra i källkod

vendor: update grpc

Fixes #5871
Anthony Romano 9 år sedan
förälder
incheckning
d9a8a326df

+ 18 - 9
cmd/Godeps/Godeps.json

@@ -234,39 +234,48 @@
 		},
 		{
 			"ImportPath": "google.golang.org/grpc",
-			"Rev": "e78224b060cf3215247b7be455f80ea22e469b66"
+			"Comment": "v1.0.0-6-g02fca89",
+			"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/codes",
-			"Rev": "e78224b060cf3215247b7be455f80ea22e469b66"
+			"Comment": "v1.0.0-6-g02fca89",
+			"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/credentials",
-			"Rev": "e78224b060cf3215247b7be455f80ea22e469b66"
+			"Comment": "v1.0.0-6-g02fca89",
+			"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/grpclog",
-			"Rev": "e78224b060cf3215247b7be455f80ea22e469b66"
+			"Comment": "v1.0.0-6-g02fca89",
+			"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/internal",
-			"Rev": "e78224b060cf3215247b7be455f80ea22e469b66"
+			"Comment": "v1.0.0-6-g02fca89",
+			"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/metadata",
-			"Rev": "e78224b060cf3215247b7be455f80ea22e469b66"
+			"Comment": "v1.0.0-6-g02fca89",
+			"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/naming",
-			"Rev": "e78224b060cf3215247b7be455f80ea22e469b66"
+			"Comment": "v1.0.0-6-g02fca89",
+			"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/peer",
-			"Rev": "e78224b060cf3215247b7be455f80ea22e469b66"
+			"Comment": "v1.0.0-6-g02fca89",
+			"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/transport",
-			"Rev": "e78224b060cf3215247b7be455f80ea22e469b66"
+			"Comment": "v1.0.0-6-g02fca89",
+			"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
 		},
 		{
 			"ImportPath": "gopkg.in/cheggaaa/pb.v1",

+ 110 - 65
cmd/vendor/google.golang.org/grpc/balancer.go

@@ -40,7 +40,6 @@ import (
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/naming"
-	"google.golang.org/grpc/transport"
 )
 
 // Address represents a server the client connects to.
@@ -94,10 +93,10 @@ type Balancer interface {
 	// instead of blocking.
 	//
 	// The function returns put which is called once the rpc has completed or failed.
-	// put can collect and report RPC stats to a remote load balancer. gRPC internals
-	// will try to call this again if err is non-nil (unless err is ErrClientConnClosing).
+	// put can collect and report RPC stats to a remote load balancer.
 	//
-	// TODO: Add other non-recoverable errors?
+	// This function should only return the errors Balancer cannot recover by itself.
+	// gRPC internals will fail the RPC if an error is returned.
 	Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
 	// Notify returns a channel that is used by gRPC internals to watch the addresses
 	// gRPC needs to connect. The addresses might be from a name resolver or remote
@@ -139,35 +138,40 @@ func RoundRobin(r naming.Resolver) Balancer {
 	return &roundRobin{r: r}
 }
 
+type addrInfo struct {
+	addr      Address
+	connected bool
+}
+
 type roundRobin struct {
-	r         naming.Resolver
-	w         naming.Watcher
-	open      []Address // all the addresses the client should potentially connect
-	mu        sync.Mutex
-	addrCh    chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
-	connected []Address      // all the connected addresses
-	next      int            // index of the next address to return for Get()
-	waitCh    chan struct{}  // the channel to block when there is no connected address available
-	done      bool           // The Balancer is closed.
+	r      naming.Resolver
+	w      naming.Watcher
+	addrs  []*addrInfo // all the addresses the client should potentially connect
+	mu     sync.Mutex
+	addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
+	next   int            // index of the next address to return for Get()
+	waitCh chan struct{}  // the channel to block when there is no connected address available
+	done   bool           // The Balancer is closed.
 }
 
 func (rr *roundRobin) watchAddrUpdates() error {
 	updates, err := rr.w.Next()
 	if err != nil {
-		grpclog.Println("grpc: the naming watcher stops working due to %v.", err)
+		grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
 		return err
 	}
 	rr.mu.Lock()
 	defer rr.mu.Unlock()
 	for _, update := range updates {
 		addr := Address{
-			Addr: update.Addr,
+			Addr:     update.Addr,
+			Metadata: update.Metadata,
 		}
 		switch update.Op {
 		case naming.Add:
 			var exist bool
-			for _, v := range rr.open {
-				if addr == v {
+			for _, v := range rr.addrs {
+				if addr == v.addr {
 					exist = true
 					grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr)
 					break
@@ -176,12 +180,12 @@ func (rr *roundRobin) watchAddrUpdates() error {
 			if exist {
 				continue
 			}
-			rr.open = append(rr.open, addr)
+			rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
 		case naming.Delete:
-			for i, v := range rr.open {
-				if v == addr {
-					copy(rr.open[i:], rr.open[i+1:])
-					rr.open = rr.open[:len(rr.open)-1]
+			for i, v := range rr.addrs {
+				if addr == v.addr {
+					copy(rr.addrs[i:], rr.addrs[i+1:])
+					rr.addrs = rr.addrs[:len(rr.addrs)-1]
 					break
 				}
 			}
@@ -189,9 +193,11 @@ func (rr *roundRobin) watchAddrUpdates() error {
 			grpclog.Println("Unknown update.Op ", update.Op)
 		}
 	}
-	// Make a copy of rr.open and write it onto rr.addrCh so that gRPC internals gets notified.
-	open := make([]Address, len(rr.open), len(rr.open))
-	copy(open, rr.open)
+	// Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
+	open := make([]Address, len(rr.addrs))
+	for i, v := range rr.addrs {
+		open[i] = v.addr
+	}
 	if rr.done {
 		return ErrClientConnClosing
 	}
@@ -202,7 +208,9 @@ func (rr *roundRobin) watchAddrUpdates() error {
 func (rr *roundRobin) Start(target string) error {
 	if rr.r == nil {
 		// If there is no name resolver installed, it is not needed to
-		// do name resolution. In this case, rr.addrCh stays nil.
+		// do name resolution. In this case, target is added into rr.addrs
+		// as the only address available and rr.addrCh stays nil.
+		rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
 		return nil
 	}
 	w, err := rr.r.Resolve(target)
@@ -221,38 +229,41 @@ func (rr *roundRobin) Start(target string) error {
 	return nil
 }
 
-// Up appends addr to the end of rr.connected and sends notification if there
-// are pending Get() calls.
+// Up sets the connected state of addr and sends notification if there are pending
+// Get() calls.
 func (rr *roundRobin) Up(addr Address) func(error) {
 	rr.mu.Lock()
 	defer rr.mu.Unlock()
-	for _, a := range rr.connected {
-		if a == addr {
-			return nil
+	var cnt int
+	for _, a := range rr.addrs {
+		if a.addr == addr {
+			if a.connected {
+				return nil
+			}
+			a.connected = true
 		}
-	}
-	rr.connected = append(rr.connected, addr)
-	if len(rr.connected) == 1 {
-		// addr is only one available. Notify the Get() callers who are blocking.
-		if rr.waitCh != nil {
-			close(rr.waitCh)
-			rr.waitCh = nil
+		if a.connected {
+			cnt++
 		}
 	}
+	// addr is only one which is connected. Notify the Get() callers who are blocking.
+	if cnt == 1 && rr.waitCh != nil {
+		close(rr.waitCh)
+		rr.waitCh = nil
+	}
 	return func(err error) {
 		rr.down(addr, err)
 	}
 }
 
-// down removes addr from rr.connected and moves the remaining addrs forward.
+// down unsets the connected state of addr.
 func (rr *roundRobin) down(addr Address, err error) {
 	rr.mu.Lock()
 	defer rr.mu.Unlock()
-	for i, a := range rr.connected {
-		if a == addr {
-			copy(rr.connected[i:], rr.connected[i+1:])
-			rr.connected = rr.connected[:len(rr.connected)-1]
-			return
+	for _, a := range rr.addrs {
+		if addr == a.addr {
+			a.connected = false
+			break
 		}
 	}
 }
@@ -266,17 +277,40 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
 		err = ErrClientConnClosing
 		return
 	}
-	if rr.next >= len(rr.connected) {
-		rr.next = 0
+
+	if len(rr.addrs) > 0 {
+		if rr.next >= len(rr.addrs) {
+			rr.next = 0
+		}
+		next := rr.next
+		for {
+			a := rr.addrs[next]
+			next = (next + 1) % len(rr.addrs)
+			if a.connected {
+				addr = a.addr
+				rr.next = next
+				rr.mu.Unlock()
+				return
+			}
+			if next == rr.next {
+				// Has iterated all the possible address but none is connected.
+				break
+			}
+		}
 	}
-	if len(rr.connected) > 0 {
-		addr = rr.connected[rr.next]
+	if !opts.BlockingWait {
+		if len(rr.addrs) == 0 {
+			rr.mu.Unlock()
+			err = fmt.Errorf("there is no address available")
+			return
+		}
+		// Returns the next addr on rr.addrs for failfast RPCs.
+		addr = rr.addrs[rr.next].addr
 		rr.next++
 		rr.mu.Unlock()
 		return
 	}
-	// There is no address available. Wait on rr.waitCh.
-	// TODO(zhaoq): Handle the case when opts.BlockingWait is false.
+	// Wait on rr.waitCh for non-failfast RPCs.
 	if rr.waitCh == nil {
 		ch = make(chan struct{})
 		rr.waitCh = ch
@@ -287,7 +321,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
 	for {
 		select {
 		case <-ctx.Done():
-			err = transport.ContextErr(ctx.Err())
+			err = ctx.Err()
 			return
 		case <-ch:
 			rr.mu.Lock()
@@ -296,24 +330,35 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
 				err = ErrClientConnClosing
 				return
 			}
-			if len(rr.connected) == 0 {
-				// The newly added addr got removed by Down() again.
-				if rr.waitCh == nil {
-					ch = make(chan struct{})
-					rr.waitCh = ch
-				} else {
-					ch = rr.waitCh
+
+			if len(rr.addrs) > 0 {
+				if rr.next >= len(rr.addrs) {
+					rr.next = 0
+				}
+				next := rr.next
+				for {
+					a := rr.addrs[next]
+					next = (next + 1) % len(rr.addrs)
+					if a.connected {
+						addr = a.addr
+						rr.next = next
+						rr.mu.Unlock()
+						return
+					}
+					if next == rr.next {
+						// Has iterated all the possible address but none is connected.
+						break
+					}
 				}
-				rr.mu.Unlock()
-				continue
 			}
-			if rr.next >= len(rr.connected) {
-				rr.next = 0
+			// The newly added addr got removed by Down() again.
+			if rr.waitCh == nil {
+				ch = make(chan struct{})
+				rr.waitCh = ch
+			} else {
+				ch = rr.waitCh
 			}
-			addr = rr.connected[rr.next]
-			rr.next++
 			rr.mu.Unlock()
-			return
 		}
 	}
 }

+ 8 - 10
cmd/vendor/google.golang.org/grpc/call.go

@@ -101,7 +101,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
 // Invoke is called by generated code. Also users can call Invoke directly when it
 // is really needed in their use cases.
 func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
-	var c callInfo
+	c := defaultCallInfo
 	for _, o := range opts {
 		if err := o.before(&c); err != nil {
 			return toRPCErr(err)
@@ -155,19 +155,17 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
 		t, put, err = cc.getTransport(ctx, gopts)
 		if err != nil {
 			// TODO(zhaoq): Probably revisit the error handling.
-			if err == ErrClientConnClosing {
-				return Errorf(codes.FailedPrecondition, "%v", err)
+			if _, ok := err.(*rpcError); ok {
+				return err
 			}
-			if _, ok := err.(transport.StreamError); ok {
-				return toRPCErr(err)
-			}
-			if _, ok := err.(transport.ConnectionError); ok {
+			if err == errConnClosing {
 				if c.failFast {
-					return toRPCErr(err)
+					return Errorf(codes.Unavailable, "%v", errConnClosing)
 				}
+				continue
 			}
-			// All the remaining cases are treated as retryable.
-			continue
+			// All the other errors are treated as Internal errors.
+			return Errorf(codes.Internal, "%v", err)
 		}
 		if c.traceInfo.tr != nil {
 			c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)

+ 22 - 21
cmd/vendor/google.golang.org/grpc/clientconn.go

@@ -218,27 +218,26 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
 	for _, opt := range opts {
 		opt(&cc.dopts)
 	}
+
+	// Set defaults.
 	if cc.dopts.codec == nil {
-		// Set the default codec.
 		cc.dopts.codec = protoCodec{}
 	}
-
 	if cc.dopts.bs == nil {
 		cc.dopts.bs = DefaultBackoffConfig
 	}
-
-	cc.balancer = cc.dopts.balancer
-	if cc.balancer == nil {
-		cc.balancer = RoundRobin(nil)
+	if cc.dopts.balancer == nil {
+		cc.dopts.balancer = RoundRobin(nil)
 	}
-	if err := cc.balancer.Start(target); err != nil {
+
+	if err := cc.dopts.balancer.Start(target); err != nil {
 		return nil, err
 	}
 	var (
 		ok    bool
 		addrs []Address
 	)
-	ch := cc.balancer.Notify()
+	ch := cc.dopts.balancer.Notify()
 	if ch == nil {
 		// There is no name resolver installed.
 		addrs = append(addrs, Address{Addr: target})
@@ -319,7 +318,6 @@ func (s ConnectivityState) String() string {
 // ClientConn represents a client connection to an RPC server.
 type ClientConn struct {
 	target    string
-	balancer  Balancer
 	authority string
 	dopts     dialOptions
 
@@ -328,7 +326,7 @@ type ClientConn struct {
 }
 
 func (cc *ClientConn) lbWatcher() {
-	for addrs := range cc.balancer.Notify() {
+	for addrs := range cc.dopts.balancer.Notify() {
 		var (
 			add []Address   // Addresses need to setup connections.
 			del []*addrConn // Connections need to tear down.
@@ -424,15 +422,14 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
 }
 
 func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
-	// TODO(zhaoq): Implement fail-fast logic.
-	addr, put, err := cc.balancer.Get(ctx, opts)
+	addr, put, err := cc.dopts.balancer.Get(ctx, opts)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, toRPCErr(err)
 	}
 	cc.mu.RLock()
 	if cc.conns == nil {
 		cc.mu.RUnlock()
-		return nil, nil, ErrClientConnClosing
+		return nil, nil, toRPCErr(ErrClientConnClosing)
 	}
 	ac, ok := cc.conns[addr]
 	cc.mu.RUnlock()
@@ -440,9 +437,9 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
 		if put != nil {
 			put()
 		}
-		return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
+		return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
 	}
-	t, err := ac.wait(ctx)
+	t, err := ac.wait(ctx, !opts.BlockingWait)
 	if err != nil {
 		if put != nil {
 			put()
@@ -462,7 +459,7 @@ func (cc *ClientConn) Close() error {
 	conns := cc.conns
 	cc.conns = nil
 	cc.mu.Unlock()
-	cc.balancer.Close()
+	cc.dopts.balancer.Close()
 	for _, ac := range conns {
 		ac.tearDown(ErrClientConnClosing)
 	}
@@ -610,7 +607,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
 			close(ac.ready)
 			ac.ready = nil
 		}
-		ac.down = ac.cc.balancer.Up(ac.addr)
+		ac.down = ac.cc.dopts.balancer.Up(ac.addr)
 		ac.mu.Unlock()
 		return nil
 	}
@@ -649,8 +646,9 @@ func (ac *addrConn) transportMonitor() {
 	}
 }
 
-// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed.
-func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) {
+// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
+// iv) transport is in TransientFailure and the RPC is fail-fast.
+func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) {
 	for {
 		ac.mu.Lock()
 		switch {
@@ -661,6 +659,9 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error)
 			ct := ac.transport
 			ac.mu.Unlock()
 			return ct, nil
+		case ac.state == TransientFailure && failFast:
+			ac.mu.Unlock()
+			return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
 		default:
 			ready := ac.ready
 			if ready == nil {
@@ -670,7 +671,7 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error)
 			ac.mu.Unlock()
 			select {
 			case <-ctx.Done():
-				return nil, transport.ContextErr(ctx.Err())
+				return nil, toRPCErr(ctx.Err())
 			// Wait until the new transport is ready or failed.
 			case <-ready:
 			}

+ 8 - 6
cmd/vendor/google.golang.org/grpc/credentials/credentials.go

@@ -66,7 +66,7 @@ type PerRPCCredentials interface {
 	// TODO(zhaoq): Define the set of the qualified keys instead of leaving
 	// it as an arbitrary string.
 	GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
-	// RequireTransportSecurity indicates whether the credentails requires
+	// RequireTransportSecurity indicates whether the credentials requires
 	// transport security.
 	RequireTransportSecurity() bool
 }
@@ -116,7 +116,7 @@ func (t TLSInfo) AuthType() string {
 // tlsCreds is the credentials required for authenticating a connection using TLS.
 type tlsCreds struct {
 	// TLS configuration
-	config tls.Config
+	config *tls.Config
 }
 
 func (c tlsCreds) Info() ProtocolInfo {
@@ -151,14 +151,16 @@ func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.D
 			errChannel <- timeoutError{}
 		})
 	}
+	// use local cfg to avoid clobbering ServerName if using multiple endpoints
+	cfg := *c.config
 	if c.config.ServerName == "" {
 		colonPos := strings.LastIndex(addr, ":")
 		if colonPos == -1 {
 			colonPos = len(addr)
 		}
-		c.config.ServerName = addr[:colonPos]
+		cfg.ServerName = addr[:colonPos]
 	}
-	conn := tls.Client(rawConn, &c.config)
+	conn := tls.Client(rawConn, &cfg)
 	if timeout == 0 {
 		err = conn.Handshake()
 	} else {
@@ -177,7 +179,7 @@ func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.D
 }
 
 func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) {
-	conn := tls.Server(rawConn, &c.config)
+	conn := tls.Server(rawConn, c.config)
 	if err := conn.Handshake(); err != nil {
 		rawConn.Close()
 		return nil, nil, err
@@ -187,7 +189,7 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
 
 // NewTLS uses c to construct a TransportCredentials based on TLS.
 func NewTLS(c *tls.Config) TransportCredentials {
-	tc := &tlsCreds{*c}
+	tc := &tlsCreds{c}
 	tc.config.NextProtos = alpnProtoStr
 	return tc
 }

+ 41 - 7
cmd/vendor/google.golang.org/grpc/rpc_util.go

@@ -141,6 +141,8 @@ type callInfo struct {
 	traceInfo traceInfo // in trace.go
 }
 
+var defaultCallInfo = callInfo{failFast: true}
+
 // CallOption configures a Call before it starts or extracts information from
 // a Call after it completes.
 type CallOption interface {
@@ -179,6 +181,19 @@ func Trailer(md *metadata.MD) CallOption {
 	})
 }
 
+// FailFast configures the action to take when an RPC is attempted on broken
+// connections or unreachable servers. If failfast is true, the RPC will fail
+// immediately. Otherwise, the RPC client will block the call until a
+// connection is available (or the call is canceled or times out) and will retry
+// the call if it fails due to a transient error. Please refer to
+// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md
+func FailFast(failFast bool) CallOption {
+	return beforeCall(func(c *callInfo) error {
+		c.failFast = failFast
+		return nil
+	})
+}
+
 // The format of the payload: compressed or not?
 type payloadFormat uint8
 
@@ -319,7 +334,7 @@ type rpcError struct {
 	desc string
 }
 
-func (e rpcError) Error() string {
+func (e *rpcError) Error() string {
 	return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
 }
 
@@ -329,7 +344,7 @@ func Code(err error) codes.Code {
 	if err == nil {
 		return codes.OK
 	}
-	if e, ok := err.(rpcError); ok {
+	if e, ok := err.(*rpcError); ok {
 		return e.code
 	}
 	return codes.Unknown
@@ -341,7 +356,7 @@ func ErrorDesc(err error) string {
 	if err == nil {
 		return ""
 	}
-	if e, ok := err.(rpcError); ok {
+	if e, ok := err.(*rpcError); ok {
 		return e.desc
 	}
 	return err.Error()
@@ -353,7 +368,7 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
 	if c == codes.OK {
 		return nil
 	}
-	return rpcError{
+	return &rpcError{
 		code: c,
 		desc: fmt.Sprintf(format, a...),
 	}
@@ -362,18 +377,37 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
 // toRPCErr converts an error into a rpcError.
 func toRPCErr(err error) error {
 	switch e := err.(type) {
-	case rpcError:
+	case *rpcError:
 		return err
 	case transport.StreamError:
-		return rpcError{
+		return &rpcError{
 			code: e.Code,
 			desc: e.Desc,
 		}
 	case transport.ConnectionError:
-		return rpcError{
+		return &rpcError{
 			code: codes.Internal,
 			desc: e.Desc,
 		}
+	default:
+		switch err {
+		case context.DeadlineExceeded:
+			return &rpcError{
+				code: codes.DeadlineExceeded,
+				desc: err.Error(),
+			}
+		case context.Canceled:
+			return &rpcError{
+				code: codes.Canceled,
+				desc: err.Error(),
+			}
+		case ErrClientConnClosing:
+			return &rpcError{
+				code: codes.FailedPrecondition,
+				desc: err.Error(),
+			}
+		}
+
 	}
 	return Errorf(codes.Unknown, "%v", err)
 }

+ 54 - 4
cmd/vendor/google.golang.org/grpc/server.go

@@ -82,6 +82,7 @@ type service struct {
 	server interface{} // the server for service methods
 	md     map[string]*MethodDesc
 	sd     map[string]*StreamDesc
+	mdata  interface{}
 }
 
 // Server is a gRPC server to serve RPC requests.
@@ -231,6 +232,7 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
 		server: ss,
 		md:     make(map[string]*MethodDesc),
 		sd:     make(map[string]*StreamDesc),
+		mdata:  sd.Metadata,
 	}
 	for i := range sd.Methods {
 		d := &sd.Methods[i]
@@ -243,6 +245,52 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
 	s.m[sd.ServiceName] = srv
 }
 
+// MethodInfo contains the information of an RPC including its method name and type.
+type MethodInfo struct {
+	// Name is the method name only, without the service name or package name.
+	Name string
+	// IsClientStream indicates whether the RPC is a client streaming RPC.
+	IsClientStream bool
+	// IsServerStream indicates whether the RPC is a server streaming RPC.
+	IsServerStream bool
+}
+
+// ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
+type ServiceInfo struct {
+	Methods []MethodInfo
+	// Metadata is the metadata specified in ServiceDesc when registering service.
+	Metadata interface{}
+}
+
+// GetServiceInfo returns a map from service names to ServiceInfo.
+// Service names include the package names, in the form of <package>.<service>.
+func (s *Server) GetServiceInfo() map[string]*ServiceInfo {
+	ret := make(map[string]*ServiceInfo)
+	for n, srv := range s.m {
+		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
+		for m := range srv.md {
+			methods = append(methods, MethodInfo{
+				Name:           m,
+				IsClientStream: false,
+				IsServerStream: false,
+			})
+		}
+		for m, d := range srv.sd {
+			methods = append(methods, MethodInfo{
+				Name:           m,
+				IsClientStream: d.ClientStreams,
+				IsServerStream: d.ServerStreams,
+			})
+		}
+
+		ret[n] = &ServiceInfo{
+			Methods:  methods,
+			Metadata: srv.mdata,
+		}
+	}
+	return ret
+}
+
 var (
 	// ErrServerStopped indicates that the operation is now illegal because of
 	// the server being stopped.
@@ -272,9 +320,11 @@ func (s *Server) Serve(lis net.Listener) error {
 	s.lis[lis] = true
 	s.mu.Unlock()
 	defer func() {
-		lis.Close()
 		s.mu.Lock()
-		delete(s.lis, lis)
+		if s.lis != nil && s.lis[lis] {
+			lis.Close()
+			delete(s.lis, lis)
+		}
 		s.mu.Unlock()
 	}()
 	for {
@@ -529,7 +579,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 		}
 		reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
 		if appErr != nil {
-			if err, ok := appErr.(rpcError); ok {
+			if err, ok := appErr.(*rpcError); ok {
 				statusCode = err.code
 				statusDesc = err.desc
 			} else {
@@ -614,7 +664,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
 		appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
 	}
 	if appErr != nil {
-		if err, ok := appErr.(rpcError); ok {
+		if err, ok := appErr.(*rpcError); ok {
 			ss.statusCode = err.code
 			ss.statusDesc = err.desc
 		} else if err, ok := appErr.(transport.StreamError); ok {

+ 53 - 15
cmd/vendor/google.golang.org/grpc/stream.go

@@ -102,16 +102,15 @@ type ClientStream interface {
 func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
 	var (
 		t   transport.ClientTransport
+		s   *transport.Stream
 		err error
 		put func()
 	)
-	// TODO(zhaoq): CallOption is omitted. Add support when it is needed.
-	gopts := BalancerGetOptions{
-		BlockingWait: false,
-	}
-	t, put, err = cc.getTransport(ctx, gopts)
-	if err != nil {
-		return nil, toRPCErr(err)
+	c := defaultCallInfo
+	for _, o := range opts {
+		if err := o.before(&c); err != nil {
+			return nil, toRPCErr(err)
+		}
 	}
 	callHdr := &transport.CallHdr{
 		Host:   cc.authority,
@@ -122,8 +121,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 		callHdr.SendCompress = cc.dopts.cp.Type()
 	}
 	cs := &clientStream{
+		opts:    opts,
+		c:       c,
 		desc:    desc,
-		put:     put,
 		codec:   cc.dopts.codec,
 		cp:      cc.dopts.cp,
 		dc:      cc.dopts.dc,
@@ -142,11 +142,44 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 		cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false)
 		ctx = trace.NewContext(ctx, cs.trInfo.tr)
 	}
-	s, err := t.NewStream(ctx, callHdr)
-	if err != nil {
-		cs.finish(err)
-		return nil, toRPCErr(err)
+	gopts := BalancerGetOptions{
+		BlockingWait: !c.failFast,
 	}
+	for {
+		t, put, err = cc.getTransport(ctx, gopts)
+		if err != nil {
+			// TODO(zhaoq): Probably revisit the error handling.
+			if _, ok := err.(*rpcError); ok {
+				return nil, err
+			}
+			if err == errConnClosing {
+				if c.failFast {
+					return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
+				}
+				continue
+			}
+			// All the other errors are treated as Internal errors.
+			return nil, Errorf(codes.Internal, "%v", err)
+		}
+
+		s, err = t.NewStream(ctx, callHdr)
+		if err != nil {
+			if put != nil {
+				put()
+				put = nil
+			}
+			if _, ok := err.(transport.ConnectionError); ok {
+				if c.failFast {
+					cs.finish(err)
+					return nil, toRPCErr(err)
+				}
+				continue
+			}
+			return nil, toRPCErr(err)
+		}
+		break
+	}
+	cs.put = put
 	cs.t = t
 	cs.s = s
 	cs.p = &parser{r: s}
@@ -167,6 +200,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 
 // clientStream implements a client side Stream.
 type clientStream struct {
+	opts  []CallOption
+	c     callInfo
 	t     transport.ClientTransport
 	s     *transport.Stream
 	p     *parser
@@ -312,15 +347,18 @@ func (cs *clientStream) closeTransportStream(err error) {
 }
 
 func (cs *clientStream) finish(err error) {
-	if !cs.tracing {
-		return
-	}
 	cs.mu.Lock()
 	defer cs.mu.Unlock()
+	for _, o := range cs.opts {
+		o.after(&cs.c)
+	}
 	if cs.put != nil {
 		cs.put()
 		cs.put = nil
 	}
+	if !cs.tracing {
+		return
+	}
 	if cs.trInfo.tr != nil {
 		if err == nil || err == io.EOF {
 			cs.trInfo.tr.LazyPrintf("RPC: [OK]")

+ 1 - 1
cmd/vendor/google.golang.org/grpc/transport/handler_server.go

@@ -312,7 +312,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
 		Addr: ht.RemoteAddr(),
 	}
 	if req.TLS != nil {
-		pr.AuthInfo = credentials.TLSInfo{*req.TLS}
+		pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
 	}
 	ctx = metadata.NewContext(ctx, ht.headerMD)
 	ctx = peer.NewContext(ctx, pr)

+ 4 - 1
cmd/vendor/google.golang.org/grpc/transport/http2_client.go

@@ -175,7 +175,10 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
 		return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
 	}
 	if initialWindowSize != defaultWindowSize {
-		err = t.framer.writeSettings(true, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)})
+		err = t.framer.writeSettings(true, http2.Setting{
+			ID:  http2.SettingInitialWindowSize,
+			Val: uint32(initialWindowSize),
+		})
 	} else {
 		err = t.framer.writeSettings(true)
 	}

+ 7 - 2
cmd/vendor/google.golang.org/grpc/transport/http2_server.go

@@ -100,10 +100,15 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
 	if maxStreams == 0 {
 		maxStreams = math.MaxUint32
 	} else {
-		settings = append(settings, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams})
+		settings = append(settings, http2.Setting{
+			ID:  http2.SettingMaxConcurrentStreams,
+			Val: maxStreams,
+		})
 	}
 	if initialWindowSize != defaultWindowSize {
-		settings = append(settings, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)})
+		settings = append(settings, http2.Setting{
+			ID:  http2.SettingInitialWindowSize,
+			Val: uint32(initialWindowSize)})
 	}
 	if err := framer.writeSettings(true, settings...); err != nil {
 		return nil, ConnectionErrorf("transport: %v", err)