
Merge pull request #5278 from heyitsanthony/fix-clientv3-disconnects

clientv3: fix disconnect breakage
Anthony Romano, 9 years ago
commit 879cfe7666
8 changed files with 226 additions and 189 deletions
  1. clientv3/client.go (+78 -20)
  2. clientv3/cluster.go (+11 -37)
  3. clientv3/kv.go (+11 -42)
  4. clientv3/lease.go (+9 -35)
  5. clientv3/maintenance.go (+10 -26)
  6. clientv3/remote_client.go (+79 -0)
  7. clientv3/txn.go (+3 -4)
  8. clientv3/watch.go (+25 -25)

clientv3/client.go (+78 -20)

@@ -50,6 +50,14 @@ type Client struct {
 
 	ctx    context.Context
 	cancel context.CancelFunc
+
+	// fields below are managed by connMonitor
+
+	// reconnc accepts writes which signal the client should reconnect
+	reconnc chan error
+	// newconnc is closed on successful connect and set to a fresh channel
+	newconnc    chan struct{}
+	lastConnErr error
 }
 
 // New creates a new etcdv3 client from a given configuration.
@@ -87,10 +95,13 @@ func (c *Client) Close() error {
 	}
 	c.cancel()
 	c.cancel = nil
+	err := c.conn.Close()
+	connc := c.newconnc
 	c.mu.Unlock()
 	c.Watcher.Close()
 	c.Lease.Close()
-	return c.conn.Close()
+	<-connc
+	return err
 }
 
 // Ctx is a context for "out of band" messages (e.g., for sending
@@ -161,12 +172,17 @@ func newClient(cfg *Config) (*Client, error) {
 		return nil, err
 	}
 	client := &Client{
-		conn:   conn,
-		cfg:    *cfg,
-		creds:  creds,
-		ctx:    ctx,
-		cancel: cancel,
+		conn:     conn,
+		cfg:      *cfg,
+		creds:    creds,
+		ctx:      ctx,
+		cancel:   cancel,
+		reconnc:  make(chan error),
+		newconnc: make(chan struct{}),
 	}
+
+	go client.connMonitor()
+
 	client.Cluster = NewCluster(client)
 	client.KV = NewKV(client)
 	client.Lease = NewLease(client)
@@ -191,7 +207,7 @@ func (c *Client) ActiveConnection() *grpc.ClientConn {
 }
 
 // retryConnection establishes a new connection
-func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.ClientConn, error) {
+func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	if err != nil {
@@ -200,24 +216,66 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli
 	if c.cancel == nil {
 		return nil, c.ctx.Err()
 	}
-	if oldConn != c.conn {
-		// conn has already been updated
-		return c.conn, nil
-	}
-
-	oldConn.Close()
-	if st, _ := oldConn.State(); st != grpc.Shutdown {
-		// wait for shutdown so grpc doesn't leak sleeping goroutines
-		oldConn.WaitForStateChange(c.ctx, st)
+	if c.conn != nil {
+		c.conn.Close()
+		if st, _ := c.conn.State(); st != grpc.Shutdown {
+			// wait so grpc doesn't leak sleeping goroutines
+			c.conn.WaitForStateChange(c.ctx, st)
+		}
 	}
 
-	conn, dialErr := c.cfg.RetryDialer(c)
+	c.conn, dialErr = c.cfg.RetryDialer(c)
 	if dialErr != nil {
 		c.errors = append(c.errors, dialErr)
-		return nil, dialErr
 	}
-	c.conn = conn
-	return c.conn, nil
+	return c.conn, dialErr
+}
+
+// connStartRetry schedules a reconnect if one is not already running
+func (c *Client) connStartRetry(err error) {
+	select {
+	case c.reconnc <- err:
+	default:
+	}
+}
+
+// connWait waits for a reconnect to be processed
+func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, error) {
+	c.mu.Lock()
+	ch := c.newconnc
+	c.mu.Unlock()
+	c.connStartRetry(err)
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-ch:
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.conn, c.lastConnErr
+}
+
+// connMonitor monitors the connection and handles retries
+func (c *Client) connMonitor() {
+	var err error
+	for {
+		select {
+		case err = <-c.reconnc:
+		case <-c.ctx.Done():
+			c.mu.Lock()
+			c.lastConnErr = c.ctx.Err()
+			close(c.newconnc)
+			c.mu.Unlock()
+			return
+		}
+		conn, connErr := c.retryConnection(err)
+		c.mu.Lock()
+		c.lastConnErr = connErr
+		c.conn = conn
+		close(c.newconnc)
+		c.newconnc = make(chan struct{})
+		c.mu.Unlock()
+	}
 }
 
 // dialEndpointList attempts to connect to each endpoint in order until a

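The client.go change above replaces per-caller reconnect attempts with a single connMonitor goroutine: callers request a reconnect with a non-blocking send on reconnc, and wait for the result on a newconnc channel that is closed and replaced after every attempt. Below is a minimal, self-contained sketch of that channel pattern, assuming a stand-in dial function; the monitor/startRetry/wait names are illustrative, not the etcd API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type monitor struct {
	mu       sync.Mutex
	reconnc  chan error    // non-blocking writes request a reconnect
	newconnc chan struct{} // closed after each attempt, then replaced
	lastErr  error
}

func newMonitor(ctx context.Context) *monitor {
	m := &monitor{reconnc: make(chan error), newconnc: make(chan struct{})}
	go m.loop(ctx)
	return m
}

// startRetry schedules a reconnect unless one is already pending.
func (m *monitor) startRetry(err error) {
	select {
	case m.reconnc <- err:
	default:
	}
}

// wait requests a reconnect and blocks until that attempt finishes.
func (m *monitor) wait(ctx context.Context, err error) error {
	m.mu.Lock()
	ch := m.newconnc
	m.mu.Unlock()
	m.startRetry(err)
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ch:
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.lastErr
}

func (m *monitor) loop(ctx context.Context) {
	for {
		select {
		case <-m.reconnc:
		case <-ctx.Done():
			m.mu.Lock()
			m.lastErr = ctx.Err()
			close(m.newconnc)
			m.mu.Unlock()
			return
		}
		err := dial() // stand-in for cfg.RetryDialer
		m.mu.Lock()
		m.lastErr = err
		close(m.newconnc)                // release every waiter for this attempt
		m.newconnc = make(chan struct{}) // fresh channel for the next attempt
		m.mu.Unlock()
	}
}

func dial() error { time.Sleep(10 * time.Millisecond); return nil }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	m := newMonitor(ctx)
	fmt.Println("reconnect result:", m.wait(ctx, errors.New("transport broken")))
}
```

Closing newconnc and immediately allocating a fresh channel lets any number of callers block on the same attempt and all be released at once, while the non-blocking send on reconnc coalesces concurrent retry requests into one dial.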
clientv3/cluster.go (+11 -37)

@@ -15,8 +15,6 @@
 package clientv3
 
 import (
-	"sync"
-
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
@@ -46,22 +44,15 @@ type Cluster interface {
 }
 
 type cluster struct {
-	c *Client
-
-	mu     sync.Mutex
-	conn   *grpc.ClientConn // conn in-use
+	rc     *remoteClient
 	remote pb.ClusterClient
 }
 
 func NewCluster(c *Client) Cluster {
-	conn := c.ActiveConnection()
-
-	return &cluster{
-		c: c,
-
-		conn:   conn,
-		remote: pb.NewClusterClient(conn),
-	}
+	ret := &cluster{}
+	f := func(conn *grpc.ClientConn) { ret.remote = pb.NewClusterClient(conn) }
+	ret.rc = newRemoteClient(c, f)
+	return ret
 }
 
 func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
@@ -75,7 +66,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
 		return nil, rpctypes.Error(err)
 	}
 
-	go c.switchRemote(err)
+	c.rc.reconnect(err)
 	return nil, rpctypes.Error(err)
 }
 
@@ -90,7 +81,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
 		return nil, rpctypes.Error(err)
 	}
 
-	go c.switchRemote(err)
+	c.rc.reconnect(err)
 	return nil, rpctypes.Error(err)
 }
 
@@ -107,8 +98,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
 			return nil, rpctypes.Error(err)
 		}
 
-		err = c.switchRemote(err)
-		if err != nil {
+		if err = c.rc.reconnectWait(ctx, err); err != nil {
 			return nil, rpctypes.Error(err)
 		}
 	}
@@ -126,30 +116,14 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
 			return nil, rpctypes.Error(err)
 		}
 
-		err = c.switchRemote(err)
-		if err != nil {
+		if err = c.rc.reconnectWait(ctx, err); err != nil {
 			return nil, rpctypes.Error(err)
 		}
 	}
 }
 
 func (c *cluster) getRemote() pb.ClusterClient {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
+	c.rc.mu.Lock()
+	defer c.rc.mu.Unlock()
 	return c.remote
 }
-
-func (c *cluster) switchRemote(prevErr error) error {
-	newConn, err := c.c.retryConnection(c.conn, prevErr)
-	if err != nil {
-		return err
-	}
-
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	c.conn = newConn
-	c.remote = pb.NewClusterClient(c.conn)
-	return nil
-}

clientv3/kv.go (+11 -42)

@@ -15,8 +15,6 @@
 package clientv3
 
 import (
-	"sync"
-
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
@@ -76,23 +74,15 @@ type OpResponse struct {
 }
 
 type kv struct {
-	c *Client
-
-	mu     sync.Mutex       // guards all fields
-	conn   *grpc.ClientConn // conn in-use
+	rc     *remoteClient
 	remote pb.KVClient
 }
 
 func NewKV(c *Client) KV {
-	conn := c.ActiveConnection()
-	remote := pb.NewKVClient(conn)
-
-	return &kv{
-		conn:   c.ActiveConnection(),
-		remote: remote,
-
-		c: c,
-	}
+	ret := &kv{}
+	f := func(conn *grpc.ClientConn) { ret.remote = pb.NewKVClient(conn) }
+	ret.rc = newRemoteClient(c, f)
+	return ret
 }
 
 func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
@@ -111,17 +101,14 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
 }
 
 func (kv *kv) Compact(ctx context.Context, rev int64) error {
-	remote := kv.getRemote()
-	_, err := remote.Compact(ctx, &pb.CompactionRequest{Revision: rev})
+	_, err := kv.getRemote().Compact(ctx, &pb.CompactionRequest{Revision: rev})
 	if err == nil {
 		return nil
 	}
-
 	if isHaltErr(ctx, err) {
 		return rpctypes.Error(err)
 	}
-
-	go kv.switchRemote(remote, err)
+	kv.rc.reconnect(err)
 	return rpctypes.Error(err)
 }
 
@@ -174,36 +161,18 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 
 		// do not retry on modifications
 		if op.isWrite() {
-			go kv.switchRemote(remote, err)
+			kv.rc.reconnect(err)
 			return OpResponse{}, rpctypes.Error(err)
 		}
 
-		if nerr := kv.switchRemote(remote, err); nerr != nil {
+		if nerr := kv.rc.reconnectWait(ctx, err); nerr != nil {
 			return OpResponse{}, nerr
 		}
 	}
 }
 
-func (kv *kv) switchRemote(remote pb.KVClient, prevErr error) error {
-	kv.mu.Lock()
-	oldRemote := kv.remote
-	conn := kv.conn
-	kv.mu.Unlock()
-	if remote != oldRemote {
-		return nil
-	}
-	newConn, err := kv.c.retryConnection(conn, prevErr)
-	kv.mu.Lock()
-	defer kv.mu.Unlock()
-	if err == nil {
-		kv.conn = newConn
-		kv.remote = pb.NewKVClient(kv.conn)
-	}
-	return rpctypes.Error(err)
-}
-
 func (kv *kv) getRemote() pb.KVClient {
-	kv.mu.Lock()
-	defer kv.mu.Unlock()
+	kv.rc.mu.Lock()
+	defer kv.rc.mu.Unlock()
 	return kv.remote
 }

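kv.Do (and txn.Commit further down) keeps the established retry policy while routing it through the shared remoteClient: read-only operations wait for the reconnect and retry, while writes only schedule a background reconnect and return the error, since a failed write may already have been applied server-side. A hedged sketch of that policy in isolation; the reconnector interface, op type, and do function are illustrative stand-ins, not the etcd API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// reconnector is a stand-in for the remoteClient introduced by this commit.
type reconnector interface {
	reconnect(err error)                                 // schedule, do not wait
	reconnectWait(ctx context.Context, err error) error  // wait for the attempt
}

type op struct {
	isWrite bool
	run     func(ctx context.Context) (string, error)
}

// do sketches the kv.Do retry policy: read-only ops wait for a reconnect and
// retry; writes schedule a reconnect and return the error immediately.
func do(ctx context.Context, o op, rc reconnector) (string, error) {
	for {
		resp, err := o.run(ctx)
		if err == nil {
			return resp, nil
		}
		if ctx.Err() != nil { // stand-in for clientv3's isHaltErr check
			return "", err
		}
		if o.isWrite {
			rc.reconnect(err)
			return "", err
		}
		if werr := rc.reconnectWait(ctx, err); werr != nil {
			return "", werr
		}
		// read-only op: loop and retry against the new connection
	}
}

type noopRC struct{}

func (noopRC) reconnect(error)                             {}
func (noopRC) reconnectWait(context.Context, error) error { return nil }

func main() {
	calls := 0
	read := op{run: func(context.Context) (string, error) {
		calls++
		if calls == 1 {
			return "", errors.New("transient transport error")
		}
		return "value", nil
	}}
	v, err := do(context.Background(), read, noopRC{})
	fmt.Println(v, err, "after", calls, "attempts")
}
```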
clientv3/lease.go (+9 -35)

@@ -71,14 +71,12 @@ type Lease interface {
 }
 
 type lessor struct {
-	c *Client
-
-	mu   sync.Mutex       // guards all fields
-	conn *grpc.ClientConn // conn in-use
+	mu sync.Mutex // guards all fields
 
 	// donec is closed when recvKeepAliveLoop stops
 	donec chan struct{}
 
+	rc     *remoteClient
 	remote pb.LeaseClient
 
 	stream       pb.Lease_LeaseKeepAliveClient
@@ -102,14 +100,12 @@ type keepAlive struct {
 
 func NewLease(c *Client) Lease {
 	l := &lessor{
-		c:    c,
-		conn: c.ActiveConnection(),
-
 		donec:      make(chan struct{}),
 		keepAlives: make(map[LeaseID]*keepAlive),
 	}
+	f := func(conn *grpc.ClientConn) { l.remote = pb.NewLeaseClient(conn) }
+	l.rc = newRemoteClient(c, f)
 
-	l.remote = pb.NewLeaseClient(l.conn)
 	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
 
 	go l.recvKeepAliveLoop()
@@ -283,7 +279,6 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
 
 func (l *lessor) recvKeepAliveLoop() {
 	defer func() {
-		l.stopCancel()
 		l.mu.Lock()
 		close(l.donec)
 		for _, ka := range l.keepAlives {
@@ -386,8 +381,8 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 }
 
 func (l *lessor) getRemote() pb.LeaseClient {
-	l.mu.Lock()
-	defer l.mu.Unlock()
+	l.rc.mu.Lock()
+	defer l.rc.mu.Unlock()
 	return l.remote
 }
 
@@ -399,36 +394,15 @@ func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
 
 func (l *lessor) switchRemoteAndStream(prevErr error) error {
 	for {
-		l.mu.Lock()
-		conn := l.conn
-		l.mu.Unlock()
-
-		var (
-			err     error
-			newConn *grpc.ClientConn
-		)
-
 		if prevErr != nil {
-			conn.Close()
-			newConn, err = l.c.retryConnection(conn, prevErr)
+			err := l.rc.reconnectWait(l.stopCtx, prevErr)
 			if err != nil {
 				return rpctypes.Error(err)
 			}
 		}
-
-		l.mu.Lock()
-		if newConn != nil {
-			l.conn = newConn
-		}
-
-		l.remote = pb.NewLeaseClient(l.conn)
-		l.mu.Unlock()
-
-		prevErr = l.newStream()
-		if prevErr != nil {
-			continue
+		if prevErr = l.newStream(); prevErr == nil {
+			return nil
 		}
-		return nil
 	}
 }
 

clientv3/maintenance.go (+10 -26)

@@ -16,7 +16,6 @@ package clientv3
 
 import (
 	"io"
-	"sync"
 
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -57,18 +56,15 @@ type Maintenance interface {
 type maintenance struct {
 	c *Client
 
-	mu     sync.Mutex
-	conn   *grpc.ClientConn // conn in-use
+	rc     *remoteClient
 	remote pb.MaintenanceClient
 }
 
 func NewMaintenance(c *Client) Maintenance {
-	conn := c.ActiveConnection()
-	return &maintenance{
-		c:      c,
-		conn:   conn,
-		remote: pb.NewMaintenanceClient(conn),
-	}
+	ret := &maintenance{c: c}
+	f := func(conn *grpc.ClientConn) { ret.remote = pb.NewMaintenanceClient(conn) }
+	ret.rc = newRemoteClient(c, f)
+	return ret
 }
 
 func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
@@ -85,7 +81,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
 		if isHaltErr(ctx, err) {
 			return nil, rpctypes.Error(err)
 		}
-		if err = m.switchRemote(err); err != nil {
+		if err = m.rc.reconnectWait(ctx, err); err != nil {
 			return nil, err
 		}
 	}
@@ -118,8 +114,8 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
 	if err == nil {
 		return (*AlarmResponse)(resp), nil
 	}
-	if isHaltErr(ctx, err) {
-		go m.switchRemote(err)
+	if !isHaltErr(ctx, err) {
+		m.rc.reconnect(err)
 	}
 	return nil, rpctypes.Error(err)
 }
@@ -178,19 +174,7 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
 }
 
 func (m *maintenance) getRemote() pb.MaintenanceClient {
-	m.mu.Lock()
-	defer m.mu.Unlock()
+	m.rc.mu.Lock()
+	defer m.rc.mu.Unlock()
 	return m.remote
 }
-
-func (m *maintenance) switchRemote(prevErr error) error {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	newConn, err := m.c.retryConnection(m.conn, prevErr)
-	if err != nil {
-		return rpctypes.Error(err)
-	}
-	m.conn = newConn
-	m.remote = pb.NewMaintenanceClient(m.conn)
-	return nil
-}

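The AlarmDisarm change above inverts the error check: previously a reconnect was scheduled only for halting errors (canceled or expired contexts); now only non-halting failures trigger a reconnect before the error is returned. A small sketch of that decision, using ctx.Err() as a stand-in for clientv3's isHaltErr:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handleRPCError mirrors the corrected AlarmDisarm error path: halting errors
// propagate without touching the connection, while any other failure schedules
// a background reconnect before the error is returned.
func handleRPCError(ctx context.Context, err error, reconnect func(error)) error {
	if err == nil {
		return nil
	}
	if isHalt := ctx.Err() != nil; !isHalt { // stand-in for isHaltErr(ctx, err)
		reconnect(err)
	}
	return err
}

func main() {
	reconnects := 0
	err := handleRPCError(context.Background(), errors.New("connection reset"),
		func(error) { reconnects++ })
	fmt.Println(err, "reconnects scheduled:", reconnects)
}
```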
clientv3/remote_client.go (+79 -0)

@@ -0,0 +1,79 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	"sync"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+type remoteClient struct {
+	client     *Client
+	conn       *grpc.ClientConn
+	updateConn func(*grpc.ClientConn)
+	mu         sync.Mutex
+}
+
+func newRemoteClient(client *Client, update func(*grpc.ClientConn)) *remoteClient {
+	ret := &remoteClient{
+		client:     client,
+		conn:       client.ActiveConnection(),
+		updateConn: update,
+	}
+	ret.mu.Lock()
+	defer ret.mu.Unlock()
+	ret.updateConn(ret.conn)
+	return ret
+}
+
+// reconnectWait reconnects the client, returning when connection establishes/fails.
+func (r *remoteClient) reconnectWait(ctx context.Context, prevErr error) error {
+	r.mu.Lock()
+	updated := r.tryUpdate()
+	r.mu.Unlock()
+	if updated {
+		return nil
+	}
+	conn, err := r.client.connWait(ctx, prevErr)
+	if err == nil {
+		r.mu.Lock()
+		r.conn = conn
+		r.updateConn(conn)
+		r.mu.Unlock()
+	}
+	return err
+}
+
+// reconnect will reconnect the client without waiting
+func (r *remoteClient) reconnect(err error) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.tryUpdate() {
+		return
+	}
+	r.client.connStartRetry(err)
+}
+
+func (r *remoteClient) tryUpdate() bool {
+	activeConn := r.client.ActiveConnection()
+	if activeConn == nil || activeConn == r.conn {
+		return false
+	}
+	r.conn = activeConn
+	r.updateConn(activeConn)
+	return true
+}

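remote_client.go is the new shared piece: each service wrapper (cluster, kv, lease, maintenance, watch) hands newRemoteClient a callback that rebuilds its typed gRPC stub, and reads that stub back under rc.mu in getRemote. A hedged sketch of the same shape, with stand-in conn and stub types instead of *grpc.ClientConn and the generated clients:

```go
package main

import (
	"fmt"
	"sync"
)

// conn and stub stand in for *grpc.ClientConn and a generated gRPC client.
type conn struct{ id int }
type stub struct{ c *conn }

// remote holds what a service wrapper needs: the connection in use and a
// callback that rebuilds the typed stub from a new connection.
type remote struct {
	mu         sync.Mutex
	conn       *conn
	updateConn func(*conn)
}

func newRemote(initial *conn, update func(*conn)) *remote {
	r := &remote{conn: initial, updateConn: update}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.updateConn(initial) // build the first stub, as newRemoteClient does
	return r
}

// tryUpdate adopts a newer shared connection if one exists, mirroring
// remoteClient.tryUpdate; active stands in for client.ActiveConnection().
// Callers hold r.mu, as reconnect/reconnectWait do in the real code.
func (r *remote) tryUpdate(active *conn) bool {
	if active == nil || active == r.conn {
		return false
	}
	r.conn = active
	r.updateConn(active)
	return true
}

// service shows the wrapper shape used by cluster/kv/maintenance above.
type service struct {
	rc     *remote
	client stub
}

func newService(initial *conn) *service {
	s := &service{}
	s.rc = newRemote(initial, func(c *conn) { s.client = stub{c: c} })
	return s
}

func (s *service) getRemote() stub {
	s.rc.mu.Lock()
	defer s.rc.mu.Unlock()
	return s.client
}

func main() {
	s := newService(&conn{id: 1})
	fmt.Println("stub bound to conn", s.getRemote().c.id)

	// After a reconnect elsewhere, adopt the fresher shared connection.
	s.rc.mu.Lock()
	s.rc.tryUpdate(&conn{id: 2})
	s.rc.mu.Unlock()
	fmt.Println("stub now bound to conn", s.getRemote().c.id)
}
```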
clientv3/txn.go (+3 -4)

@@ -141,9 +141,8 @@ func (txn *txn) Commit() (*TxnResponse, error) {
 	kv := txn.kv
 
 	for {
-		remote := kv.getRemote()
 		r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
-		resp, err := remote.Txn(txn.ctx, r)
+		resp, err := kv.getRemote().Txn(txn.ctx, r)
 		if err == nil {
 			return (*TxnResponse)(resp), nil
 		}
@@ -153,11 +152,11 @@ func (txn *txn) Commit() (*TxnResponse, error) {
 		}
 
 		if txn.isWrite {
-			go kv.switchRemote(remote, err)
+			kv.rc.reconnect(err)
 			return nil, rpctypes.Error(err)
 		}
 
-		if nerr := kv.switchRemote(remote, err); nerr != nil {
+		if nerr := kv.rc.reconnectWait(txn.ctx, err); nerr != nil {
 			return nil, nerr
 		}
 	}

clientv3/watch.go (+25 -25)

@@ -87,8 +87,7 @@ func (wr *WatchResponse) IsProgressNotify() bool {
 
 // watcher implements the Watcher interface
 type watcher struct {
-	c      *Client
-	conn   *grpc.ClientConn
+	rc     *remoteClient
 	remote pb.WatchClient
 
 	// ctx controls internal remote.Watch requests
@@ -142,13 +141,7 @@ type watcherStream struct {
 
 func NewWatcher(c *Client) Watcher {
 	ctx, cancel := context.WithCancel(context.Background())
-	conn := c.ActiveConnection()
-
 	w := &watcher{
-		c:      c,
-		conn:   conn,
-		remote: pb.NewWatchClient(conn),
-
 		ctx:     ctx,
 		cancel:  cancel,
 		streams: make(map[int64]*watcherStream),
@@ -159,6 +152,10 @@ func NewWatcher(c *Client) Watcher {
 		donec: make(chan struct{}),
 		errc:  make(chan error, 1),
 	}
+
+	f := func(conn *grpc.ClientConn) { w.remote = pb.NewWatchClient(conn) }
+	w.rc = newRemoteClient(c, f)
+
 	go w.run()
 	return w
 }
@@ -204,10 +201,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 }
 
 func (w *watcher) Close() error {
-	select {
-	case w.stopc <- struct{}{}:
-	case <-w.donec:
-	}
+	close(w.stopc)
 	<-w.donec
 	return v3rpc.Error(<-w.errc)
 }
@@ -277,15 +271,20 @@ func (w *watcher) closeStream(ws *watcherStream) {
 
 // run is the root of the goroutines for managing a watcher client
 func (w *watcher) run() {
+	var wc pb.Watch_WatchClient
+	var closeErr error
+
 	defer func() {
+		select {
+		case w.errc <- closeErr:
+		default:
+		}
 		close(w.donec)
 		w.cancel()
 	}()
 
 	// start a stream with the etcd grpc server
-	wc, wcerr := w.newWatchClient()
-	if wcerr != nil {
-		w.errc <- wcerr
+	if wc, closeErr = w.newWatchClient(); closeErr != nil {
 		return
 	}
 
@@ -335,8 +334,7 @@ func (w *watcher) run() {
 		// watch client failed to recv; spawn another if possible
 		// TODO report watch client errors from errc?
 		case <-w.errc:
-			if wc, wcerr = w.newWatchClient(); wcerr != nil {
-				w.errc <- wcerr
+			if wc, closeErr = w.newWatchClient(); closeErr != nil {
 				return
 			}
 			curReqC = w.reqc
@@ -345,7 +343,6 @@ func (w *watcher) run() {
 			}
 			cancelSet = make(map[int64]struct{})
 		case <-w.stopc:
-			w.errc <- nil
 			return
 		}
 
@@ -503,17 +500,20 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
 // openWatchClient retries opening a watchclient until retryConnection fails
 func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 	for {
-		if ws, err = w.remote.Watch(w.ctx); ws != nil {
+		select {
+		case <-w.stopc:
+			if err == nil {
+				err = context.Canceled
+			}
+			return nil, err
+		default:
+		}
+		if ws, err = w.remote.Watch(w.ctx); ws != nil && err == nil {
 			break
 		} else if isHaltErr(w.ctx, err) {
 			return nil, v3rpc.Error(err)
 		}
-		newConn, nerr := w.c.retryConnection(w.conn, nil)
-		if nerr != nil {
-			return nil, nerr
-		}
-		w.conn = newConn
-		w.remote = pb.NewWatchClient(w.conn)
+		err = w.rc.reconnectWait(w.ctx, nil)
 	}
 	return ws, nil
 }
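
The watch.go changes above switch shutdown signaling from a one-shot send on stopc to close(stopc), so every goroutine selecting on it (including openWatchClient's retry loop) observes the stop, and run() now reports its exit error through a single non-blocking send on errc in its deferred cleanup. A minimal sketch of that shutdown idiom, with illustrative worker/Close names:

```go
package main

import (
	"fmt"
	"time"
)

type worker struct {
	stopc chan struct{} // closed by Close; every select on it unblocks at once
	donec chan struct{} // closed when run() has fully exited
	errc  chan error    // buffered(1); run() reports its exit error here
}

func newWorker() *worker {
	w := &worker{
		stopc: make(chan struct{}),
		donec: make(chan struct{}),
		errc:  make(chan error, 1),
	}
	go w.run()
	return w
}

func (w *worker) run() {
	var closeErr error // set on failure paths in a real watcher
	defer func() {
		// Non-blocking send, as in watcher.run's deferred cleanup: shutdown
		// must not wedge if nobody reads the error.
		select {
		case w.errc <- closeErr:
		default:
		}
		close(w.donec)
	}()
	for {
		select {
		case <-w.stopc:
			return // clean stop: closeErr stays nil
		case <-time.After(10 * time.Millisecond):
			// placeholder for receiving watch events
		}
	}
}

func (w *worker) Close() error {
	close(w.stopc)
	<-w.donec
	return <-w.errc
}

func main() {
	w := newWorker()
	fmt.Println("close error:", w.Close())
}
```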