cache.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package leasing
  15. import (
  16. "context"
  17. "strings"
  18. "sync"
  19. "time"
  20. v3 "github.com/coreos/etcd/clientv3"
  21. v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  22. "github.com/coreos/etcd/mvcc/mvccpb"
  23. )
// revokeBackoff is how long after a key's eviction MayAcquire keeps
// returning false for that key. clearOldRevokes drops revoke records once
// they are older than this.
const revokeBackoff = 2 * time.Second
// leaseCache holds the cached responses for individual keys together with a
// record of when keys were last evicted.
type leaseCache struct {
	// mu is taken by most methods around accesses to entries, revokes and
	// header. NOTE(review): Update, evalCmp and evalOps touch entries without
	// locking; presumably their callers hold mu — confirm.
	mu sync.RWMutex
	// entries maps a key to its cache entry.
	entries map[string]*leaseKey
	// revokes maps a key to the time it was evicted by Evict or EvictRange.
	revokes map[string]time.Time
	// header is the response header with the highest revision passed to Add.
	header *v3pb.ResponseHeader
}
// leaseKey is a single cache entry: the cached response for one key plus the
// channel readers wait on before reading it.
type leaseKey struct {
	// response is the cached get response for the key.
	response *v3.GetResponse
	// rev is the leasing key revision.
	rev int64
	// waitc is the channel Get receives from before reading the entry. Lock
	// and LockRange replace it with a new open channel and hand the send
	// side to their caller (presumably closed once the write finishes).
	waitc chan struct{}
}
  37. func (lc *leaseCache) Rev(key string) int64 {
  38. lc.mu.RLock()
  39. defer lc.mu.RUnlock()
  40. if li := lc.entries[key]; li != nil {
  41. return li.rev
  42. }
  43. return 0
  44. }
  45. func (lc *leaseCache) Lock(key string) (chan<- struct{}, int64) {
  46. lc.mu.Lock()
  47. defer lc.mu.Unlock()
  48. if li := lc.entries[key]; li != nil {
  49. li.waitc = make(chan struct{})
  50. return li.waitc, li.rev
  51. }
  52. return nil, 0
  53. }
  54. func (lc *leaseCache) LockRange(begin, end string) (ret []chan<- struct{}) {
  55. lc.mu.Lock()
  56. defer lc.mu.Unlock()
  57. for k, li := range lc.entries {
  58. if inRange(k, begin, end) {
  59. li.waitc = make(chan struct{})
  60. ret = append(ret, li.waitc)
  61. }
  62. }
  63. return ret
  64. }
  65. func inRange(k, begin, end string) bool {
  66. if strings.Compare(k, begin) < 0 {
  67. return false
  68. }
  69. if end != "\x00" && strings.Compare(k, end) >= 0 {
  70. return false
  71. }
  72. return true
  73. }
  74. func (lc *leaseCache) LockWriteOps(ops []v3.Op) (ret []chan<- struct{}) {
  75. for _, op := range ops {
  76. if op.IsGet() {
  77. continue
  78. }
  79. key := string(op.KeyBytes())
  80. if end := string(op.RangeBytes()); end == "" {
  81. if wc, _ := lc.Lock(key); wc != nil {
  82. ret = append(ret, wc)
  83. }
  84. } else {
  85. for k := range lc.entries {
  86. if !inRange(k, key, end) {
  87. continue
  88. }
  89. if wc, _ := lc.Lock(k); wc != nil {
  90. ret = append(ret, wc)
  91. }
  92. }
  93. }
  94. }
  95. return ret
  96. }
  97. func (lc *leaseCache) NotifyOps(ops []v3.Op) (wcs []<-chan struct{}) {
  98. for _, op := range ops {
  99. if op.IsGet() {
  100. if _, wc := lc.notify(string(op.KeyBytes())); wc != nil {
  101. wcs = append(wcs, wc)
  102. }
  103. }
  104. }
  105. return wcs
  106. }
  107. func (lc *leaseCache) MayAcquire(key string) bool {
  108. lc.mu.RLock()
  109. lr, ok := lc.revokes[key]
  110. lc.mu.RUnlock()
  111. return !ok || time.Since(lr) > revokeBackoff
  112. }
  113. func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetResponse {
  114. lk := &leaseKey{resp, resp.Header.Revision, closedCh}
  115. lc.mu.Lock()
  116. if lc.header == nil || lc.header.Revision < resp.Header.Revision {
  117. lc.header = resp.Header
  118. }
  119. lc.entries[key] = lk
  120. ret := lk.get(op)
  121. lc.mu.Unlock()
  122. return ret
  123. }
  124. func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) {
  125. li := lc.entries[string(key)]
  126. if li == nil {
  127. return
  128. }
  129. cacheResp := li.response
  130. if len(cacheResp.Kvs) == 0 {
  131. kv := &mvccpb.KeyValue{
  132. Key: key,
  133. CreateRevision: respHeader.Revision,
  134. }
  135. cacheResp.Kvs = append(cacheResp.Kvs, kv)
  136. cacheResp.Count = 1
  137. }
  138. cacheResp.Kvs[0].Version++
  139. if cacheResp.Kvs[0].ModRevision < respHeader.Revision {
  140. cacheResp.Header = respHeader
  141. cacheResp.Kvs[0].ModRevision = respHeader.Revision
  142. cacheResp.Kvs[0].Value = val
  143. }
  144. }
  145. func (lc *leaseCache) Delete(key string, hdr *v3pb.ResponseHeader) {
  146. lc.mu.Lock()
  147. defer lc.mu.Unlock()
  148. lc.delete(key, hdr)
  149. }
  150. func (lc *leaseCache) delete(key string, hdr *v3pb.ResponseHeader) {
  151. if li := lc.entries[key]; li != nil && hdr.Revision >= li.response.Header.Revision {
  152. li.response.Kvs = nil
  153. li.response.Header = copyHeader(hdr)
  154. }
  155. }
  156. func (lc *leaseCache) Evict(key string) (rev int64) {
  157. lc.mu.Lock()
  158. defer lc.mu.Unlock()
  159. if li := lc.entries[key]; li != nil {
  160. rev = li.rev
  161. delete(lc.entries, key)
  162. lc.revokes[key] = time.Now()
  163. }
  164. return rev
  165. }
  166. func (lc *leaseCache) EvictRange(key, end string) {
  167. lc.mu.Lock()
  168. defer lc.mu.Unlock()
  169. for k := range lc.entries {
  170. if inRange(k, key, end) {
  171. delete(lc.entries, key)
  172. lc.revokes[key] = time.Now()
  173. }
  174. }
  175. }
  176. func isBadOp(op v3.Op) bool { return op.Rev() > 0 || len(op.RangeBytes()) > 0 }
  177. func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool) {
  178. if isBadOp(op) {
  179. return nil, false
  180. }
  181. key := string(op.KeyBytes())
  182. li, wc := lc.notify(key)
  183. if li == nil {
  184. return nil, true
  185. }
  186. select {
  187. case <-wc:
  188. case <-ctx.Done():
  189. return nil, true
  190. }
  191. lc.mu.RLock()
  192. lk := *li
  193. ret := lk.get(op)
  194. lc.mu.RUnlock()
  195. return ret, true
  196. }
  197. func (lk *leaseKey) get(op v3.Op) *v3.GetResponse {
  198. ret := *lk.response
  199. ret.Header = copyHeader(ret.Header)
  200. empty := len(ret.Kvs) == 0 || op.IsCountOnly()
  201. empty = empty || (op.MinModRev() > ret.Kvs[0].ModRevision)
  202. empty = empty || (op.MaxModRev() != 0 && op.MaxModRev() < ret.Kvs[0].ModRevision)
  203. empty = empty || (op.MinCreateRev() > ret.Kvs[0].CreateRevision)
  204. empty = empty || (op.MaxCreateRev() != 0 && op.MaxCreateRev() < ret.Kvs[0].CreateRevision)
  205. if empty {
  206. ret.Kvs = nil
  207. } else {
  208. kv := *ret.Kvs[0]
  209. kv.Key = make([]byte, len(kv.Key))
  210. copy(kv.Key, ret.Kvs[0].Key)
  211. if !op.IsKeysOnly() {
  212. kv.Value = make([]byte, len(kv.Value))
  213. copy(kv.Value, ret.Kvs[0].Value)
  214. }
  215. ret.Kvs = []*mvccpb.KeyValue{&kv}
  216. }
  217. return &ret
  218. }
  219. func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) {
  220. lc.mu.RLock()
  221. defer lc.mu.RUnlock()
  222. if li := lc.entries[key]; li != nil {
  223. return li, li.waitc
  224. }
  225. return nil, nil
  226. }
  227. func (lc *leaseCache) clearOldRevokes(ctx context.Context) {
  228. for {
  229. select {
  230. case <-ctx.Done():
  231. return
  232. case <-time.After(time.Second):
  233. lc.mu.Lock()
  234. for k, lr := range lc.revokes {
  235. if time.Now().Sub(lr.Add(revokeBackoff)) > 0 {
  236. delete(lc.revokes, k)
  237. }
  238. }
  239. lc.mu.Unlock()
  240. }
  241. }
  242. }
  243. func (lc *leaseCache) evalCmp(cmps []v3.Cmp) (cmpVal bool, ok bool) {
  244. for _, cmp := range cmps {
  245. if len(cmp.RangeEnd) > 0 {
  246. return false, false
  247. }
  248. lk := lc.entries[string(cmp.Key)]
  249. if lk == nil {
  250. return false, false
  251. }
  252. if !evalCmp(lk.response, cmp) {
  253. return false, true
  254. }
  255. }
  256. return true, true
  257. }
  258. func (lc *leaseCache) evalOps(ops []v3.Op) ([]*v3pb.ResponseOp, bool) {
  259. resps := make([]*v3pb.ResponseOp, len(ops))
  260. for i, op := range ops {
  261. if !op.IsGet() || isBadOp(op) {
  262. // TODO: support read-only Txn
  263. return nil, false
  264. }
  265. lk := lc.entries[string(op.KeyBytes())]
  266. if lk == nil {
  267. return nil, false
  268. }
  269. resp := lk.get(op)
  270. if resp == nil {
  271. return nil, false
  272. }
  273. resps[i] = &v3pb.ResponseOp{
  274. Response: &v3pb.ResponseOp_ResponseRange{
  275. (*v3pb.RangeResponse)(resp),
  276. },
  277. }
  278. }
  279. return resps, true
  280. }