ソースを参照

Merge pull request #5799 from xiang90/grpcnaming

clientv3: add grpc naming resolver
Xiang Li 9 年 前
コミット
8216fdc59f
2 ファイル変更192 行追加0 行削除
  1. 115 0
      clientv3/naming/grpc.go
  2. 77 0
      clientv3/naming/grpc_test.go

+ 115 - 0
clientv3/naming/grpc.go

@@ -0,0 +1,115 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package naming
+
+import (
+	"encoding/json"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc/naming"
+)
+
+const (
+	gRPCNamingPrefix = "/github.com/grpc/"
+)
+
+// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
+type GRPCResolver struct {
+	// Client is an initialized etcd client
+	Client *clientv3.Client
+	// Timeout for update/delete request.
+	Timeout time.Duration
+}
+
+func (gr *GRPCResolver) Add(target string, addr string, metadata interface{}) error {
+	update := naming.Update{
+		Addr:     addr,
+		Metadata: metadata,
+	}
+	val, err := json.Marshal(update)
+	if err != nil {
+		return err
+	}
+
+	ctx := context.Background()
+	if gr.Timeout != 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout)
+		defer cancel()
+	}
+
+	_, err = gr.Client.KV.Put(ctx, gRPCNamingPrefix+target, string(val))
+	return err
+}
+
+func (gr *GRPCResolver) Delete(target string) error {
+	ctx := context.Background()
+	if gr.Timeout != 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout)
+		defer cancel()
+	}
+
+	_, err := gr.Client.Delete(ctx, gRPCNamingPrefix+target)
+	return err
+}
+
+func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
+	cctx, cancel := context.WithCancel(context.Background())
+
+	wch := gr.Client.Watch(cctx, gRPCNamingPrefix+target)
+
+	w := &gRPCWatcher{
+		cancel: cancel,
+		wch:    wch,
+	}
+
+	return w, nil
+}
+
+type gRPCWatcher struct {
+	cancel context.CancelFunc
+	wch    clientv3.WatchChan
+}
+
+func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
+	wr, ok := <-gw.wch
+	if !ok {
+		return nil, wr.Err()
+	}
+
+	updates := make([]*naming.Update, 0, len(wr.Events))
+
+	for _, e := range wr.Events {
+		switch e.Type {
+		case mvccpb.PUT:
+			var jupdate naming.Update
+			err := json.Unmarshal(e.Kv.Value, &jupdate)
+			if err != nil {
+				continue
+			}
+			updates = append(updates, &jupdate)
+		case mvccpb.DELETE:
+			updates = append(updates, &naming.Update{Op: naming.Delete})
+		}
+	}
+
+	return updates, nil
+}
+
+func (gw *gRPCWatcher) Close() { gw.cancel() }

+ 77 - 0
clientv3/naming/grpc_test.go

@@ -0,0 +1,77 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package naming
+
+import (
+	"reflect"
+	"testing"
+
+	"google.golang.org/grpc/naming"
+
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+func TestGRPCResolver(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	r := GRPCResolver{
+		Client: clus.RandClient(),
+	}
+
+	w, err := r.Resolve("foo")
+	if err != nil {
+		t.Fatal("failed to resolve foo", err)
+	}
+	defer w.Close()
+
+	err = r.Add("foo", "127.0.0.1", "metadata")
+	if err != nil {
+		t.Fatal("failed to add foo", err)
+	}
+
+	us, err := w.Next()
+	if err != nil {
+		t.Fatal("failed to get udpate", err)
+	}
+
+	wu := &naming.Update{
+		Op:       naming.Add,
+		Addr:     "127.0.0.1",
+		Metadata: "metadata",
+	}
+
+	if !reflect.DeepEqual(us[0], wu) {
+		t.Fatalf("up = %#v, want %#v", us[0], wu)
+	}
+
+	err = r.Delete("foo")
+
+	us, err = w.Next()
+	if err != nil {
+		t.Fatal("failed to get udpate", err)
+	}
+
+	wu = &naming.Update{
+		Op: naming.Delete,
+	}
+
+	if !reflect.DeepEqual(us[0], wu) {
+		t.Fatalf("up = %#v, want %#v", us[0], wu)
+	}
+}