瀏覽代碼

Merge pull request #4374 from xiang90/member_api

clientv3: implement cluster api
Xiang Li 10 年之前
父節點
當前提交
3f29e730eb
共有 2 個文件被更改,包括 270 次插入0 次删除
  1. 153 0
      clientv3/cluster.go
  2. 117 0
      clientv3/integration/cluster_test.go

+ 153 - 0
clientv3/cluster.go

@@ -0,0 +1,153 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	"sync"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type (
+	MemberListResponse   pb.MemberListResponse
+	MemberAddResponse    pb.MemberAddResponse
+	MemberRemoveResponse pb.MemberRemoveResponse
+	MemberUpdateResponse pb.MemberUpdateResponse
+)
+
+type Cluster interface {
+	// List lists the current cluster membership.
+	MemberList(ctx context.Context) (*MemberListResponse, error)
+
+	// MemberAdd adds a new member into the cluster.
+	MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
+
+	// MemberRemove removes an existing member from the cluster.
+	MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
+
+	// MemberUpdate updates the peer addresses of the member.
+	MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)
+}
+
+type cluster struct {
+	c *Client
+
+	mu     sync.Mutex
+	conn   *grpc.ClientConn // conn in-use
+	remote pb.ClusterClient
+}
+
+func NewCluster(c *Client) Cluster {
+	conn := c.ActiveConnection()
+
+	return &cluster{
+		c: c,
+
+		conn:   conn,
+		remote: pb.NewClusterClient(conn),
+	}
+}
+
+func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
+	r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
+	resp, err := c.getRemote().MemberAdd(ctx, r)
+	if err == nil {
+		return (*MemberAddResponse)(resp), nil
+	}
+
+	if isRPCError(err) {
+		return nil, err
+	}
+
+	go c.switchRemote(err)
+	return nil, err
+}
+
+func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
+	r := &pb.MemberRemoveRequest{ID: id}
+	resp, err := c.getRemote().MemberRemove(ctx, r)
+	if err == nil {
+		return (*MemberRemoveResponse)(resp), nil
+	}
+
+	if isRPCError(err) {
+		return nil, err
+	}
+
+	go c.switchRemote(err)
+	return nil, err
+}
+
+func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
+	// it is safe to retry on update.
+	for {
+		r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
+		resp, err := c.getRemote().MemberUpdate(ctx, r)
+		if err == nil {
+			return (*MemberUpdateResponse)(resp), nil
+		}
+
+		if isRPCError(err) {
+			return nil, err
+		}
+
+		err = c.switchRemote(err)
+		if err != nil {
+			return nil, err
+		}
+	}
+}
+
+func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
+	// it is safe to retry on list.
+	for {
+		resp, err := c.getRemote().MemberList(ctx, &pb.MemberListRequest{})
+		if err == nil {
+			return (*MemberListResponse)(resp), nil
+		}
+
+		if isRPCError(err) {
+			return nil, err
+		}
+
+		err = c.switchRemote(err)
+		if err != nil {
+			return nil, err
+		}
+	}
+}
+
+func (c *cluster) getRemote() pb.ClusterClient {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	return c.remote
+}
+
+func (c *cluster) switchRemote(prevErr error) error {
+	newConn, err := c.c.retryConnection(c.conn, prevErr)
+	if err != nil {
+		return err
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.conn = newConn
+	c.remote = pb.NewClusterClient(c.conn)
+	return nil
+}

+ 117 - 0
clientv3/integration/cluster_test.go

@@ -0,0 +1,117 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+func TestMemberList(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	capi := clientv3.NewCluster(clus.RandClient())
+
+	resp, err := capi.MemberList(context.Background())
+	if err != nil {
+		t.Fatalf("failed to list member %v", err)
+	}
+
+	if len(resp.Members) != 3 {
+		t.Errorf("number of members = %d, want %d", len(resp.Members), 3)
+	}
+}
+
+func TestMemberAdd(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	capi := clientv3.NewCluster(clus.RandClient())
+
+	urls := []string{"http://127.0.0.1:1234"}
+	resp, err := capi.MemberAdd(context.Background(), urls)
+	if err != nil {
+		t.Fatalf("failed to add member %v", err)
+	}
+
+	if !reflect.DeepEqual(resp.Member.PeerURLs, urls) {
+		t.Errorf("urls = %v, want %v", urls, resp.Member.PeerURLs)
+	}
+}
+
+func TestMemberRemove(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	capi := clientv3.NewCluster(clus.RandClient())
+	resp, err := capi.MemberList(context.Background())
+	if err != nil {
+		t.Fatalf("failed to list member %v", err)
+	}
+
+	_, err = capi.MemberRemove(context.Background(), resp.Members[0].ID)
+	if err != nil {
+		t.Fatalf("failed to remove member %v", err)
+	}
+
+	resp, err = capi.MemberList(context.Background())
+	if err != nil {
+		t.Fatalf("failed to list member %v", err)
+	}
+
+	if len(resp.Members) != 2 {
+		t.Errorf("number of members = %d, want %d", len(resp.Members), 2)
+	}
+}
+
+func TestMemberUpdate(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	capi := clientv3.NewCluster(clus.RandClient())
+	resp, err := capi.MemberList(context.Background())
+	if err != nil {
+		t.Fatalf("failed to list member %v", err)
+	}
+
+	urls := []string{"http://127.0.0.1:1234"}
+	_, err = capi.MemberUpdate(context.Background(), resp.Members[0].ID, urls)
+	if err != nil {
+		t.Fatalf("failed to update member %v", err)
+	}
+
+	resp, err = capi.MemberList(context.Background())
+	if err != nil {
+		t.Fatalf("failed to list member %v", err)
+	}
+
+	if !reflect.DeepEqual(resp.Members[0].PeerURLs, urls) {
+		t.Errorf("urls = %v, want %v", urls, resp.Members[0].PeerURLs)
+	}
+}