kv.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package leasing
  15. import (
  16. "context"
  17. "strings"
  18. "sync"
  19. "time"
  20. v3 "go.etcd.io/etcd/clientv3"
  21. "go.etcd.io/etcd/clientv3/concurrency"
  22. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  23. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  24. "go.etcd.io/etcd/mvcc/mvccpb"
  25. "google.golang.org/grpc/codes"
  26. "google.golang.org/grpc/status"
  27. )
// leasingKV is the KV implementation returned by NewKV; its methods route
// requests through the leasing protocol implemented in this file.
type leasingKV struct {
	// cl is the client used to open sessions and watches.
	cl *v3.Client
	// kv is the underlying KV (cl.KV) that requests are sent to.
	kv v3.KV
	// pfx is prepended to a key to form the name of its lease key.
	pfx string
	// leases is the local lease cache; leases.mu also guards session and sessionc.
	leases leaseCache
	// ctx is cancelled by Close and bounds every background goroutine.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	// wg tracks the background goroutines that Close waits for.
	wg sync.WaitGroup
	// sessionOpts are passed to concurrency.NewSession on every (re)connect.
	sessionOpts []concurrency.SessionOption
	// session is the current session; nil until monitorSession creates one.
	session *concurrency.Session
	// sessionc is closed once a session is established and replaced with a
	// fresh channel when monitorSession starts establishing a new one.
	sessionc chan struct{}
}
  40. var closedCh chan struct{}
  41. func init() {
  42. closedCh = make(chan struct{})
  43. close(closedCh)
  44. }
  45. // NewKV wraps a KV instance so that all requests are wired through a leasing protocol.
  46. func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, func(), error) {
  47. cctx, cancel := context.WithCancel(cl.Ctx())
  48. lkv := &leasingKV{
  49. cl: cl,
  50. kv: cl.KV,
  51. pfx: pfx,
  52. leases: leaseCache{revokes: make(map[string]time.Time)},
  53. ctx: cctx,
  54. cancel: cancel,
  55. sessionOpts: opts,
  56. sessionc: make(chan struct{}),
  57. }
  58. lkv.wg.Add(2)
  59. go func() {
  60. defer lkv.wg.Done()
  61. lkv.monitorSession()
  62. }()
  63. go func() {
  64. defer lkv.wg.Done()
  65. lkv.leases.clearOldRevokes(cctx)
  66. }()
  67. return lkv, lkv.Close, lkv.waitSession(cctx)
  68. }
  69. func (lkv *leasingKV) Close() {
  70. lkv.cancel()
  71. lkv.wg.Wait()
  72. }
  73. func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) {
  74. return lkv.get(ctx, v3.OpGet(key, opts...))
  75. }
  76. func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) {
  77. return lkv.put(ctx, v3.OpPut(key, val, opts...))
  78. }
  79. func (lkv *leasingKV) Delete(ctx context.Context, key string, opts ...v3.OpOption) (*v3.DeleteResponse, error) {
  80. return lkv.delete(ctx, v3.OpDelete(key, opts...))
  81. }
  82. func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) {
  83. switch {
  84. case op.IsGet():
  85. resp, err := lkv.get(ctx, op)
  86. return resp.OpResponse(), err
  87. case op.IsPut():
  88. resp, err := lkv.put(ctx, op)
  89. return resp.OpResponse(), err
  90. case op.IsDelete():
  91. resp, err := lkv.delete(ctx, op)
  92. return resp.OpResponse(), err
  93. case op.IsTxn():
  94. cmps, thenOps, elseOps := op.Txn()
  95. resp, err := lkv.Txn(ctx).If(cmps...).Then(thenOps...).Else(elseOps...).Commit()
  96. return resp.OpResponse(), err
  97. }
  98. return v3.OpResponse{}, nil
  99. }
  100. func (lkv *leasingKV) Compact(ctx context.Context, rev int64, opts ...v3.CompactOption) (*v3.CompactResponse, error) {
  101. return lkv.kv.Compact(ctx, rev, opts...)
  102. }
  103. func (lkv *leasingKV) Txn(ctx context.Context) v3.Txn {
  104. return &txnLeasing{Txn: lkv.kv.Txn(ctx), lkv: lkv, ctx: ctx}
  105. }
  106. func (lkv *leasingKV) monitorSession() {
  107. for lkv.ctx.Err() == nil {
  108. if lkv.session != nil {
  109. select {
  110. case <-lkv.session.Done():
  111. case <-lkv.ctx.Done():
  112. return
  113. }
  114. }
  115. lkv.leases.mu.Lock()
  116. select {
  117. case <-lkv.sessionc:
  118. lkv.sessionc = make(chan struct{})
  119. default:
  120. }
  121. lkv.leases.entries = make(map[string]*leaseKey)
  122. lkv.leases.mu.Unlock()
  123. s, err := concurrency.NewSession(lkv.cl, lkv.sessionOpts...)
  124. if err != nil {
  125. continue
  126. }
  127. lkv.leases.mu.Lock()
  128. lkv.session = s
  129. close(lkv.sessionc)
  130. lkv.leases.mu.Unlock()
  131. }
  132. }
  133. func (lkv *leasingKV) monitorLease(ctx context.Context, key string, rev int64) {
  134. cctx, cancel := context.WithCancel(lkv.ctx)
  135. defer cancel()
  136. for cctx.Err() == nil {
  137. if rev == 0 {
  138. resp, err := lkv.kv.Get(ctx, lkv.pfx+key)
  139. if err != nil {
  140. continue
  141. }
  142. rev = resp.Header.Revision
  143. if len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) == "REVOKE" {
  144. lkv.rescind(cctx, key, rev)
  145. return
  146. }
  147. }
  148. wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
  149. for resp := range wch {
  150. for _, ev := range resp.Events {
  151. if string(ev.Kv.Value) != "REVOKE" {
  152. continue
  153. }
  154. if v3.LeaseID(ev.Kv.Lease) == lkv.leaseID() {
  155. lkv.rescind(cctx, key, ev.Kv.ModRevision)
  156. }
  157. return
  158. }
  159. }
  160. rev = 0
  161. }
  162. }
  163. // rescind releases a lease from this client.
  164. func (lkv *leasingKV) rescind(ctx context.Context, key string, rev int64) {
  165. if lkv.leases.Evict(key) > rev {
  166. return
  167. }
  168. cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev)
  169. op := v3.OpDelete(lkv.pfx + key)
  170. for ctx.Err() == nil {
  171. if _, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit(); err == nil {
  172. return
  173. }
  174. }
  175. }
  176. func (lkv *leasingKV) waitRescind(ctx context.Context, key string, rev int64) error {
  177. cctx, cancel := context.WithCancel(ctx)
  178. defer cancel()
  179. wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
  180. for resp := range wch {
  181. for _, ev := range resp.Events {
  182. if ev.Type == v3.EventTypeDelete {
  183. return ctx.Err()
  184. }
  185. }
  186. }
  187. return ctx.Err()
  188. }
  189. func (lkv *leasingKV) tryModifyOp(ctx context.Context, op v3.Op) (*v3.TxnResponse, chan<- struct{}, error) {
  190. key := string(op.KeyBytes())
  191. wc, rev := lkv.leases.Lock(key)
  192. cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)
  193. resp, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit()
  194. switch {
  195. case err != nil:
  196. lkv.leases.Evict(key)
  197. fallthrough
  198. case !resp.Succeeded:
  199. if wc != nil {
  200. close(wc)
  201. }
  202. return nil, nil, err
  203. }
  204. return resp, wc, nil
  205. }
  206. func (lkv *leasingKV) put(ctx context.Context, op v3.Op) (pr *v3.PutResponse, err error) {
  207. if err := lkv.waitSession(ctx); err != nil {
  208. return nil, err
  209. }
  210. for ctx.Err() == nil {
  211. resp, wc, err := lkv.tryModifyOp(ctx, op)
  212. if err != nil || wc == nil {
  213. resp, err = lkv.revoke(ctx, string(op.KeyBytes()), op)
  214. }
  215. if err != nil {
  216. return nil, err
  217. }
  218. if resp.Succeeded {
  219. lkv.leases.mu.Lock()
  220. lkv.leases.Update(op.KeyBytes(), op.ValueBytes(), resp.Header)
  221. lkv.leases.mu.Unlock()
  222. pr = (*v3.PutResponse)(resp.Responses[0].GetResponsePut())
  223. pr.Header = resp.Header
  224. }
  225. if wc != nil {
  226. close(wc)
  227. }
  228. if resp.Succeeded {
  229. return pr, nil
  230. }
  231. }
  232. return nil, ctx.Err()
  233. }
  234. func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
  235. for ctx.Err() == nil {
  236. if err := lkv.waitSession(ctx); err != nil {
  237. return nil, err
  238. }
  239. lcmp := v3.Cmp{Key: []byte(key), Target: pb.Compare_LEASE}
  240. resp, err := lkv.kv.Txn(ctx).If(
  241. v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0),
  242. v3.Compare(lcmp, "=", 0)).
  243. Then(
  244. op,
  245. v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
  246. Else(
  247. op,
  248. v3.OpGet(lkv.pfx+key),
  249. ).Commit()
  250. if err == nil {
  251. if !resp.Succeeded {
  252. kvs := resp.Responses[1].GetResponseRange().Kvs
  253. // if txn failed since already owner, lease is acquired
  254. resp.Succeeded = len(kvs) > 0 && v3.LeaseID(kvs[0].Lease) == lkv.leaseID()
  255. }
  256. return resp, nil
  257. }
  258. // retry if transient error
  259. if _, ok := err.(rpctypes.EtcdError); ok {
  260. return nil, err
  261. }
  262. if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable {
  263. return nil, err
  264. }
  265. }
  266. return nil, ctx.Err()
  267. }
  268. func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) {
  269. do := func() (*v3.GetResponse, error) {
  270. r, err := lkv.kv.Do(ctx, op)
  271. return r.Get(), err
  272. }
  273. if !lkv.readySession() {
  274. return do()
  275. }
  276. if resp, ok := lkv.leases.Get(ctx, op); resp != nil {
  277. return resp, nil
  278. } else if !ok || op.IsSerializable() {
  279. // must be handled by server or can skip linearization
  280. return do()
  281. }
  282. key := string(op.KeyBytes())
  283. if !lkv.leases.MayAcquire(key) {
  284. resp, err := lkv.kv.Do(ctx, op)
  285. return resp.Get(), err
  286. }
  287. resp, err := lkv.acquire(ctx, key, v3.OpGet(key))
  288. if err != nil {
  289. return nil, err
  290. }
  291. getResp := (*v3.GetResponse)(resp.Responses[0].GetResponseRange())
  292. getResp.Header = resp.Header
  293. if resp.Succeeded {
  294. getResp = lkv.leases.Add(key, getResp, op)
  295. lkv.wg.Add(1)
  296. go func() {
  297. defer lkv.wg.Done()
  298. lkv.monitorLease(ctx, key, resp.Header.Revision)
  299. }()
  300. }
  301. return getResp, nil
  302. }
  303. func (lkv *leasingKV) deleteRangeRPC(ctx context.Context, maxLeaseRev int64, key, end string) (*v3.DeleteResponse, error) {
  304. lkey, lend := lkv.pfx+key, lkv.pfx+end
  305. resp, err := lkv.kv.Txn(ctx).If(
  306. v3.Compare(v3.CreateRevision(lkey).WithRange(lend), "<", maxLeaseRev+1),
  307. ).Then(
  308. v3.OpGet(key, v3.WithRange(end), v3.WithKeysOnly()),
  309. v3.OpDelete(key, v3.WithRange(end)),
  310. ).Commit()
  311. if err != nil {
  312. lkv.leases.EvictRange(key, end)
  313. return nil, err
  314. }
  315. if !resp.Succeeded {
  316. return nil, nil
  317. }
  318. for _, kv := range resp.Responses[0].GetResponseRange().Kvs {
  319. lkv.leases.Delete(string(kv.Key), resp.Header)
  320. }
  321. delResp := (*v3.DeleteResponse)(resp.Responses[1].GetResponseDeleteRange())
  322. delResp.Header = resp.Header
  323. return delResp, nil
  324. }
  325. func (lkv *leasingKV) deleteRange(ctx context.Context, op v3.Op) (*v3.DeleteResponse, error) {
  326. key, end := string(op.KeyBytes()), string(op.RangeBytes())
  327. for ctx.Err() == nil {
  328. maxLeaseRev, err := lkv.revokeRange(ctx, key, end)
  329. if err != nil {
  330. return nil, err
  331. }
  332. wcs := lkv.leases.LockRange(key, end)
  333. delResp, err := lkv.deleteRangeRPC(ctx, maxLeaseRev, key, end)
  334. closeAll(wcs)
  335. if err != nil || delResp != nil {
  336. return delResp, err
  337. }
  338. }
  339. return nil, ctx.Err()
  340. }
  341. func (lkv *leasingKV) delete(ctx context.Context, op v3.Op) (dr *v3.DeleteResponse, err error) {
  342. if err := lkv.waitSession(ctx); err != nil {
  343. return nil, err
  344. }
  345. if len(op.RangeBytes()) > 0 {
  346. return lkv.deleteRange(ctx, op)
  347. }
  348. key := string(op.KeyBytes())
  349. for ctx.Err() == nil {
  350. resp, wc, err := lkv.tryModifyOp(ctx, op)
  351. if err != nil || wc == nil {
  352. resp, err = lkv.revoke(ctx, key, op)
  353. }
  354. if err != nil {
  355. // don't know if delete was processed
  356. lkv.leases.Evict(key)
  357. return nil, err
  358. }
  359. if resp.Succeeded {
  360. dr = (*v3.DeleteResponse)(resp.Responses[0].GetResponseDeleteRange())
  361. dr.Header = resp.Header
  362. lkv.leases.Delete(key, dr.Header)
  363. }
  364. if wc != nil {
  365. close(wc)
  366. }
  367. if resp.Succeeded {
  368. return dr, nil
  369. }
  370. }
  371. return nil, ctx.Err()
  372. }
  373. func (lkv *leasingKV) revoke(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
  374. rev := lkv.leases.Rev(key)
  375. txn := lkv.kv.Txn(ctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)).Then(op)
  376. resp, err := txn.Else(v3.OpPut(lkv.pfx+key, "REVOKE", v3.WithIgnoreLease())).Commit()
  377. if err != nil || resp.Succeeded {
  378. return resp, err
  379. }
  380. return resp, lkv.waitRescind(ctx, key, resp.Header.Revision)
  381. }
  382. func (lkv *leasingKV) revokeRange(ctx context.Context, begin, end string) (int64, error) {
  383. lkey, lend := lkv.pfx+begin, ""
  384. if len(end) > 0 {
  385. lend = lkv.pfx + end
  386. }
  387. leaseKeys, err := lkv.kv.Get(ctx, lkey, v3.WithRange(lend))
  388. if err != nil {
  389. return 0, err
  390. }
  391. return lkv.revokeLeaseKvs(ctx, leaseKeys.Kvs)
  392. }
  393. func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue) (int64, error) {
  394. maxLeaseRev := int64(0)
  395. for _, kv := range kvs {
  396. if rev := kv.CreateRevision; rev > maxLeaseRev {
  397. maxLeaseRev = rev
  398. }
  399. if v3.LeaseID(kv.Lease) == lkv.leaseID() {
  400. // don't revoke own keys
  401. continue
  402. }
  403. key := strings.TrimPrefix(string(kv.Key), lkv.pfx)
  404. if _, err := lkv.revoke(ctx, key, v3.OpGet(key)); err != nil {
  405. return 0, err
  406. }
  407. }
  408. return maxLeaseRev, nil
  409. }
  410. func (lkv *leasingKV) waitSession(ctx context.Context) error {
  411. lkv.leases.mu.RLock()
  412. sessionc := lkv.sessionc
  413. lkv.leases.mu.RUnlock()
  414. select {
  415. case <-sessionc:
  416. return nil
  417. case <-lkv.ctx.Done():
  418. return lkv.ctx.Err()
  419. case <-ctx.Done():
  420. return ctx.Err()
  421. }
  422. }
  423. func (lkv *leasingKV) readySession() bool {
  424. lkv.leases.mu.RLock()
  425. defer lkv.leases.mu.RUnlock()
  426. if lkv.session == nil {
  427. return false
  428. }
  429. select {
  430. case <-lkv.session.Done():
  431. default:
  432. return true
  433. }
  434. return false
  435. }
  436. func (lkv *leasingKV) leaseID() v3.LeaseID {
  437. lkv.leases.mu.RLock()
  438. defer lkv.leases.mu.RUnlock()
  439. return lkv.session.Lease()
  440. }