Selaa lähdekoodia

Merge pull request #4312 from heyitsanthony/v3-client-connretry

clientv3: connection retry and customizable endpoint selection
Anthony Romano 10 vuotta sitten
vanhempi
commit
3df91f85c4
1 muutettua tiedostoa jossa 75 lisäystä ja 19 poistoa
  1. 75 19
      clientv3/client.go

+ 75 - 19
clientv3/client.go

@@ -15,10 +15,13 @@
 package clientv3
 
 import (
+	"sync"
+
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
+// Client provides and manages an etcd v3 client session.
 type Client struct {
 	// KV is the keyvalue API for the client's connection.
 	KV pb.KVClient
@@ -29,26 +32,36 @@ type Client struct {
 	// Cluster is the cluster API for the client's connection.
 	Cluster pb.ClusterClient
 
-	conn *grpc.ClientConn
-	cfg  Config
+	conn   *grpc.ClientConn
+	cfg    Config
+	mu     sync.RWMutex // protects connection selection and error list
+	errors []error      // errors passed to retryConnection
 }
 
+// EndpointDialer is a policy for choosing which endpoint to dial next
+type EndpointDialer func(*Client) (*grpc.ClientConn, error)
+
 type Config struct {
 	// Endpoints is a list of URLs
 	Endpoints []string
 
+	// RetryDialer chooses the next endpoint to use
+	RetryDialer EndpointDialer
+
 	// TODO TLS options
 }
 
 // New creates a new etcdv3 client from a given configuration.
 func New(cfg Config) (*Client, error) {
-	conn, err := cfg.dialEndpoints()
+	if cfg.RetryDialer == nil {
+		cfg.RetryDialer = dialEndpointList
+	}
+	// use a temporary skeleton client to bootstrap first connection
+	conn, err := cfg.RetryDialer(&Client{cfg: cfg})
 	if err != nil {
 		return nil, err
 	}
-	client := newClient(conn)
-	client.cfg = cfg
-	return client, nil
+	return newClient(conn, &cfg), nil
 }
 
 // NewFromURL creates a new etcdv3 client from a URL.
@@ -57,37 +70,80 @@ func NewFromURL(url string) (*Client, error) {
 }
 
 // NewFromConn creates a new etcdv3 client from an established grpc Connection.
-func NewFromConn(conn *grpc.ClientConn) *Client {
-	return newClient(conn)
-}
+func NewFromConn(conn *grpc.ClientConn) *Client { return newClient(conn, nil) }
 
 // Clone creates a copy of client with the old connection and new API clients.
-func (c *Client) Clone() *Client {
-	cl := newClient(c.conn)
-	cl.cfg = c.cfg
-	return cl
-}
+func (c *Client) Clone() *Client { return newClient(c.conn, &c.cfg) }
 
 // Close shuts down the client's etcd connections.
 func (c *Client) Close() error {
 	return c.conn.Close()
 }
 
-func newClient(conn *grpc.ClientConn) *Client {
+// Endpoints lists the registered endpoints for the client.
+func (c *Client) Endpoints() []string { return c.cfg.Endpoints }
+
+// Errors returns all errors that have been observed since called last.
+func (c *Client) Errors() (errs []error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	errs = c.errors
+	c.errors = nil
+	return errs
+}
+
+// Dial establishes a connection for a given endpoint using the client's config
+func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
+	// TODO: enable grpc.WithTransportCredentials(creds)
+	return grpc.Dial(endpoint, grpc.WithInsecure())
+}
+
+func newClient(conn *grpc.ClientConn, cfg *Config) *Client {
+	if cfg == nil {
+		cfg = &Config{RetryDialer: dialEndpointList}
+	}
 	return &Client{
 		KV:      pb.NewKVClient(conn),
 		Lease:   pb.NewLeaseClient(conn),
 		Watch:   pb.NewWatchClient(conn),
 		Cluster: pb.NewClusterClient(conn),
 		conn:    conn,
+		cfg:     *cfg,
+	}
+}
+
+// activeConnection returns the current in-use connection
+func (c *Client) activeConnection() *grpc.ClientConn {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.conn
+}
+
+// refreshConnection establishes a new connection
+func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) *grpc.ClientConn {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if err != nil {
+		c.errors = append(c.errors, err)
+	}
+	if oldConn != c.conn {
+		// conn has already been updated
+		return c.conn
+	}
+	conn, dialErr := c.cfg.RetryDialer(c)
+	if dialErr != nil {
+		c.errors = append(c.errors, dialErr)
 	}
+	c.conn = conn
+	return c.conn
 }
 
-func (cfg *Config) dialEndpoints() (*grpc.ClientConn, error) {
+// dialEndpoints attempts to connect to each endpoint in order until a
+// connection is established.
+func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
 	var err error
-	for _, ep := range cfg.Endpoints {
-		// TODO: enable grpc.WithTransportCredentials(creds)
-		conn, curErr := grpc.Dial(ep, grpc.WithInsecure())
+	for _, ep := range c.Endpoints() {
+		conn, curErr := c.Dial(ep)
 		if err != nil {
 			err = curErr
 		} else {