Browse Source

lease/lessor: recheck if expired lease is revoked

Signed-off-by: nolouch <nolouch@gmail.com>
nolouch 6 years ago
parent
commit
dc8a31eaf0
4 changed files with 142 additions and 46 deletions
  1. 8 1
      etcdserver/server.go
  2. 51 0
      lease/lease_queue.go
  3. 42 19
      lease/lease_queue_test.go
  4. 41 26
      lease/lessor.go

+ 8 - 1
etcdserver/server.go

@@ -524,7 +524,14 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
-	srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval})
+	srv.lessor = lease.NewLessor(
+		srv.getLogger(),
+		srv.be,
+		lease.LessorConfig{
+			MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
+			CheckpointInterval:         cfg.LeaseCheckpointInterval,
+			ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
+		})
 	srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
 	if beExist {
 		kvindex := srv.kv.ConsistentIndex()

+ 51 - 0
lease/lease_queue.go

@@ -14,6 +14,8 @@
 
 package lease
 
+import "container/heap"
+
 // LeaseWithTime contains lease object with a time.
 // For the lessor's lease heap, time identifies the lease expiration time.
 // For the lessor's lease checkpoint heap, the time identifies the next lease checkpoint time.
@@ -53,3 +55,52 @@ func (pq *LeaseQueue) Pop() interface{} {
 	*pq = old[0 : n-1]
 	return item
 }
+
+// LeaseExpiredNotifier is a queue used to notify the lessor to revoke expired leases.
+// It keeps only one item per lease; `RegisterOrUpdate` updates the time of the item
+// already registered for that lease instead of adding a second one.
+type LeaseExpiredNotifier struct {
+	m     map[LeaseID]*LeaseWithTime
+	queue LeaseQueue
+}
+
+func newLeaseExpiredNotifier() *LeaseExpiredNotifier {
+	return &LeaseExpiredNotifier{
+		m:     make(map[LeaseID]*LeaseWithTime),
+		queue: make(LeaseQueue, 0),
+	}
+}
+
+func (mq *LeaseExpiredNotifier) Init() {
+	heap.Init(&mq.queue)
+	mq.m = make(map[LeaseID]*LeaseWithTime)
+	for _, item := range mq.queue {
+		mq.m[item.id] = item
+	}
+}
+
+func (mq *LeaseExpiredNotifier) RegisterOrUpdate(item *LeaseWithTime) {
+	if old, ok := mq.m[item.id]; ok {
+		old.time = item.time
+		heap.Fix(&mq.queue, old.index)
+	} else {
+		heap.Push(&mq.queue, item)
+		mq.m[item.id] = item
+	}
+}
+
+func (mq *LeaseExpiredNotifier) Unregister() *LeaseWithTime {
+	item := heap.Pop(&mq.queue).(*LeaseWithTime)
+	delete(mq.m, item.id)
+	return item
+}
+
+func (mq *LeaseExpiredNotifier) Poll() *LeaseWithTime {
+	if mq.Len() == 0 {
+		return nil
+	}
+	return mq.queue[0]
+}
+
+func (mq *LeaseExpiredNotifier) Len() int {
+	return len(mq.m)
+}

+ 42 - 19
lease/lease_queue_test.go

@@ -15,17 +15,18 @@
 package lease
 
 import (
-	"container/heap"
 	"testing"
 	"time"
 )
 
 func TestLeaseQueue(t *testing.T) {
+	expiredRetryInterval := 100 * time.Millisecond
 	le := &lessor{
-		leaseHeap: make(LeaseQueue, 0),
-		leaseMap:  make(map[LeaseID]*Lease),
+		leaseExpiredNotifier:      newLeaseExpiredNotifier(),
+		leaseMap:                  make(map[LeaseID]*Lease),
+		expiredLeaseRetryInterval: expiredRetryInterval,
 	}
-	heap.Init(&le.leaseHeap)
+	le.leaseExpiredNotifier.Init()
 
 	// insert in reverse order of expiration time
 	for i := 50; i >= 1; i-- {
@@ -34,26 +35,48 @@ func TestLeaseQueue(t *testing.T) {
 			exp = time.Now().UnixNano()
 		}
 		le.leaseMap[LeaseID(i)] = &Lease{ID: LeaseID(i)}
-		heap.Push(&le.leaseHeap, &LeaseWithTime{id: LeaseID(i), time: exp})
+		le.leaseExpiredNotifier.RegisterOrUpdate(&LeaseWithTime{id: LeaseID(i), time: exp})
 	}
 
-	// first element must be front
-	if le.leaseHeap[0].id != LeaseID(1) {
-		t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseHeap[0].id)
+	// first element is expired.
+	if le.leaseExpiredNotifier.Poll().id != LeaseID(1) {
+		t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseExpiredNotifier.Poll().id)
 	}
 
-	l, ok, more := le.expireExists()
-	if l.ID != 1 {
-		t.Fatalf("first item expected lease ID %d, got %d", 1, l.ID)
-	}
-	if !ok {
-		t.Fatal("expect expiry lease exists")
-	}
-	if more {
-		t.Fatal("expect no more expiry lease")
+	existExpiredEvent := func() {
+		l, ok, more := le.expireExists()
+		if l.ID != 1 {
+			t.Fatalf("first item expected lease ID %d, got %d", 1, l.ID)
+		}
+		if !ok {
+			t.Fatal("expect expiry lease exists")
+		}
+		if more {
+			t.Fatal("expect no more expiry lease")
+		}
+
+		if le.leaseExpiredNotifier.Len() != 50 {
+			t.Fatalf("expected the expired lease to be pushed back to the heap, heap size got %d", le.leaseExpiredNotifier.Len())
+		}
+
+		if le.leaseExpiredNotifier.Poll().id != LeaseID(1) {
+			t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseExpiredNotifier.Poll().id)
+		}
 	}
 
-	if le.leaseHeap.Len() != 49 {
-		t.Fatalf("expected lease heap pop, got %d", le.leaseHeap.Len())
+	noExpiredEvent := func() {
+		// re-acquire the expired item, nothing exists
+		_, ok, more := le.expireExists()
+		if ok {
+			t.Fatal("expect no expiry lease exists")
+		}
+		if more {
+			t.Fatal("expect no more expiry lease")
+		}
 	}
+
+	existExpiredEvent() // first acquire
+	noExpiredEvent()    // second acquire
+	time.Sleep(expiredRetryInterval)
+	existExpiredEvent() // acquire after retry interval
 }

+ 41 - 26
lease/lessor.go

@@ -47,9 +47,15 @@ var (
 	// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
 	leaseCheckpointRate = 1000
 
+	// the default interval of lease checkpoint
+	defaultLeaseCheckpointInterval = 5 * time.Minute
+
 	// maximum number of lease checkpoints to batch into a single consensus log entry
 	maxLeaseCheckpointBatchSize = 1000
 
+	// the default interval to check if the expired lease is revoked
+	defaultExpiredleaseRetryInterval = 3 * time.Second
+
 	ErrNotPrimary       = errors.New("not a primary lessor")
 	ErrLeaseNotFound    = errors.New("lease not found")
 	ErrLeaseExists      = errors.New("lease already exists")
@@ -142,10 +148,10 @@ type lessor struct {
 	// demotec will be closed if the lessor is demoted.
 	demotec chan struct{}
 
-	leaseMap            map[LeaseID]*Lease
-	leaseHeap           LeaseQueue
-	leaseCheckpointHeap LeaseQueue
-	itemMap             map[LeaseItem]LeaseID
+	leaseMap             map[LeaseID]*Lease
+	leaseExpiredNotifier *LeaseExpiredNotifier
+	leaseCheckpointHeap  LeaseQueue
+	itemMap              map[LeaseItem]LeaseID
 
 	// When a lease expires, the lessor will delete the
 	// leased range (or key) by the RangeDeleter.
@@ -173,11 +179,14 @@ type lessor struct {
 
 	// Wait duration between lease checkpoints.
 	checkpointInterval time.Duration
+	// the interval to check if the expired lease is revoked
+	expiredLeaseRetryInterval time.Duration
 }
 
 type LessorConfig struct {
-	MinLeaseTTL        int64
-	CheckpointInterval time.Duration
+	MinLeaseTTL                int64
+	CheckpointInterval         time.Duration
+	ExpiredLeasesRetryInterval time.Duration
 }
 
 func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
@@ -186,17 +195,22 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
 
 func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
 	checkpointInterval := cfg.CheckpointInterval
+	expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
 	if checkpointInterval == 0 {
-		checkpointInterval = 5 * time.Minute
+		checkpointInterval = defaultLeaseCheckpointInterval
+	}
+	if expiredLeaseRetryInterval == 0 {
+		expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
 	}
 	l := &lessor{
-		leaseMap:            make(map[LeaseID]*Lease),
-		itemMap:             make(map[LeaseItem]LeaseID),
-		leaseHeap:           make(LeaseQueue, 0),
-		leaseCheckpointHeap: make(LeaseQueue, 0),
-		b:                   b,
-		minLeaseTTL:         cfg.MinLeaseTTL,
-		checkpointInterval:  checkpointInterval,
+		leaseMap:                  make(map[LeaseID]*Lease),
+		itemMap:                   make(map[LeaseItem]LeaseID),
+		leaseExpiredNotifier:      newLeaseExpiredNotifier(),
+		leaseCheckpointHeap:       make(LeaseQueue, 0),
+		b:                         b,
+		minLeaseTTL:               cfg.MinLeaseTTL,
+		checkpointInterval:        checkpointInterval,
+		expiredLeaseRetryInterval: expiredLeaseRetryInterval,
 		// expiredC is a small buffered chan to avoid unnecessary blocking.
 		expiredC: make(chan []*Lease, 16),
 		stopC:    make(chan struct{}),
@@ -278,7 +292,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
 
 	le.leaseMap[id] = l
 	item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
-	heap.Push(&le.leaseHeap, item)
+	le.leaseExpiredNotifier.RegisterOrUpdate(item)
 	l.persistTo(le.b)
 
 	leaseTotalTTLs.Observe(float64(l.ttl))
@@ -393,7 +407,7 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
 	le.mu.Lock()
 	l.refresh(0)
 	item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
-	heap.Push(&le.leaseHeap, item)
+	le.leaseExpiredNotifier.RegisterOrUpdate(item)
 	le.mu.Unlock()
 
 	leaseRenewed.Inc()
@@ -432,7 +446,7 @@ func (le *lessor) Promote(extend time.Duration) {
 	for _, l := range le.leaseMap {
 		l.refresh(extend)
 		item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
-		heap.Push(&le.leaseHeap, item)
+		le.leaseExpiredNotifier.RegisterOrUpdate(item)
 	}
 
 	if len(le.leaseMap) < leaseRevokeRate {
@@ -470,7 +484,7 @@ func (le *lessor) Promote(extend time.Duration) {
 		nextWindow = baseWindow + delay
 		l.refresh(delay + extend)
 		item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
-		heap.Push(&le.leaseHeap, item)
+		le.leaseExpiredNotifier.RegisterOrUpdate(item)
 		le.scheduleCheckpointIfNeeded(l)
 	}
 }
@@ -638,27 +652,28 @@ func (le *lessor) clearScheduledLeasesCheckpoints() {
 // It pops only when expiry item exists.
 // "next" is true, to indicate that it may exist in next attempt.
 func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
-	if le.leaseHeap.Len() == 0 {
+	if le.leaseExpiredNotifier.Len() == 0 {
 		return nil, false, false
 	}
 
-	item := le.leaseHeap[0]
+	item := le.leaseExpiredNotifier.Poll()
 	l = le.leaseMap[item.id]
 	if l == nil {
 		// lease has expired or been revoked
 		// no need to revoke (nothing is expiry)
-		heap.Pop(&le.leaseHeap) // O(log N)
+		le.leaseExpiredNotifier.Unregister() // O(log N)
 		return nil, false, true
 	}
-
-	if time.Now().UnixNano() < item.time /* expiration time */ {
+	now := time.Now()
+	if now.UnixNano() < item.time /* expiration time */ {
 		// Candidate expirations are caught up, reinsert this item
 		// and no need to revoke (nothing is expiry)
 		return l, false, false
 	}
-	// if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap
 
-	heap.Pop(&le.leaseHeap) // O(log N)
+	// recheck if revoke is complete after retry interval
+	item.time = now.Add(le.expiredLeaseRetryInterval).UnixNano()
+	le.leaseExpiredNotifier.RegisterOrUpdate(item)
 	return l, true, false
 }
 
@@ -775,7 +790,7 @@ func (le *lessor) initAndRecover() {
 			revokec: make(chan struct{}),
 		}
 	}
-	heap.Init(&le.leaseHeap)
+	le.leaseExpiredNotifier.Init()
 	heap.Init(&le.leaseCheckpointHeap)
 	tx.Unlock()