Browse Source

Merge pull request #8374 from heyitsanthony/fix-leasing-reconn

leasing: retry on errors from acquire txn
Anthony Romano 8 years ago
parent
commit
921e0dbd72
2 changed files with 31 additions and 11 deletions
  1. 2 2
      clientv3/integration/leasing_test.go
  2. 29 9
      clientv3/leasing/kv.go

+ 2 - 2
clientv3/integration/leasing_test.go

@@ -1296,7 +1296,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) {
 		defer close(sdonec)
 		for i := 0; i < 10 && cctx.Err() == nil; i++ {
 			clus.Members[0].Stop(t)
-			time.Sleep(100 * time.Millisecond)
+			time.Sleep(10 * time.Millisecond)
 			clus.Members[0].Restart(t)
 		}
 	}()
@@ -1317,7 +1317,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) {
 	case <-pdonec:
 		cancel()
 		<-sdonec
-	case <-time.After(5 * time.Second):
+	case <-time.After(10 * time.Second):
 		cancel()
 		<-sdonec
 		<-pdonec

+ 29 - 9
clientv3/leasing/kv.go

@@ -20,9 +20,12 @@ import (
 
 	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/clientv3/concurrency"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 
 	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 )
 
 type leasingKV struct {
@@ -239,16 +242,33 @@ func (lkv *leasingKV) put(ctx context.Context, op v3.Op) (pr *v3.PutResponse, er
 }
 
 func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
-	if err := lkv.waitSession(ctx); err != nil {
-		return nil, err
+	for ctx.Err() == nil {
+		if err := lkv.waitSession(ctx); err != nil {
+			return nil, err
+		}
+		resp, err := lkv.kv.Txn(ctx).If(
+			v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0)).
+			Then(
+				op,
+				v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
+			Else(
+				op,
+				v3.OpGet(lkv.pfx+key),
+			).Commit()
+		if err == nil {
+			if !resp.Succeeded {
+				kvs := resp.Responses[1].GetResponseRange().Kvs
+				// if txn failed since already owner, lease is acquired
+				resp.Succeeded = v3.LeaseID(kvs[0].Lease) == lkv.leaseID()
+			}
+			return resp, nil
+		}
+		// retry if transient error
+		if _, ok := err.(rpctypes.EtcdError); ok || grpc.Code(err) != codes.Unavailable {
+			return nil, err
+		}
 	}
-	return lkv.kv.Txn(ctx).If(
-		v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0)).
-		Then(
-			op,
-			v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
-		Else(op).
-		Commit()
+	return nil, ctx.Err()
 }
 
 func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) {