123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- // Copyright 2017 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package leasing
- import (
- "context"
- "strings"
- "sync"
- "time"
- v3 "go.etcd.io/etcd/clientv3"
- "go.etcd.io/etcd/clientv3/concurrency"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/mvcc/mvccpb"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- )
- type leasingKV struct {
- cl *v3.Client
- kv v3.KV
- pfx string
- leases leaseCache
- ctx context.Context
- cancel context.CancelFunc
- wg sync.WaitGroup
- sessionOpts []concurrency.SessionOption
- session *concurrency.Session
- sessionc chan struct{}
- }
- var closedCh chan struct{}
- func init() {
- closedCh = make(chan struct{})
- close(closedCh)
- }
- // NewKV wraps a KV instance so that all requests are wired through a leasing protocol.
- func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, func(), error) {
- cctx, cancel := context.WithCancel(cl.Ctx())
- lkv := &leasingKV{
- cl: cl,
- kv: cl.KV,
- pfx: pfx,
- leases: leaseCache{revokes: make(map[string]time.Time)},
- ctx: cctx,
- cancel: cancel,
- sessionOpts: opts,
- sessionc: make(chan struct{}),
- }
- lkv.wg.Add(2)
- go func() {
- defer lkv.wg.Done()
- lkv.monitorSession()
- }()
- go func() {
- defer lkv.wg.Done()
- lkv.leases.clearOldRevokes(cctx)
- }()
- return lkv, lkv.Close, lkv.waitSession(cctx)
- }
- func (lkv *leasingKV) Close() {
- lkv.cancel()
- lkv.wg.Wait()
- }
- func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) {
- return lkv.get(ctx, v3.OpGet(key, opts...))
- }
- func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) {
- return lkv.put(ctx, v3.OpPut(key, val, opts...))
- }
- func (lkv *leasingKV) Delete(ctx context.Context, key string, opts ...v3.OpOption) (*v3.DeleteResponse, error) {
- return lkv.delete(ctx, v3.OpDelete(key, opts...))
- }
- func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) {
- switch {
- case op.IsGet():
- resp, err := lkv.get(ctx, op)
- return resp.OpResponse(), err
- case op.IsPut():
- resp, err := lkv.put(ctx, op)
- return resp.OpResponse(), err
- case op.IsDelete():
- resp, err := lkv.delete(ctx, op)
- return resp.OpResponse(), err
- case op.IsTxn():
- cmps, thenOps, elseOps := op.Txn()
- resp, err := lkv.Txn(ctx).If(cmps...).Then(thenOps...).Else(elseOps...).Commit()
- return resp.OpResponse(), err
- }
- return v3.OpResponse{}, nil
- }
- func (lkv *leasingKV) Compact(ctx context.Context, rev int64, opts ...v3.CompactOption) (*v3.CompactResponse, error) {
- return lkv.kv.Compact(ctx, rev, opts...)
- }
- func (lkv *leasingKV) Txn(ctx context.Context) v3.Txn {
- return &txnLeasing{Txn: lkv.kv.Txn(ctx), lkv: lkv, ctx: ctx}
- }
- func (lkv *leasingKV) monitorSession() {
- for lkv.ctx.Err() == nil {
- if lkv.session != nil {
- select {
- case <-lkv.session.Done():
- case <-lkv.ctx.Done():
- return
- }
- }
- lkv.leases.mu.Lock()
- select {
- case <-lkv.sessionc:
- lkv.sessionc = make(chan struct{})
- default:
- }
- lkv.leases.entries = make(map[string]*leaseKey)
- lkv.leases.mu.Unlock()
- s, err := concurrency.NewSession(lkv.cl, lkv.sessionOpts...)
- if err != nil {
- continue
- }
- lkv.leases.mu.Lock()
- lkv.session = s
- close(lkv.sessionc)
- lkv.leases.mu.Unlock()
- }
- }
- func (lkv *leasingKV) monitorLease(ctx context.Context, key string, rev int64) {
- cctx, cancel := context.WithCancel(lkv.ctx)
- defer cancel()
- for cctx.Err() == nil {
- if rev == 0 {
- resp, err := lkv.kv.Get(ctx, lkv.pfx+key)
- if err != nil {
- continue
- }
- rev = resp.Header.Revision
- if len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) == "REVOKE" {
- lkv.rescind(cctx, key, rev)
- return
- }
- }
- wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
- for resp := range wch {
- for _, ev := range resp.Events {
- if string(ev.Kv.Value) != "REVOKE" {
- continue
- }
- if v3.LeaseID(ev.Kv.Lease) == lkv.leaseID() {
- lkv.rescind(cctx, key, ev.Kv.ModRevision)
- }
- return
- }
- }
- rev = 0
- }
- }
- // rescind releases a lease from this client.
- func (lkv *leasingKV) rescind(ctx context.Context, key string, rev int64) {
- if lkv.leases.Evict(key) > rev {
- return
- }
- cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev)
- op := v3.OpDelete(lkv.pfx + key)
- for ctx.Err() == nil {
- if _, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit(); err == nil {
- return
- }
- }
- }
- func (lkv *leasingKV) waitRescind(ctx context.Context, key string, rev int64) error {
- cctx, cancel := context.WithCancel(ctx)
- defer cancel()
- wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
- for resp := range wch {
- for _, ev := range resp.Events {
- if ev.Type == v3.EventTypeDelete {
- return ctx.Err()
- }
- }
- }
- return ctx.Err()
- }
- func (lkv *leasingKV) tryModifyOp(ctx context.Context, op v3.Op) (*v3.TxnResponse, chan<- struct{}, error) {
- key := string(op.KeyBytes())
- wc, rev := lkv.leases.Lock(key)
- cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)
- resp, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit()
- switch {
- case err != nil:
- lkv.leases.Evict(key)
- fallthrough
- case !resp.Succeeded:
- if wc != nil {
- close(wc)
- }
- return nil, nil, err
- }
- return resp, wc, nil
- }
- func (lkv *leasingKV) put(ctx context.Context, op v3.Op) (pr *v3.PutResponse, err error) {
- if err := lkv.waitSession(ctx); err != nil {
- return nil, err
- }
- for ctx.Err() == nil {
- resp, wc, err := lkv.tryModifyOp(ctx, op)
- if err != nil || wc == nil {
- resp, err = lkv.revoke(ctx, string(op.KeyBytes()), op)
- }
- if err != nil {
- return nil, err
- }
- if resp.Succeeded {
- lkv.leases.mu.Lock()
- lkv.leases.Update(op.KeyBytes(), op.ValueBytes(), resp.Header)
- lkv.leases.mu.Unlock()
- pr = (*v3.PutResponse)(resp.Responses[0].GetResponsePut())
- pr.Header = resp.Header
- }
- if wc != nil {
- close(wc)
- }
- if resp.Succeeded {
- return pr, nil
- }
- }
- return nil, ctx.Err()
- }
- func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
- for ctx.Err() == nil {
- if err := lkv.waitSession(ctx); err != nil {
- return nil, err
- }
- lcmp := v3.Cmp{Key: []byte(key), Target: pb.Compare_LEASE}
- resp, err := lkv.kv.Txn(ctx).If(
- v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0),
- v3.Compare(lcmp, "=", 0)).
- Then(
- op,
- v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
- Else(
- op,
- v3.OpGet(lkv.pfx+key),
- ).Commit()
- if err == nil {
- if !resp.Succeeded {
- kvs := resp.Responses[1].GetResponseRange().Kvs
- // if txn failed since already owner, lease is acquired
- resp.Succeeded = len(kvs) > 0 && v3.LeaseID(kvs[0].Lease) == lkv.leaseID()
- }
- return resp, nil
- }
- // retry if transient error
- if _, ok := err.(rpctypes.EtcdError); ok {
- return nil, err
- }
- if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable {
- return nil, err
- }
- }
- return nil, ctx.Err()
- }
- func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) {
- do := func() (*v3.GetResponse, error) {
- r, err := lkv.kv.Do(ctx, op)
- return r.Get(), err
- }
- if !lkv.readySession() {
- return do()
- }
- if resp, ok := lkv.leases.Get(ctx, op); resp != nil {
- return resp, nil
- } else if !ok || op.IsSerializable() {
- // must be handled by server or can skip linearization
- return do()
- }
- key := string(op.KeyBytes())
- if !lkv.leases.MayAcquire(key) {
- resp, err := lkv.kv.Do(ctx, op)
- return resp.Get(), err
- }
- resp, err := lkv.acquire(ctx, key, v3.OpGet(key))
- if err != nil {
- return nil, err
- }
- getResp := (*v3.GetResponse)(resp.Responses[0].GetResponseRange())
- getResp.Header = resp.Header
- if resp.Succeeded {
- getResp = lkv.leases.Add(key, getResp, op)
- lkv.wg.Add(1)
- go func() {
- defer lkv.wg.Done()
- lkv.monitorLease(ctx, key, resp.Header.Revision)
- }()
- }
- return getResp, nil
- }
- func (lkv *leasingKV) deleteRangeRPC(ctx context.Context, maxLeaseRev int64, key, end string) (*v3.DeleteResponse, error) {
- lkey, lend := lkv.pfx+key, lkv.pfx+end
- resp, err := lkv.kv.Txn(ctx).If(
- v3.Compare(v3.CreateRevision(lkey).WithRange(lend), "<", maxLeaseRev+1),
- ).Then(
- v3.OpGet(key, v3.WithRange(end), v3.WithKeysOnly()),
- v3.OpDelete(key, v3.WithRange(end)),
- ).Commit()
- if err != nil {
- lkv.leases.EvictRange(key, end)
- return nil, err
- }
- if !resp.Succeeded {
- return nil, nil
- }
- for _, kv := range resp.Responses[0].GetResponseRange().Kvs {
- lkv.leases.Delete(string(kv.Key), resp.Header)
- }
- delResp := (*v3.DeleteResponse)(resp.Responses[1].GetResponseDeleteRange())
- delResp.Header = resp.Header
- return delResp, nil
- }
- func (lkv *leasingKV) deleteRange(ctx context.Context, op v3.Op) (*v3.DeleteResponse, error) {
- key, end := string(op.KeyBytes()), string(op.RangeBytes())
- for ctx.Err() == nil {
- maxLeaseRev, err := lkv.revokeRange(ctx, key, end)
- if err != nil {
- return nil, err
- }
- wcs := lkv.leases.LockRange(key, end)
- delResp, err := lkv.deleteRangeRPC(ctx, maxLeaseRev, key, end)
- closeAll(wcs)
- if err != nil || delResp != nil {
- return delResp, err
- }
- }
- return nil, ctx.Err()
- }
- func (lkv *leasingKV) delete(ctx context.Context, op v3.Op) (dr *v3.DeleteResponse, err error) {
- if err := lkv.waitSession(ctx); err != nil {
- return nil, err
- }
- if len(op.RangeBytes()) > 0 {
- return lkv.deleteRange(ctx, op)
- }
- key := string(op.KeyBytes())
- for ctx.Err() == nil {
- resp, wc, err := lkv.tryModifyOp(ctx, op)
- if err != nil || wc == nil {
- resp, err = lkv.revoke(ctx, key, op)
- }
- if err != nil {
- // don't know if delete was processed
- lkv.leases.Evict(key)
- return nil, err
- }
- if resp.Succeeded {
- dr = (*v3.DeleteResponse)(resp.Responses[0].GetResponseDeleteRange())
- dr.Header = resp.Header
- lkv.leases.Delete(key, dr.Header)
- }
- if wc != nil {
- close(wc)
- }
- if resp.Succeeded {
- return dr, nil
- }
- }
- return nil, ctx.Err()
- }
- func (lkv *leasingKV) revoke(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
- rev := lkv.leases.Rev(key)
- txn := lkv.kv.Txn(ctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)).Then(op)
- resp, err := txn.Else(v3.OpPut(lkv.pfx+key, "REVOKE", v3.WithIgnoreLease())).Commit()
- if err != nil || resp.Succeeded {
- return resp, err
- }
- return resp, lkv.waitRescind(ctx, key, resp.Header.Revision)
- }
- func (lkv *leasingKV) revokeRange(ctx context.Context, begin, end string) (int64, error) {
- lkey, lend := lkv.pfx+begin, ""
- if len(end) > 0 {
- lend = lkv.pfx + end
- }
- leaseKeys, err := lkv.kv.Get(ctx, lkey, v3.WithRange(lend))
- if err != nil {
- return 0, err
- }
- return lkv.revokeLeaseKvs(ctx, leaseKeys.Kvs)
- }
- func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue) (int64, error) {
- maxLeaseRev := int64(0)
- for _, kv := range kvs {
- if rev := kv.CreateRevision; rev > maxLeaseRev {
- maxLeaseRev = rev
- }
- if v3.LeaseID(kv.Lease) == lkv.leaseID() {
- // don't revoke own keys
- continue
- }
- key := strings.TrimPrefix(string(kv.Key), lkv.pfx)
- if _, err := lkv.revoke(ctx, key, v3.OpGet(key)); err != nil {
- return 0, err
- }
- }
- return maxLeaseRev, nil
- }
- func (lkv *leasingKV) waitSession(ctx context.Context) error {
- lkv.leases.mu.RLock()
- sessionc := lkv.sessionc
- lkv.leases.mu.RUnlock()
- select {
- case <-sessionc:
- return nil
- case <-lkv.ctx.Done():
- return lkv.ctx.Err()
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- func (lkv *leasingKV) readySession() bool {
- lkv.leases.mu.RLock()
- defer lkv.leases.mu.RUnlock()
- if lkv.session == nil {
- return false
- }
- select {
- case <-lkv.session.Done():
- default:
- return true
- }
- return false
- }
- func (lkv *leasingKV) leaseID() v3.LeaseID {
- lkv.leases.mu.RLock()
- defer lkv.leases.mu.RUnlock()
- return lkv.session.Lease()
- }
|