Jelajahi Sumber

Merge pull request #4706 from heyitsanthony/fix-client-close-deadlock

clientv3: don't deadlock on Close with broken connection
Anthony Romano 9 tahun lalu
induk
melakukan
632461cc50
2 mengubah file dengan 25 tambahan dan 10 penghapusan
  1. 23 7
      clientv3/client.go
  2. 2 3
      integration/v3_grpc_test.go

+ 23 - 7
clientv3/client.go

@@ -87,12 +87,13 @@ func NewFromURL(url string) (*Client, error) {
 // Close shuts down the client's etcd connections.
 func (c *Client) Close() error {
 	c.mu.Lock()
-	defer c.mu.Unlock()
 	if c.cancel == nil {
+		c.mu.Unlock()
 		return nil
 	}
 	c.cancel()
 	c.cancel = nil
+	c.mu.Unlock()
 	c.Watcher.Close()
 	c.Lease.Close()
 	return c.conn.Close()
@@ -126,14 +127,22 @@ func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
 	} else {
 		opts = append(opts, grpc.WithInsecure())
 	}
+
+	proto := "tcp"
 	if url, uerr := url.Parse(endpoint); uerr == nil && url.Scheme == "unix" {
-		f := func(a string, t time.Duration) (net.Conn, error) {
-			return net.DialTimeout("unix", a, t)
-		}
+		proto = "unix"
 		// strip unix:// prefix so certs work
 		endpoint = url.Host
-		opts = append(opts, grpc.WithDialer(f))
 	}
+	f := func(a string, t time.Duration) (net.Conn, error) {
+		select {
+		case <-c.ctx.Done():
+			return nil, c.ctx.Err()
+		default:
+		}
+		return net.DialTimeout(proto, a, t)
+	}
+	opts = append(opts, grpc.WithDialer(f))
 
 	conn, err := grpc.Dial(endpoint, opts...)
 	if err != nil {
@@ -156,11 +165,11 @@ func newClient(cfg *Config) (*Client, error) {
 		creds = &c
 	}
 	// use a temporary skeleton client to bootstrap first connection
-	conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds})
+	ctx, cancel := context.WithCancel(context.TODO())
+	conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds, ctx: ctx})
 	if err != nil {
 		return nil, err
 	}
-	ctx, cancel := context.WithCancel(context.TODO())
 	client := &Client{
 		conn:   conn,
 		cfg:    *cfg,
@@ -198,6 +207,13 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli
 		// conn has already been updated
 		return c.conn, nil
 	}
+
+	oldConn.Close()
+	if st, _ := oldConn.State(); st != grpc.Shutdown {
+		// wait for shutdown so grpc doesn't leak sleeping goroutines
+		oldConn.WaitForStateChange(c.ctx, st)
+	}
+
 	conn, dialErr := c.cfg.RetryDialer(c)
 	if dialErr != nil {
 		c.errors = append(c.errors, dialErr)

+ 2 - 3
integration/v3_grpc_test.go

@@ -675,10 +675,9 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) {
 	st, err = conn.WaitForStateChange(ctx, st)
 	if err != nil {
 		t.Fatalf("unexpected error waiting for change (%v)", err)
-	} else if st != grpc.Connecting && st != grpc.TransientFailure {
-		t.Fatalf("expected connecting or transient failure state, got %v", st)
+	} else if st == grpc.Ready {
+		t.Fatalf("expected failure state, got %v", st)
 	}
-
 	cancel()
 	if perr := <-donec; perr == nil {
 		t.Fatalf("expected client error on put")