|
@@ -0,0 +1,431 @@
|
|
|
|
|
+// Copyright 2017 The etcd Authors
|
|
|
|
|
+//
|
|
|
|
|
+// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
+// you may not use this file except in compliance with the License.
|
|
|
|
|
+// You may obtain a copy of the License at
|
|
|
|
|
+//
|
|
|
|
|
+// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
+//
|
|
|
|
|
+// Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
+// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
+// See the License for the specific language governing permissions and
|
|
|
|
|
+// limitations under the License.
|
|
|
|
|
+
|
|
|
|
|
+package leasing
|
|
|
|
|
+
|
|
|
|
|
+import (
|
|
|
|
|
+ "strings"
|
|
|
|
|
+ "time"
|
|
|
|
|
+
|
|
|
|
|
+ v3 "github.com/coreos/etcd/clientv3"
|
|
|
|
|
+ "github.com/coreos/etcd/clientv3/concurrency"
|
|
|
|
|
+ "github.com/coreos/etcd/mvcc/mvccpb"
|
|
|
|
|
+
|
|
|
|
|
+ "golang.org/x/net/context"
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+type leasingKV struct {
|
|
|
|
|
+ cl *v3.Client
|
|
|
|
|
+ kv v3.KV
|
|
|
|
|
+ pfx string
|
|
|
|
|
+ leases leaseCache
|
|
|
|
|
+ ctx context.Context
|
|
|
|
|
+ cancel context.CancelFunc
|
|
|
|
|
+
|
|
|
|
|
+ sessionOpts []concurrency.SessionOption
|
|
|
|
|
+ session *concurrency.Session
|
|
|
|
|
+ sessionc chan struct{}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+var closedCh chan struct{}
|
|
|
|
|
+
|
|
|
|
|
+func init() {
|
|
|
|
|
+ closedCh = make(chan struct{})
|
|
|
|
|
+ close(closedCh)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// NewKV wraps a KV instance so that all requests are wired through a leasing protocol.
|
|
|
|
|
+func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, error) {
|
|
|
|
|
+ cctx, cancel := context.WithCancel(cl.Ctx())
|
|
|
|
|
+ lkv := leasingKV{
|
|
|
|
|
+ cl: cl,
|
|
|
|
|
+ kv: cl.KV,
|
|
|
|
|
+ pfx: pfx,
|
|
|
|
|
+ leases: leaseCache{revokes: make(map[string]time.Time)},
|
|
|
|
|
+ ctx: cctx,
|
|
|
|
|
+ cancel: cancel,
|
|
|
|
|
+ sessionOpts: opts,
|
|
|
|
|
+ sessionc: make(chan struct{}),
|
|
|
|
|
+ }
|
|
|
|
|
+ go lkv.monitorSession()
|
|
|
|
|
+ go lkv.leases.clearOldRevokes(cctx)
|
|
|
|
|
+ return &lkv, lkv.waitSession(cctx)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) {
|
|
|
|
|
+ return lkv.get(ctx, v3.OpGet(key, opts...))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) {
|
|
|
|
|
+ return lkv.put(ctx, v3.OpPut(key, val, opts...))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) Delete(ctx context.Context, key string, opts ...v3.OpOption) (*v3.DeleteResponse, error) {
|
|
|
|
|
+ return lkv.delete(ctx, v3.OpDelete(key, opts...))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) {
|
|
|
|
|
+ switch {
|
|
|
|
|
+ case op.IsGet():
|
|
|
|
|
+ resp, err := lkv.get(ctx, op)
|
|
|
|
|
+ return resp.OpResponse(), err
|
|
|
|
|
+ case op.IsPut():
|
|
|
|
|
+ resp, err := lkv.put(ctx, op)
|
|
|
|
|
+ return resp.OpResponse(), err
|
|
|
|
|
+ case op.IsDelete():
|
|
|
|
|
+ resp, err := lkv.delete(ctx, op)
|
|
|
|
|
+ return resp.OpResponse(), err
|
|
|
|
|
+ case op.IsTxn():
|
|
|
|
|
+ cmps, thenOps, elseOps := op.Txn()
|
|
|
|
|
+ resp, err := lkv.Txn(ctx).If(cmps...).Then(thenOps...).Else(elseOps...).Commit()
|
|
|
|
|
+ return resp.OpResponse(), err
|
|
|
|
|
+ }
|
|
|
|
|
+ return v3.OpResponse{}, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) Compact(ctx context.Context, rev int64, opts ...v3.CompactOption) (*v3.CompactResponse, error) {
|
|
|
|
|
+ return lkv.kv.Compact(ctx, rev, opts...)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) Txn(ctx context.Context) v3.Txn {
|
|
|
|
|
+ return &txnLeasing{Txn: lkv.kv.Txn(ctx), lkv: lkv, ctx: ctx}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) monitorSession() {
|
|
|
|
|
+ for lkv.ctx.Err() == nil {
|
|
|
|
|
+ if lkv.session != nil {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-lkv.session.Done():
|
|
|
|
|
+ case <-lkv.ctx.Done():
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ lkv.leases.mu.Lock()
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-lkv.sessionc:
|
|
|
|
|
+ lkv.sessionc = make(chan struct{})
|
|
|
|
|
+ default:
|
|
|
|
|
+ }
|
|
|
|
|
+ lkv.leases.entries = make(map[string]*leaseKey)
|
|
|
|
|
+ lkv.leases.mu.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ s, err := concurrency.NewSession(lkv.cl, lkv.sessionOpts...)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ lkv.leases.mu.Lock()
|
|
|
|
|
+ lkv.session = s
|
|
|
|
|
+ close(lkv.sessionc)
|
|
|
|
|
+ lkv.leases.mu.Unlock()
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) monitorLease(ctx context.Context, key string, rev int64) {
|
|
|
|
|
+ cctx, cancel := context.WithCancel(lkv.ctx)
|
|
|
|
|
+ defer cancel()
|
|
|
|
|
+ for cctx.Err() == nil {
|
|
|
|
|
+ if rev == 0 {
|
|
|
|
|
+ resp, err := lkv.kv.Get(ctx, lkv.pfx+key)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ rev = resp.Header.Revision
|
|
|
|
|
+ if len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) == "REVOKE" {
|
|
|
|
|
+ lkv.rescind(cctx, key, rev)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
|
|
|
|
|
+ for resp := range wch {
|
|
|
|
|
+ for _, ev := range resp.Events {
|
|
|
|
|
+ if string(ev.Kv.Value) != "REVOKE" {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if v3.LeaseID(ev.Kv.Lease) == lkv.leaseID() {
|
|
|
|
|
+ lkv.rescind(cctx, key, ev.Kv.ModRevision)
|
|
|
|
|
+ }
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ rev = 0
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// rescind releases a lease from this client.
|
|
|
|
|
+func (lkv *leasingKV) rescind(ctx context.Context, key string, rev int64) {
|
|
|
|
|
+ if lkv.leases.Evict(key) > rev {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev)
|
|
|
|
|
+ op := v3.OpDelete(lkv.pfx + key)
|
|
|
|
|
+ for ctx.Err() == nil {
|
|
|
|
|
+ if _, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit(); err == nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) waitRescind(ctx context.Context, key string, rev int64) error {
|
|
|
|
|
+ cctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
+ defer cancel()
|
|
|
|
|
+ wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
|
|
|
|
|
+ for resp := range wch {
|
|
|
|
|
+ for _, ev := range resp.Events {
|
|
|
|
|
+ if ev.Type == v3.EventTypeDelete {
|
|
|
|
|
+ return ctx.Err()
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return ctx.Err()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) tryModifyOp(ctx context.Context, op v3.Op) (*v3.TxnResponse, chan<- struct{}, error) {
|
|
|
|
|
+ key := string(op.KeyBytes())
|
|
|
|
|
+ wc, rev := lkv.leases.Lock(key)
|
|
|
|
|
+ cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)
|
|
|
|
|
+ resp, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit()
|
|
|
|
|
+ switch {
|
|
|
|
|
+ case err != nil:
|
|
|
|
|
+ lkv.leases.Evict(key)
|
|
|
|
|
+ fallthrough
|
|
|
|
|
+ case !resp.Succeeded:
|
|
|
|
|
+ if wc != nil {
|
|
|
|
|
+ close(wc)
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil, nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ return resp, wc, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) put(ctx context.Context, op v3.Op) (pr *v3.PutResponse, err error) {
|
|
|
|
|
+ if err := lkv.waitSession(ctx); err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ for ctx.Err() == nil {
|
|
|
|
|
+ resp, wc, err := lkv.tryModifyOp(ctx, op)
|
|
|
|
|
+ if err != nil || wc == nil {
|
|
|
|
|
+ resp, err = lkv.revoke(ctx, string(op.KeyBytes()), op)
|
|
|
|
|
+ }
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ if resp.Succeeded {
|
|
|
|
|
+ lkv.leases.mu.Lock()
|
|
|
|
|
+ lkv.leases.Update(op.KeyBytes(), op.ValueBytes(), resp.Header)
|
|
|
|
|
+ lkv.leases.mu.Unlock()
|
|
|
|
|
+ pr = (*v3.PutResponse)(resp.Responses[0].GetResponsePut())
|
|
|
|
|
+ pr.Header = resp.Header
|
|
|
|
|
+ }
|
|
|
|
|
+ if wc != nil {
|
|
|
|
|
+ close(wc)
|
|
|
|
|
+ }
|
|
|
|
|
+ if resp.Succeeded {
|
|
|
|
|
+ return pr, nil
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil, ctx.Err()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
|
|
|
|
|
+ if err := lkv.waitSession(ctx); err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ return lkv.kv.Txn(ctx).If(
|
|
|
|
|
+ v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0)).
|
|
|
|
|
+ Then(
|
|
|
|
|
+ op,
|
|
|
|
|
+ v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
|
|
|
|
|
+ Else(op).
|
|
|
|
|
+ Commit()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) {
|
|
|
|
|
+ do := func() (*v3.GetResponse, error) {
|
|
|
|
|
+ r, err := lkv.kv.Do(ctx, op)
|
|
|
|
|
+ return r.Get(), err
|
|
|
|
|
+ }
|
|
|
|
|
+ if !lkv.readySession() {
|
|
|
|
|
+ return do()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if resp, ok := lkv.leases.Get(ctx, op); resp != nil {
|
|
|
|
|
+ return resp, nil
|
|
|
|
|
+ } else if !ok || op.IsSerializable() {
|
|
|
|
|
+ // must be handled by server or can skip linearization
|
|
|
|
|
+ return do()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ key := string(op.KeyBytes())
|
|
|
|
|
+ if !lkv.leases.MayAcquire(key) {
|
|
|
|
|
+ resp, err := lkv.kv.Do(ctx, op)
|
|
|
|
|
+ return resp.Get(), err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ resp, err := lkv.acquire(ctx, key, v3.OpGet(key))
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ getResp := (*v3.GetResponse)(resp.Responses[0].GetResponseRange())
|
|
|
|
|
+ getResp.Header = resp.Header
|
|
|
|
|
+ if resp.Succeeded {
|
|
|
|
|
+ getResp = lkv.leases.Add(key, getResp, op)
|
|
|
|
|
+ go lkv.monitorLease(ctx, key, resp.Header.Revision)
|
|
|
|
|
+ }
|
|
|
|
|
+ return getResp, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) deleteRangeRPC(ctx context.Context, maxLeaseRev int64, key, end string) (*v3.DeleteResponse, error) {
|
|
|
|
|
+ lkey, lend := lkv.pfx+key, lkv.pfx+end
|
|
|
|
|
+ resp, err := lkv.kv.Txn(ctx).If(
|
|
|
|
|
+ v3.Compare(v3.CreateRevision(lkey).WithRange(lend), "<", maxLeaseRev+1),
|
|
|
|
|
+ ).Then(
|
|
|
|
|
+ v3.OpGet(key, v3.WithRange(end), v3.WithKeysOnly()),
|
|
|
|
|
+ v3.OpDelete(key, v3.WithRange(end)),
|
|
|
|
|
+ ).Commit()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ lkv.leases.EvictRange(key, end)
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ if !resp.Succeeded {
|
|
|
|
|
+ return nil, nil
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, kv := range resp.Responses[0].GetResponseRange().Kvs {
|
|
|
|
|
+ lkv.leases.Delete(string(kv.Key), resp.Header)
|
|
|
|
|
+ }
|
|
|
|
|
+ delResp := (*v3.DeleteResponse)(resp.Responses[1].GetResponseDeleteRange())
|
|
|
|
|
+ delResp.Header = resp.Header
|
|
|
|
|
+ return delResp, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) deleteRange(ctx context.Context, op v3.Op) (*v3.DeleteResponse, error) {
|
|
|
|
|
+ key, end := string(op.KeyBytes()), string(op.RangeBytes())
|
|
|
|
|
+ for ctx.Err() == nil {
|
|
|
|
|
+ maxLeaseRev, err := lkv.revokeRange(ctx, key, end)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ wcs := lkv.leases.LockRange(key, end)
|
|
|
|
|
+ delResp, err := lkv.deleteRangeRPC(ctx, maxLeaseRev, key, end)
|
|
|
|
|
+ closeAll(wcs)
|
|
|
|
|
+ if err != nil || delResp != nil {
|
|
|
|
|
+ return delResp, err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil, ctx.Err()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) delete(ctx context.Context, op v3.Op) (dr *v3.DeleteResponse, err error) {
|
|
|
|
|
+ if err := lkv.waitSession(ctx); err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ if len(op.RangeBytes()) > 0 {
|
|
|
|
|
+ return lkv.deleteRange(ctx, op)
|
|
|
|
|
+ }
|
|
|
|
|
+ key := string(op.KeyBytes())
|
|
|
|
|
+ for ctx.Err() == nil {
|
|
|
|
|
+ resp, wc, err := lkv.tryModifyOp(ctx, op)
|
|
|
|
|
+ if err != nil || wc == nil {
|
|
|
|
|
+ resp, err = lkv.revoke(ctx, key, op)
|
|
|
|
|
+ }
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ // don't know if delete was processed
|
|
|
|
|
+ lkv.leases.Evict(key)
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ if resp.Succeeded {
|
|
|
|
|
+ dr = (*v3.DeleteResponse)(resp.Responses[0].GetResponseDeleteRange())
|
|
|
|
|
+ dr.Header = resp.Header
|
|
|
|
|
+ lkv.leases.Delete(key, dr.Header)
|
|
|
|
|
+ }
|
|
|
|
|
+ if wc != nil {
|
|
|
|
|
+ close(wc)
|
|
|
|
|
+ }
|
|
|
|
|
+ if resp.Succeeded {
|
|
|
|
|
+ return dr, nil
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil, ctx.Err()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) revoke(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
|
|
|
|
|
+ rev := lkv.leases.Rev(key)
|
|
|
|
|
+ txn := lkv.kv.Txn(ctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)).Then(op)
|
|
|
|
|
+ resp, err := txn.Else(v3.OpPut(lkv.pfx+key, "REVOKE", v3.WithIgnoreLease())).Commit()
|
|
|
|
|
+ if err != nil || resp.Succeeded {
|
|
|
|
|
+ return resp, err
|
|
|
|
|
+ }
|
|
|
|
|
+ return resp, lkv.waitRescind(ctx, key, resp.Header.Revision)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) revokeRange(ctx context.Context, begin, end string) (int64, error) {
|
|
|
|
|
+ lkey, lend := lkv.pfx+begin, ""
|
|
|
|
|
+ if len(end) > 0 {
|
|
|
|
|
+ lend = lkv.pfx + end
|
|
|
|
|
+ }
|
|
|
|
|
+ leaseKeys, err := lkv.kv.Get(ctx, lkey, v3.WithRange(lend))
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return 0, err
|
|
|
|
|
+ }
|
|
|
|
|
+ return lkv.revokeLeaseKvs(ctx, leaseKeys.Kvs)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue) (int64, error) {
|
|
|
|
|
+ maxLeaseRev := int64(0)
|
|
|
|
|
+ for _, kv := range kvs {
|
|
|
|
|
+ if rev := kv.CreateRevision; rev > maxLeaseRev {
|
|
|
|
|
+ maxLeaseRev = rev
|
|
|
|
|
+ }
|
|
|
|
|
+ if v3.LeaseID(kv.Lease) == lkv.leaseID() {
|
|
|
|
|
+ // don't revoke own keys
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ key := strings.TrimPrefix(string(kv.Key), lkv.pfx)
|
|
|
|
|
+ if _, err := lkv.revoke(ctx, key, v3.OpGet(key)); err != nil {
|
|
|
|
|
+ return 0, err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return maxLeaseRev, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) waitSession(ctx context.Context) error {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-lkv.sessionc:
|
|
|
|
|
+ return nil
|
|
|
|
|
+ case <-lkv.ctx.Done():
|
|
|
|
|
+ return lkv.ctx.Err()
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ return ctx.Err()
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) readySession() bool {
|
|
|
|
|
+ lkv.leases.mu.RLock()
|
|
|
|
|
+ defer lkv.leases.mu.RUnlock()
|
|
|
|
|
+ if lkv.session == nil {
|
|
|
|
|
+ return false
|
|
|
|
|
+ }
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-lkv.session.Done():
|
|
|
|
|
+ default:
|
|
|
|
|
+ return true
|
|
|
|
|
+ }
|
|
|
|
|
+ return false
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (lkv *leasingKV) leaseID() v3.LeaseID {
|
|
|
|
|
+ lkv.leases.mu.RLock()
|
|
|
|
|
+ defer lkv.leases.mu.RUnlock()
|
|
|
|
|
+ return lkv.session.Lease()
|
|
|
|
|
+}
|