소스 검색

Merge pull request #7014 from gyuho/auto-sync-grpc-proxy

*: register grpc-proxy members
Gyu-Ho Lee 8 년 전
부모
커밋
ad1d48b73d
7개의 변경된 파일389개의 추가작업 그리고 29개의 파일을 삭제
  1. 72 2
      Documentation/op-guide/grpc_proxy.md
  2. 4 1
      clientv3/naming/grpc.go
  3. 30 9
      etcdmain/grpc_proxy.go
  4. 137 12
      proxy/grpcproxy/cluster.go
  5. 121 0
      proxy/grpcproxy/cluster_test.go
  6. 23 3
      proxy/grpcproxy/register.go
  7. 2 2
      proxy/grpcproxy/register_test.go

+ 72 - 2
Documentation/op-guide/grpc_proxy.md

@@ -36,9 +36,9 @@ watch key A ^     ^ watch key A    |
 
 To effectively coalesce multiple client watchers into a single watcher, the gRPC proxy coalesces new `c-watchers` into an existing `s-watcher` when possible. This coalesced `s-watcher` may be out of sync with the etcd server due to network delays or buffered undelivered events. When the watch revision is unspecified, the gRPC proxy will not guarantee the `c-watcher` will start watching from the most recent store revision. For example, if a client watches from an etcd server with revision 1000, that watcher will begin at revision 1000. If a client watches from the gRPC proxy, may begin watching from revision 990.
 
-Similar limitations apply to cancellation. When the watcher is cancelled, the etcd server’s revision may be greater than the cancellation response revision. 
+Similar limitations apply to cancellation. When the watcher is cancelled, the etcd server’s revision may be greater than the cancellation response revision.
 
-These two limitations should not cause problems for most use cases. In the future, there may be additional options to force the watcher to bypass the gRPC proxy for more accurate revision responses. 
+These two limitations should not cause problems for most use cases. In the future, there may be additional options to force the watcher to bypass the gRPC proxy for more accurate revision responses.
 
 ## Scalable lease API
 
@@ -98,3 +98,73 @@ $ ETCDCTL_API=3 ./etcdctl --endpoints=127.0.0.1:2379 get foo
 foo
 bar
 ```
+
+## Client endpoint synchronization and name resolution
+
+The proxy supports registering its endpoints for discovery by writing to a user-defined endpoint. This serves two purposes. First, it allows clients to synchronize their endpoints against a set of proxy endpoints for high availability. Second, it is an endpoint provider for etcd [gRPC naming][dev-guide/grpc_naming.md].
+
+Register proxy(s) by providing a user-defined prefix:
+
+```bash
+$ etcd grpc-proxy start --endpoints=localhost:2379 \
+  --listen-addr=127.0.0.1:23790 \
+  --advertise-client-url=127.0.0.1:23790 \
+  --resolver-prefix="___grpc_proxy_endpoint" \
+  --resolver-ttl=60
+
+$ etcd grpc-proxy start --endpoints=localhost:2379 \
+  --listen-addr=127.0.0.1:23791 \
+  --advertise-client-url=127.0.0.1:23791 \
+  --resolver-prefix="___grpc_proxy_endpoint" \
+  --resolver-ttl=60
+```
+
+The proxy will list all its members for member list:
+
+```bash
+ETCDCTL_API=3 ./bin/etcdctl --endpoints=http://localhost:23790 member list --write-out table
+
++----+---------+--------------------------------+------------+-----------------+
+| ID | STATUS  |              NAME              | PEER ADDRS |  CLIENT ADDRS   |
++----+---------+--------------------------------+------------+-----------------+
+|  0 | started | Gyu-Hos-MBP.sfo.coreos.systems |            | 127.0.0.1:23791 |
+|  0 | started | Gyu-Hos-MBP.sfo.coreos.systems |            | 127.0.0.1:23790 |
++----+---------+--------------------------------+------------+-----------------+
+```
+
+This lets clients automatically discover proxy endpoints through Sync:
+
+```go
+cli, err := clientv3.New(clientv3.Config{
+    Endpoints: []string{"http://localhost:23790"},
+})
+if err != nil {
+    log.Fatal(err)
+}
+defer cli.Close()
+
+// fetch registered grpc-proxy endpoints
+if err := cli.Sync(context.Background()); err != nil {
+    log.Fatal(err)
+}
+```
+
+Note that if a proxy is configured without a resolver prefix,
+
+```bash
+$ etcd grpc-proxy start --endpoints=localhost:2379 \
+  --listen-addr=127.0.0.1:23792 \
+  --advertise-client-url=127.0.0.1:23792
+```
+
+the member list API to the grpc-proxy returns its own `advertise-client-url`:
+
+```bash
+ETCDCTL_API=3 ./bin/etcdctl --endpoints=http://localhost:23792 member list --write-out table
+
++----+---------+--------------------------------+------------+-----------------+
+| ID | STATUS  |              NAME              | PEER ADDRS |  CLIENT ADDRS   |
++----+---------+--------------------------------+------------+-----------------+
+|  0 | started | Gyu-Hos-MBP.sfo.coreos.systems |            | 127.0.0.1:23792 |
++----+---------+--------------------------------+------------+-----------------+
+```

+ 4 - 1
clientv3/naming/grpc.go

@@ -16,6 +16,7 @@ package naming
 
 import (
 	"encoding/json"
+	"fmt"
 
 	etcd "github.com/coreos/etcd/clientv3"
 	"golang.org/x/net/context"
@@ -25,6 +26,8 @@ import (
 	"google.golang.org/grpc/naming"
 )
 
+var ErrWatcherClosed = fmt.Errorf("naming: watch closed")
+
 // GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
 type GRPCResolver struct {
 	// Client is an initialized etcd client.
@@ -77,7 +80,7 @@ func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
 	// process new events on target/*
 	wr, ok := <-gw.wch
 	if !ok {
-		gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed")
+		gw.err = grpc.Errorf(codes.Unavailable, "%s", ErrWatcherClosed)
 		return nil, gw.err
 	}
 	if gw.err = wr.Err(); gw.err != nil {

+ 30 - 9
etcdmain/grpc_proxy.go

@@ -27,20 +27,22 @@ import (
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/proxy/grpcproxy"
 
-	"github.com/spf13/cobra"
-	"google.golang.org/grpc"
-
 	"github.com/cockroachdb/cmux"
 	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/spf13/cobra"
+	"google.golang.org/grpc"
 )
 
 var (
-	grpcProxyListenAddr string
-	grpcProxyEndpoints  []string
-	grpcProxyCert       string
-	grpcProxyKey        string
-	grpcProxyCA         string
+	grpcProxyListenAddr         string
+	grpcProxyEndpoints          []string
+	grpcProxyCert               string
+	grpcProxyKey                string
+	grpcProxyCA                 string
+	grpcProxyAdvertiseClientURL string
+	grpcProxyResolverPrefix     string
+	grpcProxyResolverTTL        int
 )
 
 func init() {
@@ -70,11 +72,27 @@ func newGRPCProxyStartCommand() *cobra.Command {
 	cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
 	cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
 	cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
+	cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
+	cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
+	cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
 
 	return &cmd
 }
 
 func startGRPCProxy(cmd *cobra.Command, args []string) {
+	if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
+		fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
+		os.Exit(1)
+	}
+	if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
+		fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
+		os.Exit(1)
+	}
+	if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
+		fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
+		os.Exit(1)
+	}
+
 	l, err := net.Listen("tcp", grpcProxyListenAddr)
 	if err != nil {
 		fmt.Fprintln(os.Stderr, err)
@@ -105,7 +123,10 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 
 	kvp, _ := grpcproxy.NewKvProxy(client)
 	watchp, _ := grpcproxy.NewWatchProxy(client)
-	clusterp := grpcproxy.NewClusterProxy(client)
+	if grpcProxyResolverPrefix != "" {
+		grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
+	}
+	clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
 	leasep, _ := grpcproxy.NewLeaseProxy(client)
 	mainp := grpcproxy.NewMaintenanceProxy(client)
 	authp := grpcproxy.NewAuthProxy(client)

+ 137 - 12
proxy/grpcproxy/cluster.go

@@ -15,38 +15,163 @@
 package grpcproxy
 
 import (
+	"fmt"
+	"os"
+	"sync"
+
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/naming"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
+	gnaming "google.golang.org/grpc/naming"
 )
 
+// allow maximum 1 retry per second
+const resolveRetryRate = 1
+
 type clusterProxy struct {
-	client *clientv3.Client
+	clus clientv3.Cluster
+	ctx  context.Context
+	gr   *naming.GRPCResolver
+
+	// advertise client URL
+	advaddr string
+	prefix  string
+
+	umu  sync.RWMutex
+	umap map[string]gnaming.Update
+}
+
+// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
+// The returned channel is closed when there is grpc-proxy endpoint registered
+// and the client's context is canceled so the 'register' loop returns.
+func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
+	cp := &clusterProxy{
+		clus: c.Cluster,
+		ctx:  c.Ctx(),
+		gr:   &naming.GRPCResolver{Client: c},
+
+		advaddr: advaddr,
+		prefix:  prefix,
+		umap:    make(map[string]gnaming.Update),
+	}
+
+	donec := make(chan struct{})
+	if advaddr != "" && prefix != "" {
+		go func() {
+			defer close(donec)
+			cp.resolve(prefix)
+		}()
+		return cp, donec
+	}
+
+	close(donec)
+	return cp, donec
 }
 
-func NewClusterProxy(c *clientv3.Client) pb.ClusterServer {
-	return &clusterProxy{
-		client: c,
+func (cp *clusterProxy) resolve(prefix string) {
+	rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
+	for rm.Wait(cp.ctx) == nil {
+		wa, err := cp.gr.Resolve(prefix)
+		if err != nil {
+			plog.Warningf("failed to resolve %q (%v)", prefix, err)
+			continue
+		}
+		cp.monitor(wa)
+	}
+}
+
+func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
+	for cp.ctx.Err() == nil {
+		ups, err := wa.Next()
+		if err != nil {
+			plog.Warningf("clusterProxy watcher error (%v)", err)
+			if grpc.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
+				return
+			}
+		}
+
+		cp.umu.Lock()
+		for i := range ups {
+			switch ups[i].Op {
+			case gnaming.Add:
+				cp.umap[ups[i].Addr] = *ups[i]
+			case gnaming.Delete:
+				delete(cp.umap, ups[i].Addr)
+			}
+		}
+		cp.umu.Unlock()
 	}
 }
 
 func (cp *clusterProxy) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) {
-	conn := cp.client.ActiveConnection()
-	return pb.NewClusterClient(conn).MemberAdd(ctx, r)
+	mresp, err := cp.clus.MemberAdd(ctx, r.PeerURLs)
+	if err != nil {
+		return nil, err
+	}
+	resp := (pb.MemberAddResponse)(*mresp)
+	return &resp, err
 }
 
 func (cp *clusterProxy) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
-	conn := cp.client.ActiveConnection()
-	return pb.NewClusterClient(conn).MemberRemove(ctx, r)
+	mresp, err := cp.clus.MemberRemove(ctx, r.ID)
+	if err != nil {
+		return nil, err
+	}
+	resp := (pb.MemberRemoveResponse)(*mresp)
+	return &resp, err
 }
 
 func (cp *clusterProxy) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
-	conn := cp.client.ActiveConnection()
-	return pb.NewClusterClient(conn).MemberUpdate(ctx, r)
+	mresp, err := cp.clus.MemberUpdate(ctx, r.ID, r.PeerURLs)
+	if err != nil {
+		return nil, err
+	}
+	resp := (pb.MemberUpdateResponse)(*mresp)
+	return &resp, err
 }
 
+func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) {
+	cp.umu.RLock()
+	defer cp.umu.RUnlock()
+	mbs := make([]*pb.Member, 0, len(cp.umap))
+	for addr, upt := range cp.umap {
+		m, err := decodeMeta(fmt.Sprint(upt.Metadata))
+		if err != nil {
+			return nil, err
+		}
+		mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{addr}})
+	}
+	return mbs, nil
+}
+
+// MemberList wraps member list API with following rules:
+// - If 'advaddr' is not empty and 'prefix' is not empty, return registered member lists via resolver
+// - If 'advaddr' is not empty and 'prefix' is not empty and registered grpc-proxy members haven't been fetched, return the 'advaddr'
+// - If 'advaddr' is not empty and 'prefix' is empty, return 'advaddr' without forcing it to 'register'
+// - If 'advaddr' is empty, forward to member list API
 func (cp *clusterProxy) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) {
-	conn := cp.client.ActiveConnection()
-	return pb.NewClusterClient(conn).MemberList(ctx, r)
+	if cp.advaddr != "" {
+		if cp.prefix != "" {
+			mbs, err := cp.membersFromUpdates()
+			if err != nil {
+				return nil, err
+			}
+			if len(mbs) > 0 {
+				return &pb.MemberListResponse{Members: mbs}, nil
+			}
+		}
+		// prefix is empty or no grpc-proxy members haven't been registered
+		hostname, _ := os.Hostname()
+		return &pb.MemberListResponse{Members: []*pb.Member{{Name: hostname, ClientURLs: []string{cp.advaddr}}}}, nil
+	}
+	mresp, err := cp.clus.MemberList(ctx)
+	if err != nil {
+		return nil, err
+	}
+	resp := (pb.MemberListResponse)(*mresp)
+	return &resp, err
 }

+ 121 - 0
proxy/grpcproxy/cluster_test.go

@@ -0,0 +1,121 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"net"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/pkg/testutil"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+func TestClusterProxyMemberList(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cts := newClusterProxyServer([]string{clus.Members[0].GRPCAddr()}, t)
+	defer cts.close(t)
+
+	cfg := clientv3.Config{
+		Endpoints:   []string{cts.caddr},
+		DialTimeout: 5 * time.Second,
+	}
+	client, err := clientv3.New(cfg)
+	if err != nil {
+		t.Fatalf("err %v, want nil", err)
+	}
+	defer client.Close()
+
+	// wait some time for register-loop to write keys
+	time.Sleep(time.Second)
+
+	var mresp *clientv3.MemberListResponse
+	mresp, err = client.Cluster.MemberList(context.Background())
+	if err != nil {
+		t.Fatalf("err %v, want nil", err)
+	}
+
+	if len(mresp.Members) != 1 {
+		t.Fatalf("len(mresp.Members) expected 1, got %d (%+v)", len(mresp.Members), mresp.Members)
+	}
+	if len(mresp.Members[0].ClientURLs) != 1 {
+		t.Fatalf("len(mresp.Members[0].ClientURLs) expected 1, got %d (%+v)", len(mresp.Members[0].ClientURLs), mresp.Members[0].ClientURLs[0])
+	}
+	if mresp.Members[0].ClientURLs[0] != cts.caddr {
+		t.Fatalf("mresp.Members[0].ClientURLs[0] expected %q, got %q", cts.caddr, mresp.Members[0].ClientURLs[0])
+	}
+}
+
+type clusterproxyTestServer struct {
+	cp     pb.ClusterServer
+	c      *clientv3.Client
+	server *grpc.Server
+	l      net.Listener
+	donec  <-chan struct{}
+	caddr  string
+}
+
+func (cts *clusterproxyTestServer) close(t *testing.T) {
+	cts.server.Stop()
+	cts.l.Close()
+	cts.c.Close()
+	select {
+	case <-cts.donec:
+		return
+	case <-time.After(5 * time.Second):
+		t.Fatalf("register-loop took too long to return")
+	}
+}
+
+func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestServer {
+	cfg := clientv3.Config{
+		Endpoints:   endpoints,
+		DialTimeout: 5 * time.Second,
+	}
+	client, err := clientv3.New(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cts := &clusterproxyTestServer{
+		c: client,
+	}
+	cts.l, err = net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	var opts []grpc.ServerOption
+	cts.server = grpc.NewServer(opts...)
+	go cts.server.Serve(cts.l)
+
+	// wait some time for free port 0 to be resolved
+	time.Sleep(500 * time.Millisecond)
+
+	Register(client, "test-prefix", cts.l.Addr().String(), 7)
+	cts.cp, cts.donec = NewClusterProxy(client, cts.l.Addr().String(), "test-prefix")
+	cts.caddr = cts.l.Addr().String()
+	pb.RegisterClusterServer(cts.server, cts.cp)
+
+	return cts
+}

+ 23 - 3
proxy/grpcproxy/register.go

@@ -15,6 +15,9 @@
 package grpcproxy
 
 import (
+	"encoding/json"
+	"os"
+
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/clientv3/naming"
@@ -26,10 +29,10 @@ import (
 // allow maximum 1 retry per second
 const registerRetryRate = 1
 
-// register registers itself as a grpc-proxy server by writing prefixed-key
+// Register registers itself as a grpc-proxy server by writing prefixed-key
 // with session of specified TTL (in seconds). The returned channel is closed
 // when the client's context is canceled.
-func register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
+func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
 	rm := rate.NewLimiter(rate.Limit(registerRetryRate), registerRetryRate)
 
 	donec := make(chan struct{})
@@ -65,10 +68,27 @@ func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*
 	}
 
 	gr := &naming.GRPCResolver{Client: c}
-	if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr}, clientv3.WithLease(ss.Lease())); err != nil {
+	if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil {
 		return nil, err
 	}
 
 	plog.Infof("registered %q with %d-second lease", addr, ttl)
 	return ss, nil
 }
+
+// meta represents metadata of proxy register.
+type meta struct {
+	Name string `json:"name"`
+}
+
+func getMeta() string {
+	hostname, _ := os.Hostname()
+	bts, _ := json.Marshal(meta{Name: hostname})
+	return string(bts)
+}
+
+func decodeMeta(s string) (meta, error) {
+	m := meta{}
+	err := json.Unmarshal([]byte(s), &m)
+	return m, err
+}

+ 2 - 2
proxy/grpcproxy/register_test.go

@@ -26,7 +26,7 @@ import (
 	gnaming "google.golang.org/grpc/naming"
 )
 
-func Test_register(t *testing.T) {
+func TestRegister(t *testing.T) {
 	defer testutil.AfterTest(t)
 
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
@@ -44,7 +44,7 @@ func Test_register(t *testing.T) {
 		t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups)
 	}
 
-	donec := register(cli, testPrefix, paddr, 5)
+	donec := Register(cli, testPrefix, paddr, 5)
 
 	ups, err = wa.Next()
 	if err != nil {