Browse Source

Merge pull request #8335 from heyitsanthony/test-put-atmostonce

clientv3: put at most once
Anthony Romano 8 years ago
parent
commit
9f1bfd9e4b

+ 3 - 8
clientv3/client.go

@@ -52,11 +52,9 @@ type Client struct {
 	conn     *grpc.ClientConn
 	dialerrc chan error
 
-	cfg              Config
-	creds            *credentials.TransportCredentials
-	balancer         *simpleBalancer
-	retryWrapper     retryRpcFunc
-	retryAuthWrapper retryRpcFunc
+	cfg      Config
+	creds    *credentials.TransportCredentials
+	balancer *simpleBalancer
 
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -387,8 +385,6 @@ func newClient(cfg *Config) (*Client, error) {
 		return nil, err
 	}
 	client.conn = conn
-	client.retryWrapper = client.newRetryWrapper()
-	client.retryAuthWrapper = client.newAuthRetryWrapper()
 
 	// wait for a connection
 	if cfg.DialTimeout > 0 {
@@ -510,7 +506,6 @@ func toErr(ctx context.Context, err error) error {
 			err = ctx.Err()
 		}
 	case codes.Unavailable:
-		err = ErrNoAvailableEndpoints
 	case codes.FailedPrecondition:
 		err = grpc.ErrClientConnClosing
 	}

+ 15 - 0
clientv3/integration/dial_test.go

@@ -16,6 +16,7 @@ package integration
 
 import (
 	"math/rand"
+	"strings"
 	"testing"
 	"time"
 
@@ -189,3 +190,17 @@ func TestDialForeignEndpoint(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// TestSetEndpointAndPut checks that a Put following a SetEndpoint
+// to a working endpoint will always succeed.
+func TestSetEndpointAndPut(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
+	defer clus.Terminate(t)
+
+	clus.Client(1).SetEndpoints(clus.Members[0].GRPCAddr())
+	_, err := clus.Client(1).Put(context.TODO(), "foo", "bar")
+	if err != nil && !strings.Contains(err.Error(), "closing") {
+		t.Fatal(err)
+	}
+}

+ 37 - 0
clientv3/integration/kv_test.go

@@ -895,3 +895,40 @@ func TestKVGetResetLoneEndpoint(t *testing.T) {
 	case <-donec:
 	}
 }
+
+// TestKVPutAtMostOnce ensures that a Put will only occur at most once
+// in the presence of network errors.
+func TestKVPutAtMostOnce(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		clus.Members[0].DropConnections()
+		donec := make(chan struct{})
+		go func() {
+			defer close(donec)
+			for i := 0; i < 10; i++ {
+				clus.Members[0].DropConnections()
+				time.Sleep(5 * time.Millisecond)
+			}
+		}()
+		_, err := clus.Client(0).Put(context.TODO(), "k", "v")
+		<-donec
+		if err != nil {
+			break
+		}
+	}
+
+	resp, err := clus.Client(0).Get(context.TODO(), "k")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if resp.Kvs[0].Version > 11 {
+		t.Fatalf("expected version <= 10, got %+v", resp.Kvs[0])
+	}
+}

+ 5 - 14
clientv3/ordering/kv_test.go

@@ -45,15 +45,11 @@ func TestDetectKvOrderViolation(t *testing.T) {
 	cli, err := clientv3.New(cfg)
 	ctx := context.TODO()
 
-	cli.SetEndpoints(clus.Members[0].GRPCAddr())
-	_, err = cli.Put(ctx, "foo", "bar")
-	if err != nil {
+	if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
 		t.Fatal(err)
 	}
 	// ensure that the second member has the current revision for the key foo
-	cli.SetEndpoints(clus.Members[1].GRPCAddr())
-	_, err = cli.Get(ctx, "foo")
-	if err != nil {
+	if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
 		t.Fatal(err)
 	}
 
@@ -107,23 +103,18 @@ func TestDetectTxnOrderViolation(t *testing.T) {
 	cli, err := clientv3.New(cfg)
 	ctx := context.TODO()
 
-	cli.SetEndpoints(clus.Members[0].GRPCAddr())
-	_, err = cli.Put(ctx, "foo", "bar")
-	if err != nil {
+	if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
 		t.Fatal(err)
 	}
 	// ensure that the second member has the current revision for the key foo
-	cli.SetEndpoints(clus.Members[1].GRPCAddr())
-	_, err = cli.Get(ctx, "foo")
-	if err != nil {
+	if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
 		t.Fatal(err)
 	}
 
 	// stop third member in order to force the member to have an outdated revision
 	clus.Members[2].Stop(t)
 	time.Sleep(1 * time.Second) // give enough time for operation
-	_, err = cli.Put(ctx, "foo", "buzz")
-	if err != nil {
+	if _, err = clus.Client(1).Put(ctx, "foo", "buzz"); err != nil {
 		t.Fatal(err)
 	}
 

+ 9 - 15
clientv3/ordering/util_test.go

@@ -28,26 +28,21 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	cfg := clientv3.Config{
-		Endpoints: []string{
-			clus.Members[0].GRPCAddr(),
-			clus.Members[1].GRPCAddr(),
-			clus.Members[2].GRPCAddr(),
-		},
+	eps := []string{
+		clus.Members[0].GRPCAddr(),
+		clus.Members[1].GRPCAddr(),
+		clus.Members[2].GRPCAddr(),
 	}
+	cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCAddr()}}
 	cli, err := clientv3.New(cfg)
-	eps := cli.Endpoints()
+
 	ctx := context.TODO()
 
-	cli.SetEndpoints(clus.Members[0].GRPCAddr())
-	_, err = cli.Put(ctx, "foo", "bar")
-	if err != nil {
+	if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
 		t.Fatal(err)
 	}
 	// ensure that the second member has current revision for key "foo"
-	cli.SetEndpoints(clus.Members[1].GRPCAddr())
-	_, err = cli.Get(ctx, "foo")
-	if err != nil {
+	if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
 		t.Fatal(err)
 	}
 
@@ -58,8 +53,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
 	time.Sleep(1 * time.Second) // give enough time for the operation
 
 	// update to "foo" will not be replicated to the third member due to the partition
-	_, err = cli.Put(ctx, "foo", "buzz")
-	if err != nil {
+	if _, err = clus.Client(1).Put(ctx, "foo", "buzz"); err != nil {
 		t.Fatal(err)
 	}
 

+ 36 - 23
clientv3/retry.go

@@ -24,26 +24,29 @@ import (
 
 type rpcFunc func(ctx context.Context) error
 type retryRpcFunc func(context.Context, rpcFunc) error
+type retryStopErrFunc func(error) bool
 
-func (c *Client) newRetryWrapper() retryRpcFunc {
-	return func(rpcCtx context.Context, f rpcFunc) error {
-		for {
-			err := f(rpcCtx)
-			if err == nil {
-				return nil
-			}
+func isReadStopError(err error) bool {
+	eErr := rpctypes.Error(err)
+	// always stop retry on etcd errors
+	if _, ok := eErr.(rpctypes.EtcdError); ok {
+		return true
+	}
+	// only retry if unavailable
+	return grpc.Code(err) != codes.Unavailable
+}
 
-			eErr := rpctypes.Error(err)
-			// always stop retry on etcd errors
-			if _, ok := eErr.(rpctypes.EtcdError); ok {
-				return err
-			}
+func isWriteStopError(err error) bool {
+	return grpc.Code(err) != codes.Unavailable ||
+		grpc.ErrorDesc(err) != "there is no address available"
+}
 
-			// only retry if unavailable
-			if grpc.Code(err) != codes.Unavailable {
+func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
+	return func(rpcCtx context.Context, f rpcFunc) error {
+		for {
+			if err := f(rpcCtx); err == nil || isStop(err) {
 				return err
 			}
-
 			select {
 			case <-c.balancer.ConnectNotify():
 			case <-rpcCtx.Done():
@@ -79,17 +82,24 @@ func (c *Client) newAuthRetryWrapper() retryRpcFunc {
 
 // RetryKVClient implements a KVClient that uses the client's FailFast retry policy.
 func RetryKVClient(c *Client) pb.KVClient {
-	retryWrite := &retryWriteKVClient{pb.NewKVClient(c.conn), c.retryWrapper}
-	return &retryKVClient{&retryWriteKVClient{retryWrite, c.retryAuthWrapper}}
+	readRetry := c.newRetryWrapper(isReadStopError)
+	writeRetry := c.newRetryWrapper(isWriteStopError)
+	conn := pb.NewKVClient(c.conn)
+	retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry}
+	retryAuthWrapper := c.newAuthRetryWrapper()
+	return &retryKVClient{
+		&retryWriteKVClient{retryBasic, retryAuthWrapper},
+		retryAuthWrapper}
 }
 
 type retryKVClient struct {
 	*retryWriteKVClient
+	readRetry retryRpcFunc
 }
 
 func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
-	err = rkv.retryf(ctx, func(rctx context.Context) error {
-		resp, err = rkv.retryWriteKVClient.Range(rctx, in, opts...)
+	err = rkv.readRetry(ctx, func(rctx context.Context) error {
+		resp, err = rkv.KVClient.Range(rctx, in, opts...)
 		return err
 	})
 	return resp, err
@@ -139,8 +149,11 @@ type retryLeaseClient struct {
 
 // RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy.
 func RetryLeaseClient(c *Client) pb.LeaseClient {
-	retry := &retryLeaseClient{pb.NewLeaseClient(c.conn), c.retryWrapper}
-	return &retryLeaseClient{retry, c.retryAuthWrapper}
+	retry := &retryLeaseClient{
+		pb.NewLeaseClient(c.conn),
+		c.newRetryWrapper(isReadStopError),
+	}
+	return &retryLeaseClient{retry, c.newAuthRetryWrapper()}
 }
 
 func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
@@ -167,7 +180,7 @@ type retryClusterClient struct {
 
 // RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy.
 func RetryClusterClient(c *Client) pb.ClusterClient {
-	return &retryClusterClient{pb.NewClusterClient(c.conn), c.retryWrapper}
+	return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(isWriteStopError)}
 }
 
 func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
@@ -201,7 +214,7 @@ type retryAuthClient struct {
 
 // RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy.
 func RetryAuthClient(c *Client) pb.AuthClient {
-	return &retryAuthClient{pb.NewAuthClient(c.conn), c.retryWrapper}
+	return &retryAuthClient{pb.NewAuthClient(c.conn), c.newRetryWrapper(isWriteStopError)}
 }
 
 func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {