|
|
@@ -73,7 +73,9 @@ var (
|
|
|
errConnDrain = errors.New("grpc: the connection is drained")
|
|
|
// errConnClosing indicates that the connection is closing.
|
|
|
errConnClosing = errors.New("grpc: the connection is closing")
|
|
|
- errNoAddr = errors.New("grpc: there is no address available to dial")
|
|
|
+ // errConnUnavailable indicates that the connection is unavailable.
|
|
|
+ errConnUnavailable = errors.New("grpc: the connection is unavailable")
|
|
|
+ errNoAddr = errors.New("grpc: there is no address available to dial")
|
|
|
// minimum time to give a connection to complete
|
|
|
minConnectTimeout = 20 * time.Second
|
|
|
)
|
|
|
@@ -213,9 +215,14 @@ func WithUserAgent(s string) DialOption {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// Dial creates a client connection the given target.
|
|
|
+// Dial creates a client connection to the given target.
|
|
|
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
|
|
- ctx := context.Background()
|
|
|
+ return DialContext(context.Background(), target, opts...)
|
|
|
+}
|
|
|
+
|
|
|
+// DialContext creates a client connection to the given target
|
|
|
+// using the supplied context.
|
|
|
+func DialContext(ctx context.Context, target string, opts ...DialOption) (*ClientConn, error) {
|
|
|
cc := &ClientConn{
|
|
|
target: target,
|
|
|
conns: make(map[Address]*addrConn),
|
|
|
@@ -472,6 +479,10 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
|
|
|
if cc.dopts.balancer == nil {
|
|
|
// If balancer is nil, there should be only one addrConn available.
|
|
|
cc.mu.RLock()
|
|
|
+ if cc.conns == nil {
|
|
|
+ cc.mu.RUnlock()
|
|
|
+ return nil, nil, toRPCErr(ErrClientConnClosing)
|
|
|
+ }
|
|
|
for _, ac = range cc.conns {
|
|
|
// Break after the first iteration to get the first addrConn.
|
|
|
ok = true
|
|
|
@@ -501,11 +512,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
|
|
|
}
|
|
|
return nil, nil, errConnClosing
|
|
|
}
|
|
|
- // ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
|
|
|
- // - If RPC is failfast, ac.wait should not block.
|
|
|
- // - If balancer is not nil, ac.wait should return errConnClosing on transient failure
|
|
|
- // so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
|
|
|
- t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
|
|
|
+ t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
|
|
|
if err != nil {
|
|
|
if put != nil {
|
|
|
put()
|
|
|
@@ -757,36 +764,42 @@ func (ac *addrConn) transportMonitor() {
|
|
|
}
|
|
|
|
|
|
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
|
|
|
-// iv) transport is in TransientFailure and blocking is false.
|
|
|
-func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
|
|
|
+// iv) transport is in TransientFailure and there's no balancer/failfast is true.
|
|
|
+func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
|
|
|
for {
|
|
|
ac.mu.Lock()
|
|
|
switch {
|
|
|
case ac.state == Shutdown:
|
|
|
- err := ac.tearDownErr
|
|
|
+ if failfast || !hasBalancer {
|
|
|
+ // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
|
|
|
+ err := ac.tearDownErr
|
|
|
+ ac.mu.Unlock()
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
ac.mu.Unlock()
|
|
|
- return nil, err
|
|
|
+ return nil, errConnClosing
|
|
|
case ac.state == Ready:
|
|
|
ct := ac.transport
|
|
|
ac.mu.Unlock()
|
|
|
return ct, nil
|
|
|
- case ac.state == TransientFailure && !blocking:
|
|
|
- ac.mu.Unlock()
|
|
|
- return nil, errConnClosing
|
|
|
- default:
|
|
|
- ready := ac.ready
|
|
|
- if ready == nil {
|
|
|
- ready = make(chan struct{})
|
|
|
- ac.ready = ready
|
|
|
- }
|
|
|
- ac.mu.Unlock()
|
|
|
- select {
|
|
|
- case <-ctx.Done():
|
|
|
- return nil, toRPCErr(ctx.Err())
|
|
|
- // Wait until the new transport is ready or failed.
|
|
|
- case <-ready:
|
|
|
+ case ac.state == TransientFailure:
|
|
|
+ if failfast || hasBalancer {
|
|
|
+ ac.mu.Unlock()
|
|
|
+ return nil, errConnUnavailable
|
|
|
}
|
|
|
}
|
|
|
+ ready := ac.ready
|
|
|
+ if ready == nil {
|
|
|
+ ready = make(chan struct{})
|
|
|
+ ac.ready = ready
|
|
|
+ }
|
|
|
+ ac.mu.Unlock()
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return nil, toRPCErr(ctx.Err())
|
|
|
+ // Wait until the new transport is ready or failed.
|
|
|
+ case <-ready:
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|