
Merge pull request #5543 from heyitsanthony/clientv3-unblock-reconnect

clientv3: don't hold client lock while dialing
Anthony Romano 9 years ago
parent commit 88afb0b0a6
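
The theme of the diff below: the old retryConnection dialed out while holding c.mu, so a slow dial could block every caller that needed the lock, including Close. The commit instead snapshots what it needs under the lock, releases the mutex for the blocking dial, then reacquires it and re-validates before publishing the result. A minimal sketch of that pattern, with hypothetical names (slowDial stands in for the grpc dialer):

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "time"
    )

    type conn struct{ addr string }

    type client struct {
        mu     sync.Mutex
        closed bool
        conn   *conn
    }

    // slowDial stands in for the grpc dialer; it can block for seconds.
    func slowDial() (*conn, error) {
        time.Sleep(100 * time.Millisecond)
        return &conn{addr: "127.0.0.1:2379"}, nil
    }

    func (c *client) reconnect() error {
        c.mu.Lock()
        if c.closed {
            c.mu.Unlock()
            return errors.New("client closed")
        }
        c.mu.Unlock() // release the lock before the blocking dial

        nc, err := slowDial() // other goroutines can take c.mu meanwhile

        c.mu.Lock()
        defer c.mu.Unlock()
        if c.closed { // state may have changed while unlocked; re-check
            return errors.New("client closed")
        }
        if nc != nil {
            c.conn = nc
        }
        return err
    }

    func main() {
        c := &client{}
        fmt.Println("reconnect:", c.reconnect(), "conn:", c.conn)
    }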

+ 57 - 31
clientv3/client.go

@@ -101,25 +101,39 @@ func NewFromConfigFile(path string) (*Client, error) {
 }
 
 // Close shuts down the client's etcd connections.
-func (c *Client) Close() error {
+func (c *Client) Close() (err error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
+
+	// acquire the cancel
 	if c.cancel == nil {
-		return nil
+		// already canceled
+		if c.lastConnErr != c.ctx.Err() {
+			err = c.lastConnErr
+		}
+		return
 	}
-	c.cancel()
+	cancel := c.cancel
 	c.cancel = nil
-	connc := c.newconnc
 	c.mu.Unlock()
-	c.connStartRetry(nil)
+
+	// close watcher and lease before terminating connection
+	// so they don't retry on a closed client
 	c.Watcher.Close()
 	c.Lease.Close()
+
+	// cancel reconnection loop
+	cancel()
+	c.mu.Lock()
+	connc := c.newconnc
+	c.mu.Unlock()
+	// connc on cancel() is left closed
 	<-connc
 	c.mu.Lock()
 	if c.lastConnErr != c.ctx.Err() {
-		return c.lastConnErr
+		err = c.lastConnErr
 	}
-	return nil
+	return
 }
 
 // Ctx is a context for "out of band" messages (e.g., for sending
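
Note the ordering Close now enforces: Watcher and Lease shut down first so they stop retrying, only then is the reconnect loop cancelled, and the method waits on newconnc (which connMonitor closes on its way out) before returning. A hedged sketch of that shutdown handshake, with the loop reduced to a stub:

    package main

    import (
        "context"
        "fmt"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        done := make(chan struct{}) // plays the role of c.newconnc

        // reconnection loop: closes done as its final act, like connMonitor
        go func() {
            defer close(done)
            <-ctx.Done()
            // final cleanup (the deferred retryConnection) would run here
        }()

        // 1) stop dependents that would otherwise retry (Watcher, Lease) -- elided
        // 2) cancel the loop's context
        cancel()
        // 3) block until the loop has finished its cleanup
        <-done
        fmt.Println("closed cleanly")
    }
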
@@ -278,34 +292,48 @@ func newClient(cfg *Config) (*Client, error) {
 func (c *Client) ActiveConnection() *grpc.ClientConn {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
+	if c.conn == nil {
+		panic("trying to return nil active connection")
+	}
 	return c.conn
 }
 
 // retryConnection establishes a new connection
-func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr error) {
+func (c *Client) retryConnection(err error) {
+	oldconn := c.conn
+
+	// return holding lock so old connection can be cleaned up in this defer
+	defer func() {
+		if oldconn != nil {
+			oldconn.Close()
+			if st, _ := oldconn.State(); st != grpc.Shutdown {
+				// wait so grpc doesn't leak sleeping goroutines
+				oldconn.WaitForStateChange(context.Background(), st)
+			}
+		}
+		c.mu.Unlock()
+	}()
+
 	c.mu.Lock()
-	defer c.mu.Unlock()
 	if err != nil {
 		c.errors = append(c.errors, err)
 	}
-	if c.conn != nil {
-		c.conn.Close()
-		if st, _ := c.conn.State(); st != grpc.Shutdown {
-			// wait so grpc doesn't leak sleeping goroutines
-			c.conn.WaitForStateChange(context.Background(), st)
-		}
-		c.conn = nil
-	}
 	if c.cancel == nil {
 		// client has called Close() so don't try to dial out
-		return nil, c.ctx.Err()
+		return
 	}
+	c.mu.Unlock()
 
-	c.conn, dialErr = c.cfg.retryDialer(c)
+	nc, dialErr := c.cfg.retryDialer(c)
+
+	c.mu.Lock()
+	if nc != nil {
+		c.conn = nc
+	}
 	if dialErr != nil {
 		c.errors = append(c.errors, dialErr)
 	}
-	return c.conn, dialErr
+	c.lastConnErr = dialErr
 }
 
 // connStartRetry schedules a reconnect if one is not already running
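
retryConnection now follows a "return holding the lock" convention: the deferred function, registered before the first Lock, both closes the old connection and releases c.mu, so every return path must leave the mutex held. A small illustration of the convention (resource and holder are hypothetical):

    package main

    import (
        "fmt"
        "sync"
    )

    type resource struct{ name string }

    func (r *resource) close() { fmt.Println("closed:", r.name) }

    type holder struct {
        mu  sync.Mutex
        res *resource
    }

    // swap installs next; every return path must hold h.mu so the single
    // deferred func can clean up the old resource and then unlock.
    func (h *holder) swap(next *resource) {
        old := h.res // in the real code this read is safe: only one goroutine swaps
        defer func() {
            if old != nil {
                old.close()
            }
            h.mu.Unlock()
        }()
        h.mu.Lock()
        h.res = next
    }

    func main() {
        h := &holder{res: &resource{name: "old"}}
        h.swap(&resource{name: "new"})
        fmt.Println("current:", h.res.name)
    }
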
@@ -321,17 +349,20 @@ func (c *Client) connStartRetry(err error) {
 
 // connWait waits for a reconnect to be processed
 func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, error) {
-	c.mu.Lock()
+	c.mu.RLock()
 	ch := c.newconnc
-	c.mu.Unlock()
+	c.mu.RUnlock()
 	c.connStartRetry(err)
 	select {
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	case <-ch:
 	}
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	if c.cancel == nil {
+		return c.conn, rpctypes.ErrConnClosed
+	}
 	return c.conn, c.lastConnErr
 }
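
connWait leans on a standard Go idiom: a close on newconnc is a broadcast that wakes every goroutine blocked on it, after which connMonitor installs a fresh channel for the next round. A compact sketch of the idiom:

    package main

    import (
        "fmt"
        "sync"
    )

    type notifier struct {
        mu sync.Mutex
        ch chan struct{}
    }

    // wait returns the current round's channel; receive from it to block
    // until the next broadcast.
    func (n *notifier) wait() <-chan struct{} {
        n.mu.Lock()
        defer n.mu.Unlock()
        return n.ch
    }

    // broadcast wakes all current waiters and resets for the next round.
    func (n *notifier) broadcast() {
        n.mu.Lock()
        close(n.ch)                // a closed channel unblocks every receiver
        n.ch = make(chan struct{}) // fresh channel for future waiters
        n.mu.Unlock()
    }

    func main() {
        n := &notifier{ch: make(chan struct{})}
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            ch := n.wait() // grab the channel before broadcasting
            wg.Add(1)
            go func(id int, ch <-chan struct{}) {
                defer wg.Done()
                <-ch
                fmt.Println("waiter", id, "woke")
            }(i, ch)
        }
        n.broadcast()
        wg.Wait()
    }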
 
@@ -340,11 +371,8 @@ func (c *Client) connMonitor() {
 	var err error
 
 	defer func() {
-		_, err = c.retryConnection(c.ctx.Err())
-		c.mu.Lock()
-		c.lastConnErr = err
+		c.retryConnection(c.ctx.Err())
 		close(c.newconnc)
-		c.mu.Unlock()
 	}()
 
 	limiter := rate.NewLimiter(rate.Every(minConnRetryWait), 1)
@@ -354,10 +382,8 @@ func (c *Client) connMonitor() {
 		case <-c.ctx.Done():
 			return
 		}
-		conn, connErr := c.retryConnection(err)
+		c.retryConnection(err)
 		c.mu.Lock()
-		c.lastConnErr = connErr
-		c.conn = conn
 		close(c.newconnc)
 		c.newconnc = make(chan struct{})
 		c.reconnc = make(chan error, 1)
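
connMonitor throttles redials with golang.org/x/time/rate, so a flapping endpoint can't drive a tight dial loop. A runnable sketch of that shape; minConnRetryWait's real value lives elsewhere in the package, so the figure below is an assumption:

    package main

    import (
        "context"
        "fmt"
        "time"

        "golang.org/x/time/rate"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        go func() {
            time.Sleep(350 * time.Millisecond)
            cancel() // stands in for Close() cancelling the client context
        }()

        minConnRetryWait := 100 * time.Millisecond // assumed value for the demo
        limiter := rate.NewLimiter(rate.Every(minConnRetryWait), 1)

        for {
            // Wait blocks until a retry token is available or ctx is cancelled.
            if err := limiter.Wait(ctx); err != nil {
                fmt.Println("monitor exiting:", err)
                return // the real loop runs its deferred cleanup here
            }
            fmt.Println("dialing at", time.Now().Format("15:04:05.000"))
        }
    }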

+ 18 - 2
clientv3/integration/kv_test.go

@@ -131,6 +131,13 @@ func TestKVPutWithRequireLeader(t *testing.T) {
 	if err != rpctypes.ErrNoLeader {
 		t.Fatal(err)
 	}
+
+	// clients may give timeout errors since the members are stopped; take
+	// the clients so that terminating the cluster won't complain
+	clus.Client(1).Close()
+	clus.Client(2).Close()
+	clus.TakeClient(1)
+	clus.TakeClient(2)
 }
 
 func TestKVRange(t *testing.T) {
@@ -633,13 +640,22 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
+	cli := clus.Client(0)
 	clus.Members[0].Stop(t)
 	// this Put fails and triggers an asynchronous connection retry
-	_, err := clus.Client(0).Put(context.TODO(), "abc", "123")
+	_, err := cli.Put(context.TODO(), "abc", "123")
 	if err == nil ||
 		(!strings.Contains(err.Error(), "connection is closing") &&
 			!strings.Contains(err.Error(), "transport is closing")) {
 		t.Fatal(err)
 	}
-	// cluster will terminate and close the client with the retry in-flight
+
+	// wait some so the client closes with the retry in-flight
+	time.Sleep(time.Second)
+
+	// get the timeout
+	clus.TakeClient(0)
+	if err := cli.Close(); err == nil || !strings.Contains(err.Error(), "timed out") {
+		t.Fatal(err)
+	}
 }

+ 1 - 1
clientv3/integration/txn_test.go

@@ -74,7 +74,7 @@ func TestTxnWriteFail(t *testing.T) {
 
 	dialTimeout := 5 * time.Second
 	select {
-	case <-time.After(2*dialTimeout + time.Second):
+	case <-time.After(dialTimeout + time.Second):
 		t.Fatalf("timed out waiting for txn to fail")
 	case <-donec:
 		// don't restart cluster until txn errors out

+ 3 - 1
clientv3/remote_client.go

@@ -88,9 +88,11 @@ func (r *remoteClient) acquire(ctx context.Context) error {
 		r.client.mu.RLock()
 		closed := r.client.cancel == nil
 		c := r.client.conn
+		lastConnErr := r.client.lastConnErr
 		match := r.conn == c
 		r.mu.Unlock()
-		if c != nil && match {
+		if lastConnErr == nil && match {
+			// new connection already
 			return nil
 		}
 		r.client.mu.RUnlock()
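
The acquire change swaps the guard from "is there a conn at all" to "did the last dial succeed, and is my cached conn still the client's current one". Roughly, the validity check looks like this (remote and usable are hypothetical names):

    package main

    import (
        "fmt"
        "sync"
    )

    type conn struct{ id int }

    type client struct {
        mu          sync.RWMutex
        conn        *conn
        lastConnErr error
    }

    type remote struct {
        client *client
        conn   *conn // cached at last successful use
    }

    // usable reports whether the cached connection can be reused: the last
    // dial must have succeeded and the cache must not be stale.
    func (r *remote) usable() bool {
        r.client.mu.RLock()
        defer r.client.mu.RUnlock()
        return r.client.lastConnErr == nil && r.conn == r.client.conn
    }

    func main() {
        c := &client{conn: &conn{id: 1}}
        r := &remote{client: c, conn: c.conn}
        fmt.Println("usable:", r.usable()) // true
        c.conn = &conn{id: 2}              // client reconnected
        fmt.Println("usable:", r.usable()) // false: cache is stale
    }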

+ 1 - 1
clientv3/txn_test.go

@@ -20,7 +20,7 @@ import (
 )
 
 func TestTxnPanics(t *testing.T) {
-	kv := NewKV(&Client{})
+	kv := &kv{}
 
 	errc := make(chan string)
 	df := func() {

+ 3 - 0
clientv3/watch.go

@@ -521,6 +521,9 @@ func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 			return nil, v3rpc.Error(err)
 		}
 		w.rc.release()
+		if nerr := w.rc.reconnectWait(w.ctx, err); nerr != nil {
+			return nil, v3rpc.Error(nerr)
+		}
 	}
 	return ws, nil
 }
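
The watcher now pauses between stream-open attempts until the client reports a completed reconnect, instead of hammering a dead connection. A loose sketch of that retry shape; open, reconnectWait, and errTransient are stand-ins for the real helpers:

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    var errTransient = errors.New("transient stream error")

    // open stands in for opening the watch stream; it fails twice, then succeeds.
    func open(attempt int) error {
        if attempt < 2 {
            return errTransient
        }
        return nil
    }

    // reconnectWait stands in for waiting on the client's reconnect signal;
    // it returns non-nil only if the client was closed meanwhile.
    func reconnectWait(ctx context.Context) error {
        return ctx.Err()
    }

    func main() {
        ctx := context.Background()
        for attempt := 0; ; attempt++ {
            err := open(attempt)
            if err == nil {
                fmt.Println("stream open after", attempt, "retries")
                return
            }
            if err != errTransient {
                fmt.Println("fatal:", err)
                return
            }
            // wait for the client to reconnect before retrying
            if nerr := reconnectWait(ctx); nerr != nil {
                fmt.Println("client closed while waiting:", nerr)
                return
            }
        }
    }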

+ 1 - 9
integration/cluster.go

@@ -795,15 +795,7 @@ func (c *ClusterV3) Terminate(t *testing.T) {
 }
 
 func (c *ClusterV3) RandClient() *clientv3.Client {
-	for i := 0; i < 100; i++ {
-		cli := c.clients[rand.Intn(len(c.clients))]
-		if cli.ActiveConnection() == nil {
-			time.Sleep(10 * time.Millisecond)
-			continue
-		}
-		return cli
-	}
-	panic("failed to get a active client")
+	return c.clients[rand.Intn(len(c.clients))]
 }
 
 func (c *ClusterV3) Client(i int) *clientv3.Client {