123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- // Copyright 2017 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package leasing
- import (
- "context"
- "strings"
- "sync"
- "time"
- v3 "github.com/coreos/etcd/clientv3"
- v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- "github.com/coreos/etcd/mvcc/mvccpb"
- )
- const revokeBackoff = 2 * time.Second
- type leaseCache struct {
- mu sync.RWMutex
- entries map[string]*leaseKey
- revokes map[string]time.Time
- header *v3pb.ResponseHeader
- }
- type leaseKey struct {
- response *v3.GetResponse
- // rev is the leasing key revision.
- rev int64
- waitc chan struct{}
- }
- func (lc *leaseCache) Rev(key string) int64 {
- lc.mu.RLock()
- defer lc.mu.RUnlock()
- if li := lc.entries[key]; li != nil {
- return li.rev
- }
- return 0
- }
- func (lc *leaseCache) Lock(key string) (chan<- struct{}, int64) {
- lc.mu.Lock()
- defer lc.mu.Unlock()
- if li := lc.entries[key]; li != nil {
- li.waitc = make(chan struct{})
- return li.waitc, li.rev
- }
- return nil, 0
- }
- func (lc *leaseCache) LockRange(begin, end string) (ret []chan<- struct{}) {
- lc.mu.Lock()
- defer lc.mu.Unlock()
- for k, li := range lc.entries {
- if inRange(k, begin, end) {
- li.waitc = make(chan struct{})
- ret = append(ret, li.waitc)
- }
- }
- return ret
- }
- func inRange(k, begin, end string) bool {
- if strings.Compare(k, begin) < 0 {
- return false
- }
- if end != "\x00" && strings.Compare(k, end) >= 0 {
- return false
- }
- return true
- }
- func (lc *leaseCache) LockWriteOps(ops []v3.Op) (ret []chan<- struct{}) {
- for _, op := range ops {
- if op.IsGet() {
- continue
- }
- key := string(op.KeyBytes())
- if end := string(op.RangeBytes()); end == "" {
- if wc, _ := lc.Lock(key); wc != nil {
- ret = append(ret, wc)
- }
- } else {
- for k := range lc.entries {
- if !inRange(k, key, end) {
- continue
- }
- if wc, _ := lc.Lock(k); wc != nil {
- ret = append(ret, wc)
- }
- }
- }
- }
- return ret
- }
- func (lc *leaseCache) NotifyOps(ops []v3.Op) (wcs []<-chan struct{}) {
- for _, op := range ops {
- if op.IsGet() {
- if _, wc := lc.notify(string(op.KeyBytes())); wc != nil {
- wcs = append(wcs, wc)
- }
- }
- }
- return wcs
- }
- func (lc *leaseCache) MayAcquire(key string) bool {
- lc.mu.RLock()
- lr, ok := lc.revokes[key]
- lc.mu.RUnlock()
- return !ok || time.Since(lr) > revokeBackoff
- }
- func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetResponse {
- lk := &leaseKey{resp, resp.Header.Revision, closedCh}
- lc.mu.Lock()
- if lc.header == nil || lc.header.Revision < resp.Header.Revision {
- lc.header = resp.Header
- }
- lc.entries[key] = lk
- ret := lk.get(op)
- lc.mu.Unlock()
- return ret
- }
- func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) {
- li := lc.entries[string(key)]
- if li == nil {
- return
- }
- cacheResp := li.response
- if len(cacheResp.Kvs) == 0 {
- kv := &mvccpb.KeyValue{
- Key: key,
- CreateRevision: respHeader.Revision,
- }
- cacheResp.Kvs = append(cacheResp.Kvs, kv)
- cacheResp.Count = 1
- }
- cacheResp.Kvs[0].Version++
- if cacheResp.Kvs[0].ModRevision < respHeader.Revision {
- cacheResp.Header = respHeader
- cacheResp.Kvs[0].ModRevision = respHeader.Revision
- cacheResp.Kvs[0].Value = val
- }
- }
- func (lc *leaseCache) Delete(key string, hdr *v3pb.ResponseHeader) {
- lc.mu.Lock()
- defer lc.mu.Unlock()
- lc.delete(key, hdr)
- }
- func (lc *leaseCache) delete(key string, hdr *v3pb.ResponseHeader) {
- if li := lc.entries[key]; li != nil && hdr.Revision >= li.response.Header.Revision {
- li.response.Kvs = nil
- li.response.Header = copyHeader(hdr)
- }
- }
- func (lc *leaseCache) Evict(key string) (rev int64) {
- lc.mu.Lock()
- defer lc.mu.Unlock()
- if li := lc.entries[key]; li != nil {
- rev = li.rev
- delete(lc.entries, key)
- lc.revokes[key] = time.Now()
- }
- return rev
- }
- func (lc *leaseCache) EvictRange(key, end string) {
- lc.mu.Lock()
- defer lc.mu.Unlock()
- for k := range lc.entries {
- if inRange(k, key, end) {
- delete(lc.entries, key)
- lc.revokes[key] = time.Now()
- }
- }
- }
- func isBadOp(op v3.Op) bool { return op.Rev() > 0 || len(op.RangeBytes()) > 0 }
- func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool) {
- if isBadOp(op) {
- return nil, false
- }
- key := string(op.KeyBytes())
- li, wc := lc.notify(key)
- if li == nil {
- return nil, true
- }
- select {
- case <-wc:
- case <-ctx.Done():
- return nil, true
- }
- lc.mu.RLock()
- lk := *li
- ret := lk.get(op)
- lc.mu.RUnlock()
- return ret, true
- }
- func (lk *leaseKey) get(op v3.Op) *v3.GetResponse {
- ret := *lk.response
- ret.Header = copyHeader(ret.Header)
- empty := len(ret.Kvs) == 0 || op.IsCountOnly()
- empty = empty || (op.MinModRev() > ret.Kvs[0].ModRevision)
- empty = empty || (op.MaxModRev() != 0 && op.MaxModRev() < ret.Kvs[0].ModRevision)
- empty = empty || (op.MinCreateRev() > ret.Kvs[0].CreateRevision)
- empty = empty || (op.MaxCreateRev() != 0 && op.MaxCreateRev() < ret.Kvs[0].CreateRevision)
- if empty {
- ret.Kvs = nil
- } else {
- kv := *ret.Kvs[0]
- kv.Key = make([]byte, len(kv.Key))
- copy(kv.Key, ret.Kvs[0].Key)
- if !op.IsKeysOnly() {
- kv.Value = make([]byte, len(kv.Value))
- copy(kv.Value, ret.Kvs[0].Value)
- }
- ret.Kvs = []*mvccpb.KeyValue{&kv}
- }
- return &ret
- }
- func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) {
- lc.mu.RLock()
- defer lc.mu.RUnlock()
- if li := lc.entries[key]; li != nil {
- return li, li.waitc
- }
- return nil, nil
- }
- func (lc *leaseCache) clearOldRevokes(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- return
- case <-time.After(time.Second):
- lc.mu.Lock()
- for k, lr := range lc.revokes {
- if time.Now().Sub(lr.Add(revokeBackoff)) > 0 {
- delete(lc.revokes, k)
- }
- }
- lc.mu.Unlock()
- }
- }
- }
- func (lc *leaseCache) evalCmp(cmps []v3.Cmp) (cmpVal bool, ok bool) {
- for _, cmp := range cmps {
- if len(cmp.RangeEnd) > 0 {
- return false, false
- }
- lk := lc.entries[string(cmp.Key)]
- if lk == nil {
- return false, false
- }
- if !evalCmp(lk.response, cmp) {
- return false, true
- }
- }
- return true, true
- }
- func (lc *leaseCache) evalOps(ops []v3.Op) ([]*v3pb.ResponseOp, bool) {
- resps := make([]*v3pb.ResponseOp, len(ops))
- for i, op := range ops {
- if !op.IsGet() || isBadOp(op) {
- // TODO: support read-only Txn
- return nil, false
- }
- lk := lc.entries[string(op.KeyBytes())]
- if lk == nil {
- return nil, false
- }
- resp := lk.get(op)
- if resp == nil {
- return nil, false
- }
- resps[i] = &v3pb.ResponseOp{
- Response: &v3pb.ResponseOp_ResponseRange{
- (*v3pb.RangeResponse)(resp),
- },
- }
- }
- return resps, true
- }
|