浏览代码

Merge pull request #8149 from heyitsanthony/lease-sort-promote

lessor: extend leases on promote if expires will be rate limited
Anthony Romano 8 年之前
父节点
当前提交
f7df3c80d5
共有 3 个文件被更改,包括 82 次插入29 次删除
  1. 7 1
      integration/v3_lease_test.go
  2. 47 16
      lease/lessor.go
  3. 28 12
      lease/lessor_test.go

+ 7 - 1
integration/v3_lease_test.go

@@ -66,10 +66,16 @@ func TestV3LeasePrmote(t *testing.T) {
 	// it was going to expire anyway.
 	time.Sleep(3 * time.Second)
 
-	// expiring lease should be renewed with randomized delta
 	if !leaseExist(t, clus, lresp.ID) {
 		t.Error("unexpected lease not exists")
 	}
+
+	// let lease expires. total lease = 5 seconds and we already
+	// waits for 3 seconds, so 3 seconds more is enough.
+	time.Sleep(3 * time.Second)
+	if leaseExist(t, clus, lresp.ID) {
+		t.Error("unexpected lease exists")
+	}
 }
 
 // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.

+ 47 - 16
lease/lessor.go

@@ -18,7 +18,6 @@ import (
 	"encoding/binary"
 	"errors"
 	"math"
-	"math/rand"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -33,15 +32,14 @@ const (
 	// NoLease is a special LeaseID representing the absence of a lease.
 	NoLease = LeaseID(0)
 
-	// maximum number of leases to revoke per iteration
-	// TODO: make this configurable?
-	leaseRevokeRate = 1000
+	forever = monotime.Time(math.MaxInt64)
 )
 
 var (
 	leaseBucketName = []byte("lease")
 
-	forever = monotime.Time(math.MaxInt64)
+	// maximum number of leases to revoke per second; configurable for tests
+	leaseRevokeRate = 1000
 
 	ErrNotPrimary    = errors.New("not a primary lessor")
 	ErrLeaseNotFound = errors.New("lease not found")
@@ -327,22 +325,55 @@ func (le *lessor) Promote(extend time.Duration) {
 
 	// refresh the expiries of all leases.
 	for _, l := range le.leaseMap {
-		// randomize expiry with 士10%, otherwise leases of same TTL
-		// will expire all at the same time,
-		l.refresh(extend + computeRandomDelta(l.ttl))
+		l.refresh(extend)
 	}
-}
 
-func computeRandomDelta(seconds int64) time.Duration {
-	var delta int64
-	if seconds > 10 {
-		delta = int64(float64(seconds) * 0.1 * rand.Float64())
-	} else {
-		delta = rand.Int63n(10)
+	if len(le.leaseMap) < leaseRevokeRate {
+		// no possibility of lease pile-up
+		return
+	}
+
+	// adjust expiries in case of overlap
+	leases := make([]*Lease, 0, len(le.leaseMap))
+	for _, l := range le.leaseMap {
+		leases = append(leases, l)
+	}
+	sort.Sort(leasesByExpiry(leases))
+
+	baseWindow := leases[0].Remaining()
+	nextWindow := baseWindow + time.Second
+	expires := 0
+	// have fewer expires than the total revoke rate so piled up leases
+	// don't consume the entire revoke limit
+	targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
+	for _, l := range leases {
+		remaining := l.Remaining()
+		if remaining > nextWindow {
+			baseWindow = remaining
+			nextWindow = baseWindow + time.Second
+			expires = 1
+			continue
+		}
+		expires++
+		if expires <= targetExpiresPerSecond {
+			continue
+		}
+		rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
+		// If leases are extended by n seconds, leases n seconds ahead of the
+		// base window should be extended by only one second.
+		rateDelay -= float64(remaining - baseWindow)
+		delay := time.Duration(rateDelay)
+		nextWindow = baseWindow + delay
+		l.refresh(delay + extend)
 	}
-	return time.Duration(delta) * time.Second
 }
 
+type leasesByExpiry []*Lease
+
+func (le leasesByExpiry) Len() int           { return len(le) }
+func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() }
+func (le leasesByExpiry) Swap(i, j int)      { le[i], le[j] = le[j], le[i] }
+
 func (le *lessor) Demote() {
 	le.mu.Lock()
 	defer le.mu.Unlock()

+ 28 - 12
lease/lessor_test.go

@@ -26,7 +26,6 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/mvcc/backend"
-	"github.com/coreos/etcd/pkg/monotime"
 )
 
 const (
@@ -211,14 +210,24 @@ func TestLessorRenew(t *testing.T) {
 	}
 }
 
-// TestLessorRenewRandomize ensures Lessor renews with randomized expiry.
-func TestLessorRenewRandomize(t *testing.T) {
+// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
+// expire at the same time.
+func TestLessorRenewExtendPileup(t *testing.T) {
+	oldRevokeRate := leaseRevokeRate
+	defer func() { leaseRevokeRate = oldRevokeRate }()
+	leaseRevokeRate = 10
+
 	dir, be := NewTestBackend(t)
 	defer os.RemoveAll(dir)
 
 	le := newLessor(be, minLeaseTTL)
-	for i := LeaseID(1); i <= 10; i++ {
-		if _, err := le.Grant(i, 3600); err != nil {
+	ttl := int64(10)
+	for i := 1; i <= leaseRevokeRate*10; i++ {
+		if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
+			t.Fatal(err)
+		}
+		// ttls that overlap spillover for ttl=10
+		if _, err := le.Grant(LeaseID(2*i+1), ttl+1); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -232,16 +241,23 @@ func TestLessorRenewRandomize(t *testing.T) {
 	defer be.Close()
 	le = newLessor(be, minLeaseTTL)
 
-	now := monotime.Now()
-
-	// extend after recovery should randomize expiries
+	// extend after recovery should extend expiration on lease pile-up
 	le.Promote(0)
 
+	windowCounts := make(map[int64]int)
 	for _, l := range le.leaseMap {
-		leftSeconds := uint64(float64(l.expiry-now) * float64(1e-9))
-		pc := (float64(leftSeconds-3600) / float64(3600)) * 100
-		if pc > 10.0 || pc < -10.0 || pc == 0 { // should be within 士10%
-			t.Fatalf("expected randomized expiry, got %d seconds (ttl: 3600)", leftSeconds)
+		// round up slightly for baseline ttl
+		s := int64(l.Remaining().Seconds() + 0.1)
+		windowCounts[s]++
+	}
+
+	for i := ttl; i < ttl+20; i++ {
+		c := windowCounts[i]
+		if c > leaseRevokeRate {
+			t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c)
+		}
+		if c < leaseRevokeRate/2 {
+			t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c)
 		}
 	}
 }