Browse Source

clientv3: add 'Sync' method

Gyu-Ho Lee 9 years ago
parent
commit
cdb1e34799
3 changed files with 56 additions and 3 deletions
  1. 15 1
      clientv3/balancer.go
  2. 33 0
      clientv3/client.go
  3. 8 2
      clientv3/config.go

+ 15 - 1
clientv3/balancer.go

@@ -96,10 +96,24 @@ func getHost2ep(eps []string) map[string]string {
 }
 
 func (b *simpleBalancer) updateAddrs(eps []string) {
+	np := getHost2ep(eps)
+
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
-	b.host2ep = getHost2ep(eps)
+	match := len(np) == len(b.host2ep)
+	for k, v := range np {
+		if b.host2ep[k] != v {
+			match = false
+			break
+		}
+	}
+	if match {
+		// same endpoints, so no need to update address
+		return
+	}
+
+	b.host2ep = np
 
 	addrs := make([]grpc.Address, 0, len(eps))
 	for i := range eps {

+ 33 - 0
clientv3/client.go

@@ -105,6 +105,38 @@ func (c *Client) SetEndpoints(eps ...string) {
 	c.balancer.updateAddrs(eps)
 }
 
+// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
+func (c *Client) Sync(ctx context.Context) error {
+	mresp, err := c.MemberList(ctx)
+	if err != nil {
+		return err
+	}
+	var eps []string
+	for _, m := range mresp.Members {
+		eps = append(eps, m.ClientURLs...)
+	}
+	c.SetEndpoints(eps...)
+	return nil
+}
+
+func (c *Client) autoSync() {
+	if c.cfg.AutoSyncInterval == time.Duration(0) {
+		return
+	}
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-time.After(c.cfg.AutoSyncInterval):
+			ctx, _ := context.WithTimeout(c.ctx, 5*time.Second)
+			if err := c.Sync(ctx); err != nil && err != c.ctx.Err() {
+				logger.Println("Auto sync endpoints failed:", err)
+			}
+		}
+	}
+}
+
 type authTokenCredential struct {
 	token string
 }
@@ -292,6 +324,7 @@ func newClient(cfg *Config) (*Client, error) {
 		logger.Set(log.New(ioutil.Discard, "", 0))
 	}
 
+	go client.autoSync()
 	return client, nil
 }
 

+ 8 - 2
clientv3/config.go

@@ -28,6 +28,10 @@ type Config struct {
 	// Endpoints is a list of URLs
 	Endpoints []string
 
+	// AutoSyncInterval is the interval to update endpoints with its latest members.
+	// 0 disables auto-sync. By default auto-sync is disabled.
+	AutoSyncInterval time.Duration
+
 	// DialTimeout is the timeout for failing to establish a connection.
 	DialTimeout time.Duration
 
@@ -46,6 +50,7 @@ type Config struct {
 
 type yamlConfig struct {
 	Endpoints             []string      `json:"endpoints"`
+	AutoSyncInterval      time.Duration `json:"auto-sync-interval"`
 	DialTimeout           time.Duration `json:"dial-timeout"`
 	InsecureTransport     bool          `json:"insecure-transport"`
 	InsecureSkipTLSVerify bool          `json:"insecure-skip-tls-verify"`
@@ -68,8 +73,9 @@ func configFromFile(fpath string) (*Config, error) {
 	}
 
 	cfg := &Config{
-		Endpoints:   yc.Endpoints,
-		DialTimeout: yc.DialTimeout,
+		Endpoints:        yc.Endpoints,
+		AutoSyncInterval: yc.AutoSyncInterval,
+		DialTimeout:      yc.DialTimeout,
 	}
 
 	if yc.InsecureTransport {