فهرست منبع

grpcproxy: configure register to Cluster API

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee 8 سال پیش
والد
کامیت
f862b47e92
4فایلهای تغییر یافته به همراه283 افزوده شده و 17 حذف شده
  1. 137 12
      proxy/grpcproxy/cluster.go
  2. 121 0
      proxy/grpcproxy/cluster_test.go
  3. 23 3
      proxy/grpcproxy/register.go
  4. 2 2
      proxy/grpcproxy/register_test.go

+ 137 - 12
proxy/grpcproxy/cluster.go

@@ -15,38 +15,163 @@
 package grpcproxy
 
 import (
+	"fmt"
+	"os"
+	"sync"
+
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/naming"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
+	gnaming "google.golang.org/grpc/naming"
 )
 
+// allow maximum 1 retry per second
+const resolveRetryRate = 1
+
 type clusterProxy struct {
-	client *clientv3.Client
+	clus clientv3.Cluster
+	ctx  context.Context
+	gr   *naming.GRPCResolver
+
+	// advertise client URL
+	advaddr string
+	prefix  string
+
+	umu  sync.RWMutex
+	umap map[string]gnaming.Update
+}
+
+// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
+// The returned channel is closed when there is grpc-proxy endpoint registered
+// and the client's context is canceled so the 'register' loop returns.
+func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
+	cp := &clusterProxy{
+		clus: c.Cluster,
+		ctx:  c.Ctx(),
+		gr:   &naming.GRPCResolver{Client: c},
+
+		advaddr: advaddr,
+		prefix:  prefix,
+		umap:    make(map[string]gnaming.Update),
+	}
+
+	donec := make(chan struct{})
+	if advaddr != "" && prefix != "" {
+		go func() {
+			defer close(donec)
+			cp.resolve(prefix)
+		}()
+		return cp, donec
+	}
+
+	close(donec)
+	return cp, donec
 }
 
-func NewClusterProxy(c *clientv3.Client) pb.ClusterServer {
-	return &clusterProxy{
-		client: c,
+func (cp *clusterProxy) resolve(prefix string) {
+	rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
+	for rm.Wait(cp.ctx) == nil {
+		wa, err := cp.gr.Resolve(prefix)
+		if err != nil {
+			plog.Warningf("failed to resolve %q (%v)", prefix, err)
+			continue
+		}
+		cp.monitor(wa)
+	}
+}
+
+func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
+	for cp.ctx.Err() == nil {
+		ups, err := wa.Next()
+		if err != nil {
+			plog.Warningf("clusterProxy watcher error (%v)", err)
+			if grpc.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
+				return
+			}
+		}
+
+		cp.umu.Lock()
+		for i := range ups {
+			switch ups[i].Op {
+			case gnaming.Add:
+				cp.umap[ups[i].Addr] = *ups[i]
+			case gnaming.Delete:
+				delete(cp.umap, ups[i].Addr)
+			}
+		}
+		cp.umu.Unlock()
 	}
 }
 
 func (cp *clusterProxy) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) {
-	conn := cp.client.ActiveConnection()
-	return pb.NewClusterClient(conn).MemberAdd(ctx, r)
+	mresp, err := cp.clus.MemberAdd(ctx, r.PeerURLs)
+	if err != nil {
+		return nil, err
+	}
+	resp := (pb.MemberAddResponse)(*mresp)
+	return &resp, err
 }
 
 func (cp *clusterProxy) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
-	conn := cp.client.ActiveConnection()
-	return pb.NewClusterClient(conn).MemberRemove(ctx, r)
+	mresp, err := cp.clus.MemberRemove(ctx, r.ID)
+	if err != nil {
+		return nil, err
+	}
+	resp := (pb.MemberRemoveResponse)(*mresp)
+	return &resp, err
 }
 
 func (cp *clusterProxy) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
-	conn := cp.client.ActiveConnection()
-	return pb.NewClusterClient(conn).MemberUpdate(ctx, r)
+	mresp, err := cp.clus.MemberUpdate(ctx, r.ID, r.PeerURLs)
+	if err != nil {
+		return nil, err
+	}
+	resp := (pb.MemberUpdateResponse)(*mresp)
+	return &resp, err
 }
 
+func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) {
+	cp.umu.RLock()
+	defer cp.umu.RUnlock()
+	mbs := make([]*pb.Member, 0, len(cp.umap))
+	for addr, upt := range cp.umap {
+		m, err := decodeMeta(fmt.Sprint(upt.Metadata))
+		if err != nil {
+			return nil, err
+		}
+		mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{addr}})
+	}
+	return mbs, nil
+}
+
+// MemberList wraps member list API with following rules:
+// - If 'advaddr' is not empty and 'prefix' is not empty, return registered member lists via resolver
+// - If 'advaddr' is not empty and 'prefix' is not empty and registered grpc-proxy members haven't been fetched, return the 'advaddr'
+// - If 'advaddr' is not empty and 'prefix' is empty, return 'advaddr' without forcing it to 'register'
+// - If 'advaddr' is empty, forward to member list API
 func (cp *clusterProxy) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) {
-	conn := cp.client.ActiveConnection()
-	return pb.NewClusterClient(conn).MemberList(ctx, r)
+	if cp.advaddr != "" {
+		if cp.prefix != "" {
+			mbs, err := cp.membersFromUpdates()
+			if err != nil {
+				return nil, err
+			}
+			if len(mbs) > 0 {
+				return &pb.MemberListResponse{Members: mbs}, nil
+			}
+		}
+		// prefix is empty or no grpc-proxy members haven't been registered
+		hostname, _ := os.Hostname()
+		return &pb.MemberListResponse{Members: []*pb.Member{{Name: hostname, ClientURLs: []string{cp.advaddr}}}}, nil
+	}
+	mresp, err := cp.clus.MemberList(ctx)
+	if err != nil {
+		return nil, err
+	}
+	resp := (pb.MemberListResponse)(*mresp)
+	return &resp, err
 }

+ 121 - 0
proxy/grpcproxy/cluster_test.go

@@ -0,0 +1,121 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"net"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/pkg/testutil"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+func TestClusterProxyMemberList(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cts := newClusterProxyServer([]string{clus.Members[0].GRPCAddr()}, t)
+	defer cts.close(t)
+
+	cfg := clientv3.Config{
+		Endpoints:   []string{cts.caddr},
+		DialTimeout: 5 * time.Second,
+	}
+	client, err := clientv3.New(cfg)
+	if err != nil {
+		t.Fatalf("err %v, want nil", err)
+	}
+	defer client.Close()
+
+	// wait some time for register-loop to write keys
+	time.Sleep(time.Second)
+
+	var mresp *clientv3.MemberListResponse
+	mresp, err = client.Cluster.MemberList(context.Background())
+	if err != nil {
+		t.Fatalf("err %v, want nil", err)
+	}
+
+	if len(mresp.Members) != 1 {
+		t.Fatalf("len(mresp.Members) expected 1, got %d (%+v)", len(mresp.Members), mresp.Members)
+	}
+	if len(mresp.Members[0].ClientURLs) != 1 {
+		t.Fatalf("len(mresp.Members[0].ClientURLs) expected 1, got %d (%+v)", len(mresp.Members[0].ClientURLs), mresp.Members[0].ClientURLs[0])
+	}
+	if mresp.Members[0].ClientURLs[0] != cts.caddr {
+		t.Fatalf("mresp.Members[0].ClientURLs[0] expected %q, got %q", cts.caddr, mresp.Members[0].ClientURLs[0])
+	}
+}
+
+type clusterproxyTestServer struct {
+	cp     pb.ClusterServer
+	c      *clientv3.Client
+	server *grpc.Server
+	l      net.Listener
+	donec  <-chan struct{}
+	caddr  string
+}
+
+func (cts *clusterproxyTestServer) close(t *testing.T) {
+	cts.server.Stop()
+	cts.l.Close()
+	cts.c.Close()
+	select {
+	case <-cts.donec:
+		return
+	case <-time.After(5 * time.Second):
+		t.Fatalf("register-loop took too long to return")
+	}
+}
+
+func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestServer {
+	cfg := clientv3.Config{
+		Endpoints:   endpoints,
+		DialTimeout: 5 * time.Second,
+	}
+	client, err := clientv3.New(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cts := &clusterproxyTestServer{
+		c: client,
+	}
+	cts.l, err = net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	var opts []grpc.ServerOption
+	cts.server = grpc.NewServer(opts...)
+	go cts.server.Serve(cts.l)
+
+	// wait some time for free port 0 to be resolved
+	time.Sleep(500 * time.Millisecond)
+
+	Register(client, "test-prefix", cts.l.Addr().String(), 7)
+	cts.cp, cts.donec = NewClusterProxy(client, cts.l.Addr().String(), "test-prefix")
+	cts.caddr = cts.l.Addr().String()
+	pb.RegisterClusterServer(cts.server, cts.cp)
+
+	return cts
+}

+ 23 - 3
proxy/grpcproxy/register.go

@@ -15,6 +15,9 @@
 package grpcproxy
 
 import (
+	"encoding/json"
+	"os"
+
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/clientv3/naming"
@@ -26,10 +29,10 @@ import (
 // allow maximum 1 retry per second
 const registerRetryRate = 1
 
-// register registers itself as a grpc-proxy server by writing prefixed-key
+// Register registers itself as a grpc-proxy server by writing prefixed-key
 // with session of specified TTL (in seconds). The returned channel is closed
 // when the client's context is canceled.
-func register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
+func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
 	rm := rate.NewLimiter(rate.Limit(registerRetryRate), registerRetryRate)
 
 	donec := make(chan struct{})
@@ -65,10 +68,27 @@ func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*
 	}
 
 	gr := &naming.GRPCResolver{Client: c}
-	if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr}, clientv3.WithLease(ss.Lease())); err != nil {
+	if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil {
 		return nil, err
 	}
 
 	plog.Infof("registered %q with %d-second lease", addr, ttl)
 	return ss, nil
 }
+
+// meta represents metadata of proxy register.
+type meta struct {
+	Name string `json:"name"`
+}
+
+func getMeta() string {
+	hostname, _ := os.Hostname()
+	bts, _ := json.Marshal(meta{Name: hostname})
+	return string(bts)
+}
+
+func decodeMeta(s string) (meta, error) {
+	m := meta{}
+	err := json.Unmarshal([]byte(s), &m)
+	return m, err
+}

+ 2 - 2
proxy/grpcproxy/register_test.go

@@ -26,7 +26,7 @@ import (
 	gnaming "google.golang.org/grpc/naming"
 )
 
-func Test_register(t *testing.T) {
+func TestRegister(t *testing.T) {
 	defer testutil.AfterTest(t)
 
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
@@ -44,7 +44,7 @@ func Test_register(t *testing.T) {
 		t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups)
 	}
 
-	donec := register(cli, testPrefix, paddr, 5)
+	donec := Register(cli, testPrefix, paddr, 5)
 
 	ups, err = wa.Next()
 	if err != nil {