Browse Source

grpcproxy: wrapper from pb.KVServer to pb.KVClient

Anthony Romano 9 years ago
parent
commit
54df540c2c
3 changed files with 59 additions and 9 deletions
  1. 9 9
      proxy/grpcproxy/kv.go
  2. 47 0
      proxy/grpcproxy/kv_client_adapter.go
  3. 3 0
      proxy/grpcproxy/kv_test.go

+ 9 - 9
proxy/grpcproxy/kv.go

@@ -23,14 +23,14 @@ import (
 )
 )
 
 
 type kvProxy struct {
 type kvProxy struct {
-	client *clientv3.Client
-	cache  cache.Cache
+	kv    clientv3.KV
+	cache cache.Cache
 }
 }
 
 
 func NewKvProxy(c *clientv3.Client) pb.KVServer {
 func NewKvProxy(c *clientv3.Client) pb.KVServer {
 	return &kvProxy{
 	return &kvProxy{
-		client: c,
-		cache:  cache.NewCache(cache.DefaultMaxEntries),
+		kv:    c.KV,
+		cache: cache.NewCache(cache.DefaultMaxEntries),
 	}
 	}
 }
 }
 
 
@@ -46,7 +46,7 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo
 		}
 		}
 	}
 	}
 
 
-	resp, err := p.client.Do(ctx, RangeRequestToOp(r))
+	resp, err := p.kv.Do(ctx, RangeRequestToOp(r))
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -57,17 +57,17 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo
 }
 }
 
 
 func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
 func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
-	resp, err := p.client.Do(ctx, PutRequestToOp(r))
+	resp, err := p.kv.Do(ctx, PutRequestToOp(r))
 	return (*pb.PutResponse)(resp.Put()), err
 	return (*pb.PutResponse)(resp.Put()), err
 }
 }
 
 
 func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
 func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
-	resp, err := p.client.Do(ctx, DelRequestToOp(r))
+	resp, err := p.kv.Do(ctx, DelRequestToOp(r))
 	return (*pb.DeleteRangeResponse)(resp.Del()), err
 	return (*pb.DeleteRangeResponse)(resp.Del()), err
 }
 }
 
 
 func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
 func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
-	txn := p.client.Txn(ctx)
+	txn := p.kv.Txn(ctx)
 	cmps := make([]clientv3.Cmp, len(r.Compare))
 	cmps := make([]clientv3.Cmp, len(r.Compare))
 	thenops := make([]clientv3.Op, len(r.Success))
 	thenops := make([]clientv3.Op, len(r.Success))
 	elseops := make([]clientv3.Op, len(r.Failure))
 	elseops := make([]clientv3.Op, len(r.Failure))
@@ -94,7 +94,7 @@ func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Com
 		opts = append(opts, clientv3.WithCompactPhysical())
 		opts = append(opts, clientv3.WithCompactPhysical())
 	}
 	}
 
 
-	resp, err := p.client.KV.Compact(ctx, r.Revision, opts...)
+	resp, err := p.kv.Compact(ctx, r.Revision, opts...)
 	if err == nil {
 	if err == nil {
 		p.cache.Compact(r.Revision)
 		p.cache.Compact(r.Revision)
 	}
 	}

+ 47 - 0
proxy/grpcproxy/kv_client_adapter.go

@@ -0,0 +1,47 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"golang.org/x/net/context"
+	grpc "google.golang.org/grpc"
+)
+
+type kvs2kvc struct{ kvs pb.KVServer }
+
+func KvServerToKvClient(kvs pb.KVServer) pb.KVClient {
+	return &kvs2kvc{kvs}
+}
+
+func (s *kvs2kvc) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (*pb.RangeResponse, error) {
+	return s.kvs.Range(ctx, in)
+}
+
+func (s *kvs2kvc) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (*pb.PutResponse, error) {
+	return s.kvs.Put(ctx, in)
+}
+
+func (s *kvs2kvc) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (*pb.DeleteRangeResponse, error) {
+	return s.kvs.DeleteRange(ctx, in)
+}
+
+func (s *kvs2kvc) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (*pb.TxnResponse, error) {
+	return s.kvs.Txn(ctx, in)
+}
+
+func (s *kvs2kvc) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (*pb.CompactionResponse, error) {
+	return s.kvs.Compact(ctx, in)
+}

+ 3 - 0
proxy/grpcproxy/kv_test.go

@@ -55,6 +55,7 @@ func TestKVProxyRange(t *testing.T) {
 
 
 type kvproxyTestServer struct {
 type kvproxyTestServer struct {
 	kp     pb.KVServer
 	kp     pb.KVServer
+	c      *clientv3.Client
 	server *grpc.Server
 	server *grpc.Server
 	l      net.Listener
 	l      net.Listener
 }
 }
@@ -62,6 +63,7 @@ type kvproxyTestServer struct {
 func (kts *kvproxyTestServer) close() {
 func (kts *kvproxyTestServer) close() {
 	kts.server.Stop()
 	kts.server.Stop()
 	kts.l.Close()
 	kts.l.Close()
+	kts.c.Close()
 }
 }
 
 
 func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
 func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
@@ -78,6 +80,7 @@ func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
 
 
 	kvts := &kvproxyTestServer{
 	kvts := &kvproxyTestServer{
 		kp: kvp,
 		kp: kvp,
+		c:  client,
 	}
 	}
 
 
 	var opts []grpc.ServerOption
 	var opts []grpc.ServerOption