Browse Source

Merge pull request #5772 from heyitsanthony/integration-proxy

integration: build tag for proxy
Anthony Romano 9 years ago
parent
commit
e001848270

+ 4 - 0
clientv3/kv.go

@@ -85,6 +85,10 @@ func NewKV(c *Client) KV {
 	return &kv{remote: pb.NewKVClient(c.conn)}
 	return &kv{remote: pb.NewKVClient(c.conn)}
 }
 }
 
 
+func NewKVFromKVClient(remote pb.KVClient) KV {
+	return &kv{remote: remote}
+}
+
 func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
 func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
 	r, err := kv.Do(ctx, OpPut(key, val, opts...))
 	r, err := kv.Do(ctx, OpPut(key, val, opts...))
 	return r.put, toErr(ctx, err)
 	return r.put, toErr(ctx, err)

+ 1 - 11
integration/cluster.go

@@ -515,7 +515,7 @@ func NewClientV3(m *member) (*clientv3.Client, error) {
 		}
 		}
 		cfg.TLS = tls
 		cfg.TLS = tls
 	}
 	}
-	return clientv3.New(cfg)
+	return newClientV3(cfg)
 }
 }
 
 
 // Clone returns a member with the same server configuration. The returned
 // Clone returns a member with the same server configuration. The returned
@@ -804,13 +804,3 @@ type grpcAPI struct {
 	// Maintenance is the maintenance API for the client's connection.
 	// Maintenance is the maintenance API for the client's connection.
 	Maintenance pb.MaintenanceClient
 	Maintenance pb.MaintenanceClient
 }
 }
-
-func toGRPC(c *clientv3.Client) grpcAPI {
-	return grpcAPI{
-		pb.NewClusterClient(c.ActiveConnection()),
-		pb.NewKVClient(c.ActiveConnection()),
-		pb.NewLeaseClient(c.ActiveConnection()),
-		pb.NewWatchClient(c.ActiveConnection()),
-		pb.NewMaintenanceClient(c.ActiveConnection()),
-	}
-}

+ 36 - 0
integration/cluster_direct.go

@@ -0,0 +1,36 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !cluster_proxy
+
+package integration
+
+import (
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+func toGRPC(c *clientv3.Client) grpcAPI {
+	return grpcAPI{
+		pb.NewClusterClient(c.ActiveConnection()),
+		pb.NewKVClient(c.ActiveConnection()),
+		pb.NewLeaseClient(c.ActiveConnection()),
+		pb.NewWatchClient(c.ActiveConnection()),
+		pb.NewMaintenanceClient(c.ActiveConnection()),
+	}
+}
+
+func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
+	return clientv3.New(cfg)
+}

+ 42 - 0
integration/cluster_proxy.go

@@ -0,0 +1,42 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build cluster_proxy
+
+package integration
+
+import (
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/proxy/grpcproxy"
+)
+
+func toGRPC(c *clientv3.Client) grpcAPI {
+	return grpcAPI{
+		pb.NewClusterClient(c.ActiveConnection()),
+		grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)),
+		pb.NewLeaseClient(c.ActiveConnection()),
+		pb.NewWatchClient(c.ActiveConnection()),
+		pb.NewMaintenanceClient(c.ActiveConnection()),
+	}
+}
+
+func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
+	c, err := clientv3.New(cfg)
+	if err != nil {
+		return nil, err
+	}
+	c.KV = clientv3.NewKVFromKVClient(grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)))
+	return c, nil
+}

+ 12 - 10
proxy/grpcproxy/kv.go

@@ -23,14 +23,14 @@ import (
 )
 )
 
 
 type kvProxy struct {
 type kvProxy struct {
-	client *clientv3.Client
-	cache  cache.Cache
+	kv    clientv3.KV
+	cache cache.Cache
 }
 }
 
 
 func NewKvProxy(c *clientv3.Client) pb.KVServer {
 func NewKvProxy(c *clientv3.Client) pb.KVServer {
 	return &kvProxy{
 	return &kvProxy{
-		client: c,
-		cache:  cache.NewCache(cache.DefaultMaxEntries),
+		kv:    c.KV,
+		cache: cache.NewCache(cache.DefaultMaxEntries),
 	}
 	}
 }
 }
 
 
@@ -46,7 +46,7 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo
 		}
 		}
 	}
 	}
 
 
-	resp, err := p.client.Do(ctx, RangeRequestToOp(r))
+	resp, err := p.kv.Do(ctx, RangeRequestToOp(r))
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -57,17 +57,17 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo
 }
 }
 
 
 func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
 func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
-	resp, err := p.client.Do(ctx, PutRequestToOp(r))
+	resp, err := p.kv.Do(ctx, PutRequestToOp(r))
 	return (*pb.PutResponse)(resp.Put()), err
 	return (*pb.PutResponse)(resp.Put()), err
 }
 }
 
 
 func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
 func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
-	resp, err := p.client.Do(ctx, DelRequestToOp(r))
+	resp, err := p.kv.Do(ctx, DelRequestToOp(r))
 	return (*pb.DeleteRangeResponse)(resp.Del()), err
 	return (*pb.DeleteRangeResponse)(resp.Del()), err
 }
 }
 
 
 func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
 func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
-	txn := p.client.Txn(ctx)
+	txn := p.kv.Txn(ctx)
 	cmps := make([]clientv3.Cmp, len(r.Compare))
 	cmps := make([]clientv3.Cmp, len(r.Compare))
 	thenops := make([]clientv3.Op, len(r.Success))
 	thenops := make([]clientv3.Op, len(r.Success))
 	elseops := make([]clientv3.Op, len(r.Failure))
 	elseops := make([]clientv3.Op, len(r.Failure))
@@ -94,7 +94,7 @@ func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Com
 		opts = append(opts, clientv3.WithCompactPhysical())
 		opts = append(opts, clientv3.WithCompactPhysical())
 	}
 	}
 
 
-	resp, err := p.client.KV.Compact(ctx, r.Revision, opts...)
+	resp, err := p.kv.Compact(ctx, r.Revision, opts...)
 	if err == nil {
 	if err == nil {
 		p.cache.Compact(r.Revision)
 		p.cache.Compact(r.Revision)
 	}
 	}
@@ -151,6 +151,8 @@ func DelRequestToOp(r *pb.DeleteRangeRequest) clientv3.Op {
 	if len(r.RangeEnd) != 0 {
 	if len(r.RangeEnd) != 0 {
 		opts = append(opts, clientv3.WithRange(string(r.RangeEnd)))
 		opts = append(opts, clientv3.WithRange(string(r.RangeEnd)))
 	}
 	}
-
+	if r.PrevKv {
+		opts = append(opts, clientv3.WithPrevKV())
+	}
 	return clientv3.OpDelete(string(r.Key), opts...)
 	return clientv3.OpDelete(string(r.Key), opts...)
 }
 }

+ 47 - 0
proxy/grpcproxy/kv_client_adapter.go

@@ -0,0 +1,47 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"golang.org/x/net/context"
+	grpc "google.golang.org/grpc"
+)
+
+type kvs2kvc struct{ kvs pb.KVServer }
+
+func KvServerToKvClient(kvs pb.KVServer) pb.KVClient {
+	return &kvs2kvc{kvs}
+}
+
+func (s *kvs2kvc) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (*pb.RangeResponse, error) {
+	return s.kvs.Range(ctx, in)
+}
+
+func (s *kvs2kvc) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (*pb.PutResponse, error) {
+	return s.kvs.Put(ctx, in)
+}
+
+func (s *kvs2kvc) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (*pb.DeleteRangeResponse, error) {
+	return s.kvs.DeleteRange(ctx, in)
+}
+
+func (s *kvs2kvc) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (*pb.TxnResponse, error) {
+	return s.kvs.Txn(ctx, in)
+}
+
+func (s *kvs2kvc) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (*pb.CompactionResponse, error) {
+	return s.kvs.Compact(ctx, in)
+}

+ 3 - 0
proxy/grpcproxy/kv_test.go

@@ -55,6 +55,7 @@ func TestKVProxyRange(t *testing.T) {
 
 
 type kvproxyTestServer struct {
 type kvproxyTestServer struct {
 	kp     pb.KVServer
 	kp     pb.KVServer
+	c      *clientv3.Client
 	server *grpc.Server
 	server *grpc.Server
 	l      net.Listener
 	l      net.Listener
 }
 }
@@ -62,6 +63,7 @@ type kvproxyTestServer struct {
 func (kts *kvproxyTestServer) close() {
 func (kts *kvproxyTestServer) close() {
 	kts.server.Stop()
 	kts.server.Stop()
 	kts.l.Close()
 	kts.l.Close()
+	kts.c.Close()
 }
 }
 
 
 func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
 func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
@@ -78,6 +80,7 @@ func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
 
 
 	kvts := &kvproxyTestServer{
 	kvts := &kvproxyTestServer{
 		kp: kvp,
 		kp: kvp,
+		c:  client,
 	}
 	}
 
 
 	var opts []grpc.ServerOption
 	var opts []grpc.ServerOption

+ 4 - 0
test

@@ -81,6 +81,10 @@ function integration_pass {
 	go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}
 	go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}
 }
 }
 
 
+function grpcproxy_pass {
+	go test -timeout 10m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/integration
+}
+
 function release_pass {
 function release_pass {
 	UPGRADE_VER=$(git tag -l | tail -1)
 	UPGRADE_VER=$(git tag -l | tail -1)
 	if [ -n "$MANUAL_VER" ]; then
 	if [ -n "$MANUAL_VER" ]; then