Browse Source

lease: Add a heap to optimize lease expiration checks

This adds a heap acting as a priority queue to keep track of lease
exiprations. Previously the whole lease map had to be iterated through
each time.

The queue allows us to check only those leases which might be expired.
When the expiration changes, we add an additional entry. If we check an
entry that isn't expired, it means that the lease got extended.
If we find a entry in the heap that doesn't have a corresponding entry in
the map, we know that the lease has already been expired or revoked.
micah 7 years ago
parent
commit
6f271d8bf1
2 changed files with 83 additions and 6 deletions
  1. 51 0
      lease/lease_queue.go
  2. 32 6
      lease/lessor.go

+ 51 - 0
lease/lease_queue.go

@@ -0,0 +1,51 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package lease
+
+type LeaseWithTime struct {
+	leaseId    LeaseID
+	expiration int64
+	index      int
+}
+
+type LeaseQueue []*LeaseWithTime
+
+func (pq LeaseQueue) Len() int { return len(pq) }
+
+func (pq LeaseQueue) Less(i, j int) bool {
+	return pq[i].expiration < pq[j].expiration
+}
+
+func (pq LeaseQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+	pq[i].index = i
+	pq[j].index = j
+}
+
+func (pq *LeaseQueue) Push(x interface{}) {
+	n := len(*pq)
+	item := x.(*LeaseWithTime)
+	item.index = n
+	*pq = append(*pq, item)
+}
+
+func (pq *LeaseQueue) Pop() interface{} {
+	old := *pq
+	n := len(old)
+	item := old[n-1]
+	item.index = -1 // for safety
+	*pq = old[0 : n-1]
+	return item
+}

+ 32 - 6
lease/lessor.go

@@ -15,6 +15,7 @@
 package lease
 package lease
 
 
 import (
 import (
+	"container/heap"
 	"encoding/binary"
 	"encoding/binary"
 	"errors"
 	"errors"
 	"math"
 	"math"
@@ -128,9 +129,9 @@ type lessor struct {
 	// We want to make Grant, Revoke, and findExpiredLeases all O(logN) and
 	// We want to make Grant, Revoke, and findExpiredLeases all O(logN) and
 	// Renew O(1).
 	// Renew O(1).
 	// findExpiredLeases and Renew should be the most frequent operations.
 	// findExpiredLeases and Renew should be the most frequent operations.
-	leaseMap map[LeaseID]*Lease
-
-	itemMap map[LeaseItem]LeaseID
+	leaseMap  map[LeaseID]*Lease
+	leaseHeap LeaseQueue
+	itemMap   map[LeaseItem]LeaseID
 
 
 	// When a lease expires, the lessor will delete the
 	// When a lease expires, the lessor will delete the
 	// leased range (or key) by the RangeDeleter.
 	// leased range (or key) by the RangeDeleter.
@@ -159,6 +160,7 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
 	l := &lessor{
 	l := &lessor{
 		leaseMap:    make(map[LeaseID]*Lease),
 		leaseMap:    make(map[LeaseID]*Lease),
 		itemMap:     make(map[LeaseItem]LeaseID),
 		itemMap:     make(map[LeaseItem]LeaseID),
+		leaseHeap:   make(LeaseQueue, 0),
 		b:           b,
 		b:           b,
 		minLeaseTTL: minLeaseTTL,
 		minLeaseTTL: minLeaseTTL,
 		// expiredC is a small buffered chan to avoid unnecessary blocking.
 		// expiredC is a small buffered chan to avoid unnecessary blocking.
@@ -233,6 +235,8 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
 	}
 	}
 
 
 	le.leaseMap[id] = l
 	le.leaseMap[id] = l
+	item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
+	heap.Push(&le.leaseHeap, item)
 	l.persistTo(le.b)
 	l.persistTo(le.b)
 
 
 	return l, nil
 	return l, nil
@@ -315,6 +319,8 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
 	}
 	}
 
 
 	l.refresh(0)
 	l.refresh(0)
+	item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
+	heap.Push(&le.leaseHeap, item)
 	return l.ttl, nil
 	return l.ttl, nil
 }
 }
 
 
@@ -349,6 +355,8 @@ func (le *lessor) Promote(extend time.Duration) {
 	// refresh the expiries of all leases.
 	// refresh the expiries of all leases.
 	for _, l := range le.leaseMap {
 	for _, l := range le.leaseMap {
 		l.refresh(extend)
 		l.refresh(extend)
+		item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
+		heap.Push(&le.leaseHeap, item)
 	}
 	}
 
 
 	if len(le.leaseMap) < leaseRevokeRate {
 	if len(le.leaseMap) < leaseRevokeRate {
@@ -384,6 +392,8 @@ func (le *lessor) Promote(extend time.Duration) {
 		delay := time.Duration(rateDelay)
 		delay := time.Duration(rateDelay)
 		nextWindow = baseWindow + delay
 		nextWindow = baseWindow + delay
 		l.refresh(delay + extend)
 		l.refresh(delay + extend)
+		item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
+		heap.Push(&le.leaseHeap, item)
 	}
 	}
 }
 }
 
 
@@ -516,9 +526,24 @@ func (le *lessor) runLoop() {
 func (le *lessor) findExpiredLeases(limit int) []*Lease {
 func (le *lessor) findExpiredLeases(limit int) []*Lease {
 	leases := make([]*Lease, 0, 16)
 	leases := make([]*Lease, 0, 16)
 
 
-	for _, l := range le.leaseMap {
-		// TODO: probably should change to <= 100-500 millisecond to
-		// make up committing latency.
+	for {
+		if le.leaseHeap.Len() == 0 {
+			break
+		}
+
+		item := heap.Pop(&le.leaseHeap).(*LeaseWithTime)
+		l := le.leaseMap[item.leaseId]
+		if l == nil {
+			// lease has expired or been revoked, continue
+			continue
+		}
+		if time.Now().UnixNano() < item.expiration {
+			// Candidate expirations are caught up, reinsert this item
+			heap.Push(&le.leaseHeap, item)
+			break
+		}
+		// if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap
+
 		if l.expired() {
 		if l.expired() {
 			leases = append(leases, l)
 			leases = append(leases, l)
 
 
@@ -560,6 +585,7 @@ func (le *lessor) initAndRecover() {
 			revokec: make(chan struct{}),
 			revokec: make(chan struct{}),
 		}
 		}
 	}
 	}
+	heap.Init(&le.leaseHeap)
 	tx.Unlock()
 	tx.Unlock()
 
 
 	le.b.ForceCommit()
 	le.b.ForceCommit()