lessor.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package lease
  15. import (
  16. "encoding/binary"
  17. "fmt"
  18. "sync"
  19. "time"
  20. "github.com/coreos/etcd/lease/leasepb"
  21. "github.com/coreos/etcd/pkg/idutil"
  22. "github.com/coreos/etcd/storage/backend"
  23. )
  24. var (
  25. minLeaseTerm = 5 * time.Second
  26. leaseBucketName = []byte("lease")
  27. )
  28. // DeleteableRange defines an interface with DeleteRange method.
  29. // We define this interface only for lessor to limit the number
  30. // of methods of storage.KV to what lessor actually needs.
  31. //
  32. // Having a minimum interface makes testing easy.
  33. type DeleteableRange interface {
  34. DeleteRange(key, end []byte) (int64, int64)
  35. }
// lessor is the owner of leases. It can grant, revoke and renew leases,
// and attach items to them, on behalf of lessees.
// TODO: use clockwork for testability.
type lessor struct {
	// mu is held by every lessor method in this file while it reads or
	// writes leaseMap.
	mu sync.Mutex

	// leaseMap indexes the leases currently known to the lessor by ID.
	//
	// TODO: probably this should be a heap with a secondary
	// id index.
	// Now it is O(N) to loop over the leases to find expired ones.
	// We want to make Grant, Revoke, and FindExpired all O(logN) and
	// Renew O(1).
	// FindExpired and Renew should be the most frequent operations.
	leaseMap map[int64]*lease

	// dr is the DeleteableRange the lessor operates on.
	// Revoke deletes the keys of the items attached to the revoked lease
	// from dr.
	dr DeleteableRange

	// b is the backend used to persist leases. Only the lease ID and TTL
	// are written for now (see lease.persistTo); the expiry and the
	// attached items are not.
	// NOTE(review): the intent is that the leased items can be recovered
	// by iterating all the keys in kv; no such recovery exists in this file.
	b backend.Backend

	// idgen generates the IDs that Grant assigns to new leases.
	idgen *idutil.Generator
}
  57. func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
  58. l := &lessor{
  59. leaseMap: make(map[int64]*lease),
  60. b: b,
  61. dr: dr,
  62. idgen: idutil.NewGenerator(lessorID, time.Now()),
  63. }
  64. tx := l.b.BatchTx()
  65. tx.Lock()
  66. tx.UnsafeCreateBucket(leaseBucketName)
  67. tx.Unlock()
  68. l.b.ForceCommit()
  69. // TODO: recover from previous state in backend.
  70. return l
  71. }
  72. // Grant grants a lease that expires at least after TTL seconds.
  73. // TODO: when lessor is under high load, it should give out lease
  74. // with longer TTL to reduce renew load.
  75. func (le *lessor) Grant(ttl int64) *lease {
  76. // TODO: define max TTL
  77. expiry := time.Now().Add(time.Duration(ttl) * time.Second)
  78. expiry = minExpiry(time.Now(), expiry)
  79. id := int64(le.idgen.Next())
  80. le.mu.Lock()
  81. defer le.mu.Unlock()
  82. l := &lease{id: id, ttl: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
  83. if _, ok := le.leaseMap[id]; ok {
  84. panic("lease: unexpected duplicate ID!")
  85. }
  86. le.leaseMap[id] = l
  87. l.persistTo(le.b)
  88. return l
  89. }
  90. // Revoke revokes a lease with given ID. The item attached to the
  91. // given lease will be removed. If the ID does not exist, an error
  92. // will be returned.
  93. func (le *lessor) Revoke(id int64) error {
  94. le.mu.Lock()
  95. defer le.mu.Unlock()
  96. l := le.leaseMap[id]
  97. if l == nil {
  98. return fmt.Errorf("lease: cannot find lease %x", id)
  99. }
  100. for item := range l.itemSet {
  101. le.dr.DeleteRange([]byte(item.key), nil)
  102. }
  103. delete(le.leaseMap, l.id)
  104. l.removeFrom(le.b)
  105. return nil
  106. }
  107. // Renew renews an existing lease. If the given lease does not exist or
  108. // has expired, an error will be returned.
  109. // TODO: return new TTL?
  110. func (le *lessor) Renew(id int64) error {
  111. le.mu.Lock()
  112. defer le.mu.Unlock()
  113. l := le.leaseMap[id]
  114. if l == nil {
  115. return fmt.Errorf("lease: cannot find lease %x", id)
  116. }
  117. expiry := time.Now().Add(time.Duration(l.ttl) * time.Second)
  118. l.expiry = minExpiry(time.Now(), expiry)
  119. return nil
  120. }
  121. // Attach attaches items to the lease with given ID. When the lease
  122. // expires, the attached items will be automatically removed.
  123. // If the given lease does not exist, an error will be returned.
  124. func (le *lessor) Attach(id int64, items []leaseItem) error {
  125. le.mu.Lock()
  126. defer le.mu.Unlock()
  127. l := le.leaseMap[id]
  128. if l == nil {
  129. return fmt.Errorf("lease: cannot find lease %x", id)
  130. }
  131. for _, it := range items {
  132. l.itemSet[it] = struct{}{}
  133. }
  134. return nil
  135. }
  136. // findExpiredLeases loops all the leases in the leaseMap and returns the expired
  137. // leases that needed to be revoked.
  138. func (le *lessor) findExpiredLeases() []*lease {
  139. le.mu.Lock()
  140. defer le.mu.Unlock()
  141. leases := make([]*lease, 0, 16)
  142. now := time.Now()
  143. for _, l := range le.leaseMap {
  144. if l.expiry.Sub(now) <= 0 {
  145. leases = append(leases, l)
  146. }
  147. }
  148. return leases
  149. }
  150. // get gets the lease with given id.
  151. // get is a helper fucntion for testing, at least for now.
  152. func (le *lessor) get(id int64) *lease {
  153. le.mu.Lock()
  154. defer le.mu.Unlock()
  155. return le.leaseMap[id]
  156. }
// lease is a single lease tracked by the lessor.
type lease struct {
	// id is the lease ID; it keys the lease in the lessor's lease map and
	// in the backend.
	id int64

	ttl int64 // time to live in seconds

	// itemSet is the set of items attached to this lease.
	itemSet map[leaseItem]struct{}

	// expiry is the point in time at which the lease expires.
	expiry time.Time
}
  164. func (l lease) persistTo(b backend.Backend) {
  165. key := int64ToBytes(l.id)
  166. lpb := leasepb.Lease{ID: l.id, TTL: int64(l.ttl)}
  167. val, err := lpb.Marshal()
  168. if err != nil {
  169. panic("failed to marshal lease proto item")
  170. }
  171. b.BatchTx().Lock()
  172. b.BatchTx().UnsafePut(leaseBucketName, key, val)
  173. b.BatchTx().Unlock()
  174. }
  175. func (l lease) removeFrom(b backend.Backend) {
  176. key := int64ToBytes(l.id)
  177. b.BatchTx().Lock()
  178. b.BatchTx().UnsafeDelete(leaseBucketName, key)
  179. b.BatchTx().Unlock()
  180. }
// leaseItem identifies an item attached to a lease. It is used as the
// element type of a lease's item set.
type leaseItem struct {
	// key is converted to bytes and passed to DeleteRange when the owning
	// lease is revoked.
	key string
}
  184. // minExpiry returns a minimal expiry. A minimal expiry is the larger on
  185. // between now + minLeaseTerm and the given expectedExpiry.
  186. func minExpiry(now time.Time, expectedExpiry time.Time) time.Time {
  187. minExpiry := time.Now().Add(minLeaseTerm)
  188. if expectedExpiry.Sub(minExpiry) < 0 {
  189. expectedExpiry = minExpiry
  190. }
  191. return expectedExpiry
  192. }
  193. func int64ToBytes(n int64) []byte {
  194. bytes := make([]byte, 8)
  195. binary.BigEndian.PutUint64(bytes, uint64(n))
  196. return bytes
  197. }