lessor.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package lease
  15. import (
  16. "encoding/binary"
  17. "fmt"
  18. "math"
  19. "sync"
  20. "time"
  21. "github.com/coreos/etcd/lease/leasepb"
  22. "github.com/coreos/etcd/pkg/idutil"
  23. "github.com/coreos/etcd/storage/backend"
  24. )
  25. const (
  26. // NoLease is a special LeaseID representing the absence of a lease.
  27. NoLease = LeaseID(0)
  28. )
  29. var (
  30. minLeaseTerm = 5 * time.Second
  31. leaseBucketName = []byte("lease")
  32. )
  33. type LeaseID int64
  34. // DeleteableRange defines an interface with DeleteRange method.
  35. // We define this interface only for lessor to limit the number
  36. // of methods of storage.KV to what lessor actually needs.
  37. //
  38. // Having a minimum interface makes testing easy.
  39. type DeleteableRange interface {
  40. DeleteRange(key, end []byte) (int64, int64)
  41. }
  42. // a lessor is the owner of leases. It can grant, revoke,
  43. // renew and modify leases for lessee.
  44. // TODO: use clockwork for testability.
  45. type lessor struct {
  46. mu sync.Mutex
  47. // TODO: probably this should be a heap with a secondary
  48. // id index.
  49. // Now it is O(N) to loop over the leases to find expired ones.
  50. // We want to make Grant, Revoke, and FindExpired all O(logN) and
  51. // Renew O(1).
  52. // FindExpired and Renew should be the most frequent operations.
  53. leaseMap map[LeaseID]*lease
  54. // A DeleteableRange the lessor operates on.
  55. // When a lease expires, the lessor will delete the
  56. // leased range (or key) from the DeleteableRange.
  57. dr DeleteableRange
  58. // backend to persist leases. We only persist lease ID and expiry for now.
  59. // The leased items can be recovered by iterating all the keys in kv.
  60. b backend.Backend
  61. idgen *idutil.Generator
  62. }
  63. func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
  64. l := &lessor{
  65. leaseMap: make(map[LeaseID]*lease),
  66. b: b,
  67. dr: dr,
  68. idgen: idutil.NewGenerator(lessorID, time.Now()),
  69. }
  70. l.initAndRecover()
  71. return l
  72. }
  73. // Grant grants a lease that expires at least after TTL seconds.
  74. // TODO: when lessor is under high load, it should give out lease
  75. // with longer TTL to reduce renew load.
  76. func (le *lessor) Grant(ttl int64) *lease {
  77. // TODO: define max TTL
  78. expiry := time.Now().Add(time.Duration(ttl) * time.Second)
  79. expiry = minExpiry(time.Now(), expiry)
  80. id := LeaseID(le.idgen.Next())
  81. le.mu.Lock()
  82. defer le.mu.Unlock()
  83. l := &lease{id: id, ttl: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
  84. if _, ok := le.leaseMap[id]; ok {
  85. panic("lease: unexpected duplicate ID!")
  86. }
  87. le.leaseMap[id] = l
  88. l.persistTo(le.b)
  89. return l
  90. }
  91. // Revoke revokes a lease with given ID. The item attached to the
  92. // given lease will be removed. If the ID does not exist, an error
  93. // will be returned.
  94. func (le *lessor) Revoke(id LeaseID) error {
  95. le.mu.Lock()
  96. defer le.mu.Unlock()
  97. l := le.leaseMap[id]
  98. if l == nil {
  99. return fmt.Errorf("lease: cannot find lease %x", id)
  100. }
  101. for item := range l.itemSet {
  102. le.dr.DeleteRange([]byte(item.key), nil)
  103. }
  104. delete(le.leaseMap, l.id)
  105. l.removeFrom(le.b)
  106. return nil
  107. }
  108. // Renew renews an existing lease. If the given lease does not exist or
  109. // has expired, an error will be returned.
  110. // TODO: return new TTL?
  111. func (le *lessor) Renew(id LeaseID) error {
  112. le.mu.Lock()
  113. defer le.mu.Unlock()
  114. l := le.leaseMap[id]
  115. if l == nil {
  116. return fmt.Errorf("lease: cannot find lease %x", id)
  117. }
  118. expiry := time.Now().Add(time.Duration(l.ttl) * time.Second)
  119. l.expiry = minExpiry(time.Now(), expiry)
  120. return nil
  121. }
  122. // Attach attaches items to the lease with given ID. When the lease
  123. // expires, the attached items will be automatically removed.
  124. // If the given lease does not exist, an error will be returned.
  125. func (le *lessor) Attach(id LeaseID, items []leaseItem) error {
  126. le.mu.Lock()
  127. defer le.mu.Unlock()
  128. l := le.leaseMap[id]
  129. if l == nil {
  130. return fmt.Errorf("lease: cannot find lease %x", id)
  131. }
  132. for _, it := range items {
  133. l.itemSet[it] = struct{}{}
  134. }
  135. return nil
  136. }
  137. // findExpiredLeases loops all the leases in the leaseMap and returns the expired
  138. // leases that needed to be revoked.
  139. func (le *lessor) findExpiredLeases() []*lease {
  140. le.mu.Lock()
  141. defer le.mu.Unlock()
  142. leases := make([]*lease, 0, 16)
  143. now := time.Now()
  144. for _, l := range le.leaseMap {
  145. if l.expiry.Sub(now) <= 0 {
  146. leases = append(leases, l)
  147. }
  148. }
  149. return leases
  150. }
  151. // get gets the lease with given id.
  152. // get is a helper fucntion for testing, at least for now.
  153. func (le *lessor) get(id LeaseID) *lease {
  154. le.mu.Lock()
  155. defer le.mu.Unlock()
  156. return le.leaseMap[id]
  157. }
  158. func (le *lessor) initAndRecover() {
  159. tx := le.b.BatchTx()
  160. tx.Lock()
  161. tx.UnsafeCreateBucket(leaseBucketName)
  162. _, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
  163. // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
  164. for i := range vs {
  165. var lpb leasepb.Lease
  166. err := lpb.Unmarshal(vs[i])
  167. if err != nil {
  168. tx.Unlock()
  169. panic("failed to unmarshal lease proto item")
  170. }
  171. id := LeaseID(lpb.ID)
  172. le.leaseMap[id] = &lease{
  173. id: id,
  174. ttl: lpb.TTL,
  175. // itemSet will be filled in when recover key-value pairs
  176. expiry: minExpiry(time.Now(), time.Now().Add(time.Second*time.Duration(lpb.TTL))),
  177. }
  178. }
  179. tx.Unlock()
  180. le.b.ForceCommit()
  181. }
  182. type lease struct {
  183. id LeaseID
  184. ttl int64 // time to live in seconds
  185. itemSet map[leaseItem]struct{}
  186. // expiry time in unixnano
  187. expiry time.Time
  188. }
  189. func (l lease) persistTo(b backend.Backend) {
  190. key := int64ToBytes(int64(l.id))
  191. lpb := leasepb.Lease{ID: int64(l.id), TTL: int64(l.ttl)}
  192. val, err := lpb.Marshal()
  193. if err != nil {
  194. panic("failed to marshal lease proto item")
  195. }
  196. b.BatchTx().Lock()
  197. b.BatchTx().UnsafePut(leaseBucketName, key, val)
  198. b.BatchTx().Unlock()
  199. }
  200. func (l lease) removeFrom(b backend.Backend) {
  201. key := int64ToBytes(int64(l.id))
  202. b.BatchTx().Lock()
  203. b.BatchTx().UnsafeDelete(leaseBucketName, key)
  204. b.BatchTx().Unlock()
  205. }
  206. type leaseItem struct {
  207. key string
  208. }
  209. // minExpiry returns a minimal expiry. A minimal expiry is the larger on
  210. // between now + minLeaseTerm and the given expectedExpiry.
  211. func minExpiry(now time.Time, expectedExpiry time.Time) time.Time {
  212. minExpiry := time.Now().Add(minLeaseTerm)
  213. if expectedExpiry.Sub(minExpiry) < 0 {
  214. expectedExpiry = minExpiry
  215. }
  216. return expectedExpiry
  217. }
  218. func int64ToBytes(n int64) []byte {
  219. bytes := make([]byte, 8)
  220. binary.BigEndian.PutUint64(bytes, uint64(n))
  221. return bytes
  222. }