|
|
@@ -32,15 +32,14 @@ const (
|
|
|
// NoLease is a special LeaseID representing the absence of a lease.
|
|
|
NoLease = LeaseID(0)
|
|
|
|
|
|
- // maximum number of leases to revoke per iteration
|
|
|
- // TODO: make this configurable?
|
|
|
- leaseRevokeRate = 1000
|
|
|
+ forever = monotime.Time(math.MaxInt64)
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
leaseBucketName = []byte("lease")
|
|
|
|
|
|
- forever = monotime.Time(math.MaxInt64)
|
|
|
+ // maximum number of leases to revoke per second; configurable for tests
|
|
|
+ leaseRevokeRate = 1000
|
|
|
|
|
|
ErrNotPrimary = errors.New("not a primary lessor")
|
|
|
ErrLeaseNotFound = errors.New("lease not found")
|
|
|
@@ -328,8 +327,53 @@ func (le *lessor) Promote(extend time.Duration) {
|
|
|
for _, l := range le.leaseMap {
|
|
|
l.refresh(extend)
|
|
|
}
|
|
|
+
|
|
|
+ if len(le.leaseMap) < leaseRevokeRate {
|
|
|
+ // no possibility of lease pile-up
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // adjust expiries in case of overlap
|
|
|
+ leases := make([]*Lease, 0, len(le.leaseMap))
|
|
|
+ for _, l := range le.leaseMap {
|
|
|
+ leases = append(leases, l)
|
|
|
+ }
|
|
|
+ sort.Sort(leasesByExpiry(leases))
|
|
|
+
|
|
|
+ baseWindow := leases[0].Remaining()
|
|
|
+ nextWindow := baseWindow + time.Second
|
|
|
+ expires := 0
|
|
|
+ // have fewer expires than the total revoke rate so piled up leases
|
|
|
+ // don't consume the entire revoke limit
|
|
|
+ targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
|
|
|
+ for _, l := range leases {
|
|
|
+ remaining := l.Remaining()
|
|
|
+ if remaining > nextWindow {
|
|
|
+ baseWindow = remaining
|
|
|
+ nextWindow = baseWindow + time.Second
|
|
|
+ expires = 1
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ expires++
|
|
|
+ if expires <= targetExpiresPerSecond {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
|
|
|
+ // If leases are extended by n seconds, leases n seconds ahead of the
|
|
|
+ // base window should be extended by only one second.
|
|
|
+ rateDelay -= float64(remaining - baseWindow)
|
|
|
+ delay := time.Duration(rateDelay)
|
|
|
+ nextWindow = baseWindow + delay
|
|
|
+ l.refresh(delay + extend)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+type leasesByExpiry []*Lease
|
|
|
+
|
|
|
+func (le leasesByExpiry) Len() int { return len(le) }
|
|
|
+func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() }
|
|
|
+func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] }
|
|
|
+
|
|
|
func (le *lessor) Demote() {
|
|
|
le.mu.Lock()
|
|
|
defer le.mu.Unlock()
|