Browse Source

client: add AutoSync function

AutoSync provides the way for client to syncing member list from
etcd cluster automatically.
Yicheng Qin 10 years ago
parent
commit
c53b3016ae
2 changed files with 77 additions and 0 deletions
  1. 32 0
      client/client.go
  2. 45 0
      client/client_test.go

+ 32 - 0
client/client.go

@@ -122,6 +122,22 @@ type Client interface {
 	// Sync updates the internal cache of the etcd cluster's membership.
 	// Sync updates the internal cache of the etcd cluster's membership.
 	Sync(context.Context) error
 	Sync(context.Context) error
 
 
+	// AutoSync periodically calls Sync() every given interval.
+	// The recommended sync interval is 10 seconds to 1 minute, which does
+	// not bring too much overhead to server and makes client catch up the
+	// cluster change in time.
+	//
+	// The example to use it:
+	//
+	//  for {
+	//      err := client.AutoSync(ctx, 10*time.Second)
+	//      if err == context.DeadlineExceeded || err == context.Canceled {
+	//          break
+	//      }
+	//      log.Print(err)
+	//  }
+	AutoSync(context.Context, time.Duration) error
+
 	// Endpoints returns a copy of the current set of API endpoints used
 	// Endpoints returns a copy of the current set of API endpoints used
 	// by Client to resolve HTTP requests. If Sync has ever been called,
 	// by Client to resolve HTTP requests. If Sync has ever been called,
 	// this may differ from the initial Endpoints provided in the Config.
 	// this may differ from the initial Endpoints provided in the Config.
@@ -290,6 +306,22 @@ func (c *httpClusterClient) Sync(ctx context.Context) error {
 	return c.reset(eps)
 	return c.reset(eps)
 }
 }
 
 
+func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		err := c.Sync(ctx)
+		if err != nil {
+			return err
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+}
+
 type roundTripResponse struct {
 type roundTripResponse struct {
 	resp *http.Response
 	resp *http.Response
 	err  error
 	err  error

+ 45 - 0
client/client_test.go

@@ -757,6 +757,51 @@ func TestHTTPClusterClientSyncFail(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestHTTPClusterClientAutoSyncCancelContext(t *testing.T) {
+	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
+		staticHTTPResponse{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+	})
+
+	hc := &httpClusterClient{
+		clientFactory: cf,
+		rand:          rand.New(rand.NewSource(0)),
+	}
+	err := hc.reset([]string{"http://127.0.0.1:2379"})
+	if err != nil {
+		t.Fatalf("unexpected error during setup: %#v", err)
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	err = hc.AutoSync(ctx, time.Hour)
+	if err != context.Canceled {
+		t.Fatalf("incorrect error value: want=%v got=%v", context.Canceled, err)
+	}
+}
+
+func TestHTTPClusterClientAutoSyncFail(t *testing.T) {
+	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
+		staticHTTPResponse{err: errors.New("fail!")},
+	})
+
+	hc := &httpClusterClient{
+		clientFactory: cf,
+		rand:          rand.New(rand.NewSource(0)),
+	}
+	err := hc.reset([]string{"http://127.0.0.1:2379"})
+	if err != nil {
+		t.Fatalf("unexpected error during setup: %#v", err)
+	}
+
+	err = hc.AutoSync(context.Background(), time.Hour)
+	if err.Error() != ErrClusterUnavailable.Error() {
+		t.Fatalf("incorrect error value: want=%v got=%v", ErrClusterUnavailable, err)
+	}
+}
+
 func TestHTTPClusterClientResetFail(t *testing.T) {
 func TestHTTPClusterClientResetFail(t *testing.T) {
 	tests := [][]string{
 	tests := [][]string{
 		// need at least one endpoint
 		// need at least one endpoint