Browse Source

Merge pull request #4020 from xiang90/ctl_04

etcdctl: support etcd0.4
Xiang Li 10 years ago
parent
commit
2681137fe0
3 changed files with 69 additions and 12 deletions
  1. 8 3
      client/client.go
  2. 8 8
      client/client_test.go
  3. 53 1
      etcdctl/command/util.go

+ 8 - 3
client/client.go

@@ -162,6 +162,11 @@ type Client interface {
 	// this may differ from the initial Endpoints provided in the Config.
 	// this may differ from the initial Endpoints provided in the Config.
 	Endpoints() []string
 	Endpoints() []string
 
 
+	// SetEndpoints sets the set of API endpoints used by Client to resolve
+	// HTTP requests. If the given endpoints are not valid, an error will be
+	// returned
+	SetEndpoints(eps []string) error
+
 	httpClient
 	httpClient
 }
 }
 
 
@@ -176,7 +181,7 @@ func New(cfg Config) (Client, error) {
 			password: cfg.Password,
 			password: cfg.Password,
 		}
 		}
 	}
 	}
-	if err := c.reset(cfg.Endpoints); err != nil {
+	if err := c.SetEndpoints(cfg.Endpoints); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 	return c, nil
 	return c, nil
@@ -219,7 +224,7 @@ type httpClusterClient struct {
 	rand *rand.Rand
 	rand *rand.Rand
 }
 }
 
 
-func (c *httpClusterClient) reset(eps []string) error {
+func (c *httpClusterClient) SetEndpoints(eps []string) error {
 	if len(eps) == 0 {
 	if len(eps) == 0 {
 		return ErrNoEndpoints
 		return ErrNoEndpoints
 	}
 	}
@@ -341,7 +346,7 @@ func (c *httpClusterClient) Sync(ctx context.Context) error {
 		return nil
 		return nil
 	}
 	}
 
 
-	return c.reset(eps)
+	return c.SetEndpoints(eps)
 }
 }
 
 
 func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
 func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {

+ 8 - 8
client/client_test.go

@@ -705,7 +705,7 @@ func TestHTTPClusterClientSync(t *testing.T) {
 		clientFactory: cf,
 		clientFactory: cf,
 		rand:          rand.New(rand.NewSource(0)),
 		rand:          rand.New(rand.NewSource(0)),
 	}
 	}
-	err := hc.reset([]string{"http://127.0.0.1:2379"})
+	err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
 	if err != nil {
 	if err != nil {
 		t.Fatalf("unexpected error during setup: %#v", err)
 		t.Fatalf("unexpected error during setup: %#v", err)
 	}
 	}
@@ -728,7 +728,7 @@ func TestHTTPClusterClientSync(t *testing.T) {
 		t.Fatalf("incorrect endpoints post-Sync: want=%#v got=%#v", want, got)
 		t.Fatalf("incorrect endpoints post-Sync: want=%#v got=%#v", want, got)
 	}
 	}
 
 
-	err = hc.reset([]string{"http://127.0.0.1:4009"})
+	err = hc.SetEndpoints([]string{"http://127.0.0.1:4009"})
 	if err != nil {
 	if err != nil {
 		t.Fatalf("unexpected error during reset: %#v", err)
 		t.Fatalf("unexpected error during reset: %#v", err)
 	}
 	}
@@ -749,7 +749,7 @@ func TestHTTPClusterClientSyncFail(t *testing.T) {
 		clientFactory: cf,
 		clientFactory: cf,
 		rand:          rand.New(rand.NewSource(0)),
 		rand:          rand.New(rand.NewSource(0)),
 	}
 	}
-	err := hc.reset([]string{"http://127.0.0.1:2379"})
+	err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
 	if err != nil {
 	if err != nil {
 		t.Fatalf("unexpected error during setup: %#v", err)
 		t.Fatalf("unexpected error during setup: %#v", err)
 	}
 	}
@@ -783,7 +783,7 @@ func TestHTTPClusterClientAutoSyncCancelContext(t *testing.T) {
 		clientFactory: cf,
 		clientFactory: cf,
 		rand:          rand.New(rand.NewSource(0)),
 		rand:          rand.New(rand.NewSource(0)),
 	}
 	}
-	err := hc.reset([]string{"http://127.0.0.1:2379"})
+	err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
 	if err != nil {
 	if err != nil {
 		t.Fatalf("unexpected error during setup: %#v", err)
 		t.Fatalf("unexpected error during setup: %#v", err)
 	}
 	}
@@ -805,7 +805,7 @@ func TestHTTPClusterClientAutoSyncFail(t *testing.T) {
 		clientFactory: cf,
 		clientFactory: cf,
 		rand:          rand.New(rand.NewSource(0)),
 		rand:          rand.New(rand.NewSource(0)),
 	}
 	}
-	err := hc.reset([]string{"http://127.0.0.1:2379"})
+	err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
 	if err != nil {
 	if err != nil {
 		t.Fatalf("unexpected error during setup: %#v", err)
 		t.Fatalf("unexpected error during setup: %#v", err)
 	}
 	}
@@ -838,7 +838,7 @@ func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) {
 		clientFactory: cf,
 		clientFactory: cf,
 		rand:          rand.New(rand.NewSource(0)),
 		rand:          rand.New(rand.NewSource(0)),
 	}
 	}
-	err := hc.reset([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
+	err := hc.SetEndpoints([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
 	if err != nil {
 	if err != nil {
 		t.Fatalf("unexpected error during setup: %#v", err)
 		t.Fatalf("unexpected error during setup: %#v", err)
 	}
 	}
@@ -867,7 +867,7 @@ func TestHTTPClusterClientResetFail(t *testing.T) {
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
 		hc := &httpClusterClient{rand: rand.New(rand.NewSource(0))}
 		hc := &httpClusterClient{rand: rand.New(rand.NewSource(0))}
-		err := hc.reset(tt)
+		err := hc.SetEndpoints(tt)
 		if err == nil {
 		if err == nil {
 			t.Errorf("#%d: expected non-nil error", i)
 			t.Errorf("#%d: expected non-nil error", i)
 		}
 		}
@@ -879,7 +879,7 @@ func TestHTTPClusterClientResetPinRandom(t *testing.T) {
 	pinNum := 0
 	pinNum := 0
 	for i := 0; i < round; i++ {
 	for i := 0; i < round; i++ {
 		hc := &httpClusterClient{rand: rand.New(rand.NewSource(int64(i)))}
 		hc := &httpClusterClient{rand: rand.New(rand.NewSource(int64(i)))}
-		err := hc.reset([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
+		err := hc.SetEndpoints([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
 		if err != nil {
 		if err != nil {
 			t.Fatalf("#%d: reset error (%v)", i, err)
 			t.Fatalf("#%d: reset error (%v)", i, err)
 		}
 		}

+ 53 - 1
etcdctl/command/util.go

@@ -208,8 +208,19 @@ func mustNewClient(c *cli.Context) client.Client {
 			if err == client.ErrNoEndpoints {
 			if err == client.ErrNoEndpoints {
 				fmt.Fprintf(os.Stderr, "etcd cluster has no published client endpoints.\n")
 				fmt.Fprintf(os.Stderr, "etcd cluster has no published client endpoints.\n")
 				fmt.Fprintf(os.Stderr, "Try '--no-sync' if you want to access non-published client endpoints(%s).\n", strings.Join(hc.Endpoints(), ","))
 				fmt.Fprintf(os.Stderr, "Try '--no-sync' if you want to access non-published client endpoints(%s).\n", strings.Join(hc.Endpoints(), ","))
+				handleError(ExitServerError, err)
+			}
+
+			// fail-back to try sync cluster with peer API. this is for making etcdctl work with etcd 0.4.x.
+			// TODO: remove this when we deprecate the support for etcd 0.4.
+			eps, serr := syncWithPeerAPI(c, ctx, hc.Endpoints())
+			if serr != nil {
+				handleError(ExitServerError, serr)
+			}
+			err = hc.SetEndpoints(eps)
+			if err != nil {
+				handleError(ExitServerError, err)
 			}
 			}
-			handleError(ExitServerError, err)
 		}
 		}
 		if debug {
 		if debug {
 			fmt.Fprintf(os.Stderr, "got endpoints(%s) after sync\n", strings.Join(hc.Endpoints(), ","))
 			fmt.Fprintf(os.Stderr, "got endpoints(%s) after sync\n", strings.Join(hc.Endpoints(), ","))
@@ -271,3 +282,44 @@ func newClient(c *cli.Context) (client.Client, error) {
 func contextWithTotalTimeout(c *cli.Context) (context.Context, context.CancelFunc) {
 func contextWithTotalTimeout(c *cli.Context) (context.Context, context.CancelFunc) {
 	return context.WithTimeout(context.Background(), c.GlobalDuration("total-timeout"))
 	return context.WithTimeout(context.Background(), c.GlobalDuration("total-timeout"))
 }
 }
+
+// syncWithPeerAPI syncs cluster with peer API defined at
+// https://github.com/coreos/etcd/blob/v0.4.9/server/server.go#L311.
+// This exists for backward compatibility with etcd 0.4.x.
+func syncWithPeerAPI(c *cli.Context, ctx context.Context, knownPeers []string) ([]string, error) {
+	tr, err := getTransport(c)
+	if err != nil {
+		return nil, err
+	}
+
+	var (
+		body []byte
+		resp *http.Response
+	)
+	for _, p := range knownPeers {
+		var req *http.Request
+		req, err = http.NewRequest("GET", p+"/v2/peers", nil)
+		if err != nil {
+			continue
+		}
+		resp, err = tr.RoundTrip(req)
+		if err != nil {
+			continue
+		}
+		if resp.StatusCode != http.StatusOK {
+			resp.Body.Close()
+			continue
+		}
+		body, err = ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		if err != nil {
+			continue
+		}
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	// Parse the peers API format: https://github.com/coreos/etcd/blob/v0.4.9/server/server.go#L311
+	return strings.Split(string(body), ", "), nil
+}