Browse Source

clientv3: throttle reconnection rate

Client was reconnecting after establishing connections because the lease
and watch APIs were thrashing. Instead, wait a little before accepting
new reconnect requests.
Anthony Romano 9 years ago
parent
commit
e8101ddf09
1 changed files with 21 additions and 8 deletions
  1. 21 8
      clientv3/client.go

+ 21 - 8
clientv3/client.go

@@ -27,6 +27,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/metadata"
@@ -34,6 +35,9 @@ import (
 
 var (
 	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
+
+	// minConnRetryWait is the minimum time between reconnects to avoid flooding
+	minConnRetryWait = time.Second
 )
 
 // Client provides and manages an etcd v3 client session.
@@ -191,7 +195,7 @@ func newClient(cfg *Config) (*Client, error) {
 		creds:    creds,
 		ctx:      ctx,
 		cancel:   cancel,
-		reconnc:  make(chan error),
+		reconnc:  make(chan error, 1),
 		newconnc: make(chan struct{}),
 	}
 
@@ -248,8 +252,11 @@ func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr e
 
 // connStartRetry schedules a reconnect if one is not already running
 func (c *Client) connStartRetry(err error) {
+	c.mu.Lock()
+	ch := c.reconnc
+	defer c.mu.Unlock()
 	select {
-	case c.reconnc <- err:
+	case ch <- err:
 	default:
 	}
 }
@@ -273,15 +280,20 @@ func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, err
 // connMonitor monitors the connection and handles retries
 func (c *Client) connMonitor() {
 	var err error
-	for {
+
+	defer func() {
+		_, err = c.retryConnection(c.ctx.Err())
+		c.mu.Lock()
+		c.lastConnErr = err
+		close(c.newconnc)
+		c.mu.Unlock()
+	}()
+
+	limiter := rate.NewLimiter(rate.Every(minConnRetryWait), 1)
+	for limiter.Wait(c.ctx) == nil {
 		select {
 		case err = <-c.reconnc:
 		case <-c.ctx.Done():
-			_, err = c.retryConnection(c.ctx.Err())
-			c.mu.Lock()
-			c.lastConnErr = err
-			close(c.newconnc)
-			c.mu.Unlock()
 			return
 		}
 		conn, connErr := c.retryConnection(err)
@@ -290,6 +302,7 @@ func (c *Client) connMonitor() {
 		c.conn = conn
 		close(c.newconnc)
 		c.newconnc = make(chan struct{})
+		c.reconnc = make(chan error, 1)
 		c.mu.Unlock()
 	}
 }