Browse Source

clientv3/naming: support resolving to multiple hosts

Previous implementation watches a single key so there's no way
to have separate hosts associate with separate keys for a single
grpc target. Instead, accept all keys on a prefix.

Also fixes first the Next() to read current name data from etcd instead
of waiting for the next event on a synced watcher.
Anthony Romano 9 years ago
parent
commit
7d50dc06a2
2 changed files with 134 additions and 63 deletions
  1. 73 60
      clientv3/naming/grpc.go
  2. 61 3
      clientv3/naming/grpc_test.go

+ 73 - 60
clientv3/naming/grpc.go

@@ -16,99 +16,112 @@ package naming
 
 import (
 	"encoding/json"
-	"time"
 
-	"github.com/coreos/etcd/clientv3"
-	"github.com/coreos/etcd/mvcc/mvccpb"
+	etcd "github.com/coreos/etcd/clientv3"
 	"golang.org/x/net/context"
-	"google.golang.org/grpc/naming"
-)
 
-const (
-	gRPCNamingPrefix = "/github.com/grpc/"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/naming"
 )
 
 // GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
 type GRPCResolver struct {
-	// Client is an initialized etcd client
-	Client *clientv3.Client
-	// Timeout for update/delete request.
-	Timeout time.Duration
+	// Client is an initialized etcd client.
+	Client *etcd.Client
 }
 
-func (gr *GRPCResolver) Add(target string, addr string, metadata interface{}) error {
-	update := naming.Update{
-		Addr:     addr,
-		Metadata: metadata,
-	}
-	val, err := json.Marshal(update)
-	if err != nil {
-		return err
-	}
-
-	ctx := context.Background()
-	if gr.Timeout != 0 {
-		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout)
-		defer cancel()
-	}
-
-	_, err = gr.Client.KV.Put(ctx, gRPCNamingPrefix+target, string(val))
-	return err
-}
-
-func (gr *GRPCResolver) Delete(target string) error {
-	ctx := context.Background()
-	if gr.Timeout != 0 {
-		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout)
-		defer cancel()
+func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update) (err error) {
+	switch nm.Op {
+	case naming.Add:
+		var v []byte
+		if v, err = json.Marshal(nm); err != nil {
+			return grpc.Errorf(codes.InvalidArgument, err.Error())
+		}
+		_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v))
+	case naming.Delete:
+		_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr)
+	default:
+		return grpc.Errorf(codes.InvalidArgument, "naming: bad naming op")
 	}
-
-	_, err := gr.Client.Delete(ctx, gRPCNamingPrefix+target)
 	return err
 }
 
 func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
-	cctx, cancel := context.WithCancel(context.Background())
-
-	wch := gr.Client.Watch(cctx, gRPCNamingPrefix+target)
-
-	w := &gRPCWatcher{
-		cancel: cancel,
-		wch:    wch,
-	}
-
+	ctx, cancel := context.WithCancel(context.Background())
+	w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
 	return w, nil
 }
 
 type gRPCWatcher struct {
+	c      *etcd.Client
+	target string
+	ctx    context.Context
 	cancel context.CancelFunc
-	wch    clientv3.WatchChan
+	wch    etcd.WatchChan
+	err    error
 }
 
+// Next gets the next set of updates from the etcd resolver.
+// Calls to Next should be serialized; concurrent calls are not safe since
+// there is no way to reconcile the update ordering.
 func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
+	if gw.wch == nil {
+		// first Next() returns all addresses
+		return gw.firstNext()
+	}
+	if gw.err != nil {
+		return nil, gw.err
+	}
+
+	// process new events on target/*
 	wr, ok := <-gw.wch
 	if !ok {
-		return nil, wr.Err()
+		gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed")
+		return nil, gw.err
+	}
+	if gw.err = wr.Err(); gw.err != nil {
+		return nil, gw.err
 	}
 
 	updates := make([]*naming.Update, 0, len(wr.Events))
-
 	for _, e := range wr.Events {
+		var jupdate naming.Update
+		var err error
 		switch e.Type {
-		case mvccpb.PUT:
-			var jupdate naming.Update
-			err := json.Unmarshal(e.Kv.Value, &jupdate)
-			if err != nil {
-				continue
-			}
+		case etcd.EventTypePut:
+			err = json.Unmarshal(e.Kv.Value, &jupdate)
+			jupdate.Op = naming.Add
+		case etcd.EventTypeDelete:
+			err = json.Unmarshal(e.PrevKv.Value, &jupdate)
+			jupdate.Op = naming.Delete
+		}
+		if err == nil {
 			updates = append(updates, &jupdate)
-		case mvccpb.DELETE:
-			updates = append(updates, &naming.Update{Op: naming.Delete})
 		}
 	}
+	return updates, nil
+}
+
+func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
+	// Use serialized request so resolution still works if the target etcd
+	// server is partitioned away from the quorum.
+	resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
+	if gw.err = err; err != nil {
+		return nil, err
+	}
+
+	updates := make([]*naming.Update, 0, len(resp.Kvs))
+	for _, kv := range resp.Kvs {
+		var jupdate naming.Update
+		if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
+			continue
+		}
+		updates = append(updates, &jupdate)
+	}
 
+	opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
+	gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
 	return updates, nil
 }
 

+ 61 - 3
clientv3/naming/grpc_test.go

@@ -15,11 +15,14 @@
 package naming
 
 import (
+	"encoding/json"
 	"reflect"
 	"testing"
 
+	"golang.org/x/net/context"
 	"google.golang.org/grpc/naming"
 
+	etcd "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/integration"
 	"github.com/coreos/etcd/pkg/testutil"
 )
@@ -40,7 +43,8 @@ func TestGRPCResolver(t *testing.T) {
 	}
 	defer w.Close()
 
-	err = r.Add("foo", "127.0.0.1", "metadata")
+	addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"}
+	err = r.Update(context.TODO(), "foo", addOp)
 	if err != nil {
 		t.Fatal("failed to add foo", err)
 	}
@@ -60,7 +64,8 @@ func TestGRPCResolver(t *testing.T) {
 		t.Fatalf("up = %#v, want %#v", us[0], wu)
 	}
 
-	err = r.Delete("foo")
+	delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"}
+	err = r.Update(context.TODO(), "foo", delOp)
 
 	us, err = w.Next()
 	if err != nil {
@@ -68,10 +73,63 @@ func TestGRPCResolver(t *testing.T) {
 	}
 
 	wu = &naming.Update{
-		Op: naming.Delete,
+		Op:       naming.Delete,
+		Addr:     "127.0.0.1",
+		Metadata: "metadata",
 	}
 
 	if !reflect.DeepEqual(us[0], wu) {
 		t.Fatalf("up = %#v, want %#v", us[0], wu)
 	}
 }
+
+// TestGRPCResolverMultiInit ensures the resolver will initialize
+// correctly with multiple hosts and correctly receive multiple
+// updates in a single revision.
+func TestGRPCResolverMulti(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+	c := clus.RandClient()
+
+	v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"})
+	if verr != nil {
+		t.Fatal(verr)
+	}
+	if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil {
+		t.Fatal(err)
+	}
+
+	r := GRPCResolver{c}
+
+	w, err := r.Resolve("foo")
+	if err != nil {
+		t.Fatal("failed to resolve foo", err)
+	}
+	defer w.Close()
+
+	updates, nerr := w.Next()
+	if nerr != nil {
+		t.Fatal(nerr)
+	}
+	if len(updates) != 2 {
+		t.Fatalf("expected two updates, got %+v", updates)
+	}
+
+	_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	updates, nerr = w.Next()
+	if nerr != nil {
+		t.Fatal(nerr)
+	}
+	if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) {
+		t.Fatalf("expected two updates, got %+v", updates)
+	}
+}