|
|
@@ -20,9 +20,12 @@ import (
|
|
|
|
|
|
v3 "github.com/coreos/etcd/clientv3"
|
|
|
"github.com/coreos/etcd/clientv3/concurrency"
|
|
|
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
|
"github.com/coreos/etcd/mvcc/mvccpb"
|
|
|
|
|
|
"golang.org/x/net/context"
|
|
|
+ "google.golang.org/grpc"
|
|
|
+ "google.golang.org/grpc/codes"
|
|
|
)
|
|
|
|
|
|
type leasingKV struct {
|
|
|
@@ -239,16 +242,33 @@ func (lkv *leasingKV) put(ctx context.Context, op v3.Op) (pr *v3.PutResponse, er
|
|
|
}
|
|
|
|
|
|
func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
|
|
|
- if err := lkv.waitSession(ctx); err != nil {
|
|
|
- return nil, err
|
|
|
+ for ctx.Err() == nil {
|
|
|
+ if err := lkv.waitSession(ctx); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ resp, err := lkv.kv.Txn(ctx).If(
|
|
|
+ v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0)).
|
|
|
+ Then(
|
|
|
+ op,
|
|
|
+ v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
|
|
|
+ Else(
|
|
|
+ op,
|
|
|
+ v3.OpGet(lkv.pfx+key),
|
|
|
+ ).Commit()
|
|
|
+ if err == nil {
|
|
|
+ if !resp.Succeeded {
|
|
|
+ kvs := resp.Responses[1].GetResponseRange().Kvs
|
|
|
+ // if txn failed since already owner, lease is acquired
|
|
|
+ resp.Succeeded = v3.LeaseID(kvs[0].Lease) == lkv.leaseID()
|
|
|
+ }
|
|
|
+ return resp, nil
|
|
|
+ }
|
|
|
+ // retry if transient error
|
|
|
+ if _, ok := err.(rpctypes.EtcdError); ok || grpc.Code(err) != codes.Unavailable {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
}
|
|
|
- return lkv.kv.Txn(ctx).If(
|
|
|
- v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0)).
|
|
|
- Then(
|
|
|
- op,
|
|
|
- v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
|
|
|
- Else(op).
|
|
|
- Commit()
|
|
|
+ return nil, ctx.Err()
|
|
|
}
|
|
|
|
|
|
func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) {
|