Browse Source

clientv3: call KV/Txn APIs with default gRPC call options

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 8 years ago
parent
commit
f87760998b

+ 1 - 1
clientv3/integration/dial_test.go

@@ -182,7 +182,7 @@ func TestDialForeignEndpoint(t *testing.T) {
 
 
 	// grpc can return a lazy connection that's not connected yet; confirm
 	// grpc can return a lazy connection that's not connected yet; confirm
 	// that it can communicate with the cluster.
 	// that it can communicate with the cluster.
-	kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn))
+	kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), clus.Client(0))
 	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
 	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
 	defer cancel()
 	defer cancel()
 	if _, gerr := kvc.Get(ctx, "abc"); gerr != nil {
 	if _, gerr := kvc.Get(ctx, "abc"); gerr != nil {

+ 23 - 11
clientv3/kv.go

@@ -18,6 +18,8 @@ import (
 	"context"
 	"context"
 
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+	"google.golang.org/grpc"
 )
 )
 
 
 type (
 type (
@@ -88,15 +90,24 @@ func (resp *TxnResponse) OpResponse() OpResponse {
 }
 }
 
 
 type kv struct {
 type kv struct {
-	remote pb.KVClient
+	remote   pb.KVClient
+	callOpts []grpc.CallOption
 }
 }
 
 
 func NewKV(c *Client) KV {
 func NewKV(c *Client) KV {
-	return &kv{remote: RetryKVClient(c)}
+	api := &kv{remote: RetryKVClient(c)}
+	if c != nil {
+		api.callOpts = c.callOpts
+	}
+	return api
 }
 }
 
 
-func NewKVFromKVClient(remote pb.KVClient) KV {
-	return &kv{remote: remote}
+func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
+	api := &kv{remote: remote}
+	if c != nil {
+		api.callOpts = c.callOpts
+	}
+	return api
 }
 }
 
 
 func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
 func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
@@ -115,7 +126,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
 }
 }
 
 
 func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
 func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
-	resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest())
+	resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
 	if err != nil {
 	if err != nil {
 		return nil, toErr(ctx, err)
 		return nil, toErr(ctx, err)
 	}
 	}
@@ -124,8 +135,9 @@ func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*C
 
 
 func (kv *kv) Txn(ctx context.Context) Txn {
 func (kv *kv) Txn(ctx context.Context) Txn {
 	return &txn{
 	return &txn{
-		kv:  kv,
-		ctx: ctx,
+		kv:       kv,
+		ctx:      ctx,
+		callOpts: kv.callOpts,
 	}
 	}
 }
 }
 
 
@@ -134,27 +146,27 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 	switch op.t {
 	switch op.t {
 	case tRange:
 	case tRange:
 		var resp *pb.RangeResponse
 		var resp *pb.RangeResponse
-		resp, err = kv.remote.Range(ctx, op.toRangeRequest())
+		resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
 		if err == nil {
 		if err == nil {
 			return OpResponse{get: (*GetResponse)(resp)}, nil
 			return OpResponse{get: (*GetResponse)(resp)}, nil
 		}
 		}
 	case tPut:
 	case tPut:
 		var resp *pb.PutResponse
 		var resp *pb.PutResponse
 		r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
 		r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
-		resp, err = kv.remote.Put(ctx, r)
+		resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
 		if err == nil {
 		if err == nil {
 			return OpResponse{put: (*PutResponse)(resp)}, nil
 			return OpResponse{put: (*PutResponse)(resp)}, nil
 		}
 		}
 	case tDeleteRange:
 	case tDeleteRange:
 		var resp *pb.DeleteRangeResponse
 		var resp *pb.DeleteRangeResponse
 		r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
 		r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
-		resp, err = kv.remote.DeleteRange(ctx, r)
+		resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
 		if err == nil {
 		if err == nil {
 			return OpResponse{del: (*DeleteResponse)(resp)}, nil
 			return OpResponse{del: (*DeleteResponse)(resp)}, nil
 		}
 		}
 	case tTxn:
 	case tTxn:
 		var resp *pb.TxnResponse
 		var resp *pb.TxnResponse
-		resp, err = kv.remote.Txn(ctx, op.toTxnRequest())
+		resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
 		if err == nil {
 		if err == nil {
 			return OpResponse{txn: (*TxnResponse)(resp)}, nil
 			return OpResponse{txn: (*TxnResponse)(resp)}, nil
 		}
 		}

+ 2 - 2
clientv3/ordering/kv_test.go

@@ -204,7 +204,7 @@ var rangeTests = []struct {
 
 
 func TestKvOrdering(t *testing.T) {
 func TestKvOrdering(t *testing.T) {
 	for i, tt := range rangeTests {
 	for i, tt := range rangeTests {
-		mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()}
+		mKV := &mockKV{clientv3.NewKVFromKVClient(nil, nil), tt.response.OpResponse()}
 		kv := &kvOrdering{
 		kv := &kvOrdering{
 			mKV,
 			mKV,
 			func(r *clientv3.GetResponse) OrderViolationFunc {
 			func(r *clientv3.GetResponse) OrderViolationFunc {
@@ -258,7 +258,7 @@ var txnTests = []struct {
 
 
 func TestTxnOrdering(t *testing.T) {
 func TestTxnOrdering(t *testing.T) {
 	for i, tt := range txnTests {
 	for i, tt := range txnTests {
-		mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()}
+		mKV := &mockKV{clientv3.NewKVFromKVClient(nil, nil), tt.response.OpResponse()}
 		kv := &kvOrdering{
 		kv := &kvOrdering{
 			mKV,
 			mKV,
 			func(r *clientv3.TxnResponse) OrderViolationFunc {
 			func(r *clientv3.TxnResponse) OrderViolationFunc {

+ 5 - 1
clientv3/txn.go

@@ -19,6 +19,8 @@ import (
 	"sync"
 	"sync"
 
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+	"google.golang.org/grpc"
 )
 )
 
 
 // Txn is the interface that wraps mini-transactions.
 // Txn is the interface that wraps mini-transactions.
@@ -65,6 +67,8 @@ type txn struct {
 
 
 	sus []*pb.RequestOp
 	sus []*pb.RequestOp
 	fas []*pb.RequestOp
 	fas []*pb.RequestOp
+
+	callOpts []grpc.CallOption
 }
 }
 
 
 func (txn *txn) If(cs ...Cmp) Txn {
 func (txn *txn) If(cs ...Cmp) Txn {
@@ -139,7 +143,7 @@ func (txn *txn) Commit() (*TxnResponse, error) {
 
 
 	var resp *pb.TxnResponse
 	var resp *pb.TxnResponse
 	var err error
 	var err error
-	resp, err = txn.kv.remote.Txn(txn.ctx, r)
+	resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
 	if err != nil {
 	if err != nil {
 		return nil, toErr(txn.ctx, err)
 		return nil, toErr(txn.ctx, err)
 	}
 	}

+ 1 - 1
etcdserver/api/v3client/v3client.go

@@ -31,7 +31,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
 	c := clientv3.NewCtxClient(context.Background())
 	c := clientv3.NewCtxClient(context.Background())
 
 
 	kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s))
 	kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s))
-	c.KV = clientv3.NewKVFromKVClient(kvc)
+	c.KV = clientv3.NewKVFromKVClient(kvc, c)
 
 
 	lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
 	lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
 	c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)
 	c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)

+ 1 - 1
integration/cluster_proxy.go

@@ -99,7 +99,7 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
 		return nil, err
 		return nil, err
 	}
 	}
 	rpc := toGRPC(c)
 	rpc := toGRPC(c)
-	c.KV = clientv3.NewKVFromKVClient(rpc.KV)
+	c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
 	pmu.Lock()
 	pmu.Lock()
 	lc := c.Lease
 	lc := c.Lease
 	c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)
 	c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)