lease_queue.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package lease
  15. import "container/heap"
  16. // LeaseWithTime contains lease object with a time.
  17. // For the lessor's lease heap, time identifies the lease expiration time.
  18. // For the lessor's lease checkpoint heap, the time identifies the next lease checkpoint time.
  19. type LeaseWithTime struct {
  20. id LeaseID
  21. // Unix nanos timestamp.
  22. time int64
  23. index int
  24. }
  25. type LeaseQueue []*LeaseWithTime
  26. func (pq LeaseQueue) Len() int { return len(pq) }
  27. func (pq LeaseQueue) Less(i, j int) bool {
  28. return pq[i].time < pq[j].time
  29. }
  30. func (pq LeaseQueue) Swap(i, j int) {
  31. pq[i], pq[j] = pq[j], pq[i]
  32. pq[i].index = i
  33. pq[j].index = j
  34. }
  35. func (pq *LeaseQueue) Push(x interface{}) {
  36. n := len(*pq)
  37. item := x.(*LeaseWithTime)
  38. item.index = n
  39. *pq = append(*pq, item)
  40. }
  41. func (pq *LeaseQueue) Pop() interface{} {
  42. old := *pq
  43. n := len(old)
  44. item := old[n-1]
  45. item.index = -1 // for safety
  46. *pq = old[0 : n-1]
  47. return item
  48. }
  49. // LeaseExpiredNotifier is a queue used to notify lessor to revoke expired lease.
  50. // Only save one item for a lease, `Register` will update time of the corresponding lease.
  51. type LeaseExpiredNotifier struct {
  52. m map[LeaseID]*LeaseWithTime
  53. queue LeaseQueue
  54. }
  55. func newLeaseExpiredNotifier() *LeaseExpiredNotifier {
  56. return &LeaseExpiredNotifier{
  57. m: make(map[LeaseID]*LeaseWithTime),
  58. queue: make(LeaseQueue, 0),
  59. }
  60. }
  61. func (mq *LeaseExpiredNotifier) Init() {
  62. heap.Init(&mq.queue)
  63. mq.m = make(map[LeaseID]*LeaseWithTime)
  64. for _, item := range mq.queue {
  65. mq.m[item.id] = item
  66. }
  67. }
  68. func (mq *LeaseExpiredNotifier) RegisterOrUpdate(item *LeaseWithTime) {
  69. if old, ok := mq.m[item.id]; ok {
  70. old.time = item.time
  71. heap.Fix(&mq.queue, old.index)
  72. } else {
  73. heap.Push(&mq.queue, item)
  74. mq.m[item.id] = item
  75. }
  76. }
  77. func (mq *LeaseExpiredNotifier) Unregister() *LeaseWithTime {
  78. item := heap.Pop(&mq.queue).(*LeaseWithTime)
  79. delete(mq.m, item.id)
  80. return item
  81. }
  82. func (mq *LeaseExpiredNotifier) Poll() *LeaseWithTime {
  83. if mq.Len() == 0 {
  84. return nil
  85. }
  86. return mq.queue[0]
  87. }
  88. func (mq *LeaseExpiredNotifier) Len() int {
  89. return len(mq.m)
  90. }