Browse Source

lease: fix deadlock when renewing a lease while the checkpointer is set

Signed-off-by: nolouch <nolouch@gmail.com>
nolouch 6 years ago
parent
commit
e20b9d9e16
2 changed files with 53 additions and 8 deletions
  1. 9 8
      lease/lessor.go
  2. 44 0
      lease/lessor_test.go

+ 9 - 8
lease/lessor.go

@@ -349,13 +349,10 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
 // Renew renews an existing lease. If the given lease does not exist or
 // Renew renews an existing lease. If the given lease does not exist or
 // has expired, an error will be returned.
 // has expired, an error will be returned.
 func (le *lessor) Renew(id LeaseID) (int64, error) {
 func (le *lessor) Renew(id LeaseID) (int64, error) {
-	le.mu.Lock()
-
-	unlock := func() { le.mu.Unlock() }
-	defer func() { unlock() }()
-
+	le.mu.RLock()
 	if !le.isPrimary() {
 	if !le.isPrimary() {
 		// forward renew request to primary instead of returning error.
 		// forward renew request to primary instead of returning error.
+		le.mu.RUnlock()
 		return -1, ErrNotPrimary
 		return -1, ErrNotPrimary
 	}
 	}
 
 
@@ -363,12 +360,14 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
 
 
 	l := le.leaseMap[id]
 	l := le.leaseMap[id]
 	if l == nil {
 	if l == nil {
+		le.mu.RUnlock()
 		return -1, ErrLeaseNotFound
 		return -1, ErrLeaseNotFound
 	}
 	}
+	// Clear remaining TTL when we renew if it is set
+	clearRemainingTTL := le.cp != nil && l.remainingTTL > 0
 
 
+	le.mu.RUnlock()
 	if l.expired() {
 	if l.expired() {
-		le.mu.Unlock()
-		unlock = func() {}
 		select {
 		select {
 		// A expired lease might be pending for revoking or going through
 		// A expired lease might be pending for revoking or going through
 		// quorum to be revoked. To be accurate, renew request must wait for the
 		// quorum to be revoked. To be accurate, renew request must wait for the
@@ -387,13 +386,15 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
 	// Clear remaining TTL when we renew if it is set
 	// Clear remaining TTL when we renew if it is set
 	// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
 	// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
 	// of RAFT entries written per lease to a max of 2 per checkpoint interval.
 	// of RAFT entries written per lease to a max of 2 per checkpoint interval.
-	if le.cp != nil && l.remainingTTL > 0 {
+	if clearRemainingTTL {
 		le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
 		le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
 	}
 	}
 
 
+	le.mu.Lock()
 	l.refresh(0)
 	l.refresh(0)
 	item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
 	item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
 	heap.Push(&le.leaseHeap, item)
 	heap.Push(&le.leaseHeap, item)
+	le.mu.Unlock()
 
 
 	leaseRenewed.Inc()
 	leaseRenewed.Inc()
 	return l.ttl, nil
 	return l.ttl, nil

+ 44 - 0
lease/lessor_test.go

@@ -236,6 +236,50 @@ func TestLessorRenew(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestLessorRenewWithCheckpointer(t *testing.T) {
+	lg := zap.NewNop()
+	dir, be := NewTestBackend(t)
+	defer be.Close()
+	defer os.RemoveAll(dir)
+
+	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
+		for _, cp := range cp.GetCheckpoints() {
+			le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
+		}
+	}
+	defer le.Stop()
+	// Install a checkpointer that calls straight back into le.Checkpoint, so Renew must not hold le.mu while invoking it (presumably Checkpoint acquires the same lock; verify against its body).
+	le.SetCheckpointer(fakerCheckerpointer)
+	le.Promote(0)
+
+	l, err := le.Grant(1, minLeaseTTL)
+	if err != nil {
+		t.Fatalf("failed to grant lease (%v)", err)
+	}
+
+	// Manually set ttl and a non-zero remainingTTL so that Renew takes the checkpoint path (le.cp != nil && l.remainingTTL > 0).
+	le.mu.Lock()
+	l.ttl = 10
+	l.remainingTTL = 10
+	le.mu.Unlock()
+	ttl, err := le.Renew(l.ID)
+	if err != nil {
+		t.Fatalf("failed to renew lease (%v)", err)
+	}
+	if ttl != l.ttl {
+		t.Errorf("ttl = %d, want %d", ttl, l.ttl)
+	}
+	if l.remainingTTL != 0 {
+		t.Fatalf("remianingTTL = %d, want %d", l.remainingTTL, 0)
+	}
+
+	l = le.Lookup(l.ID)
+	if l.Remaining() < 9*time.Second {
+		t.Errorf("failed to renew the lease")
+	}
+}
+
 // TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
 // TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
 // expire at the same time.
 // expire at the same time.
 func TestLessorRenewExtendPileup(t *testing.T) {
 func TestLessorRenewExtendPileup(t *testing.T) {