Browse Source

Merge pull request #6570 from xiang90/lease_expire

Fix lease expire
Xiang Li 9 years ago
parent
commit
f0469f7f25
4 changed files with 174 additions and 29 deletions
  1. 1 5
      etcdserver/raft.go
  2. 5 0
      etcdserver/server.go
  3. 66 24
      lease/lessor.go
  4. 102 0
      lease/lessor_test.go

+ 1 - 5
etcdserver/raft.go

@@ -172,12 +172,8 @@ func (r *raftNode) start(s *EtcdServer) {
 					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
 					if rd.RaftState == raft.StateLeader {
 						islead = true
-						// TODO: raft should send server a notification through chan when
-						// it promotes or demotes instead of modifying server directly.
 						syncC = r.s.SyncTicker
-						if r.s.lessor != nil {
-							r.s.lessor.Promote(r.s.Cfg.electionTimeout())
-						}
+
 						// TODO: remove the nil checking
 						// current test utility does not provide the stats
 						if r.s.stats != nil {

+ 5 - 0
etcdserver/server.go

@@ -1210,6 +1210,11 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		case s.forceVersionC <- struct{}{}:
 		default:
 		}
+		// promote lessor when the local member is leader and finished
+		// applying all entries from the last term.
+		if s.isLeader() {
+			s.lessor.Promote(s.Cfg.electionTimeout())
+		}
 		return
 	}
 

+ 66 - 24
lease/lessor.go

@@ -111,20 +111,9 @@ type Lessor interface {
 type lessor struct {
 	mu sync.Mutex
 
-	// primary indicates if this lessor is the primary lessor. The primary
-	// lessor manages lease expiration and renew.
-	//
-	// in etcd, raft leader is the primary. Thus there might be two primary
-	// leaders at the same time (raft allows concurrent leader but with different term)
-	// for at most a leader election timeout.
-	// The old primary leader cannot affect the correctness since its proposal has a
-	// smaller term and will not be committed.
-	//
-	// TODO: raft follower do not forward lease management proposals. There might be a
-	// very small window (within second normally which depends on go scheduling) that
-	// a raft follow is the primary between the raft leader demotion and lessor demotion.
-	// Usually this should not be a problem. Lease should not be that sensitive to timing.
-	primary bool
+	// demotec is set when the lessor is the primary.
+	// demotec will be closed if the lessor is demoted.
+	demotec chan struct{}
 
 	// TODO: probably this should be a heap with a secondary
 	// id index.
@@ -174,6 +163,23 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
 	return l
 }
 
+// isPrimary indicates if this lessor is the primary lessor. The primary
+// lessor manages lease expiration and renew.
+//
+// in etcd, raft leader is the primary. Thus there might be two primary
+// leaders at the same time (raft allows concurrent leader but with different term)
+// for at most a leader election timeout.
+// The old primary leader cannot affect the correctness since its proposal has a
+// smaller term and will not be committed.
+//
+// TODO: raft follower do not forward lease management proposals. There might be a
+// very small window (within second normally which depends on go scheduling) that
+// a raft follow is the primary between the raft leader demotion and lessor demotion.
+// Usually this should not be a problem. Lease should not be that sensitive to timing.
+func (le *lessor) isPrimary() bool {
+	return le.demotec != nil
+}
+
 func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
 	le.mu.Lock()
 	defer le.mu.Unlock()
@@ -188,7 +194,12 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
 
 	// TODO: when lessor is under high load, it should give out lease
 	// with longer TTL to reduce renew load.
-	l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})}
+	l := &Lease{
+		ID:      id,
+		TTL:     ttl,
+		itemSet: make(map[LeaseItem]struct{}),
+		revokec: make(chan struct{}),
+	}
 
 	le.mu.Lock()
 	defer le.mu.Unlock()
@@ -201,7 +212,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
 		l.TTL = le.minLeaseTTL
 	}
 
-	if le.primary {
+	if le.isPrimary() {
 		l.refresh(0)
 	} else {
 		l.forever()
@@ -221,6 +232,7 @@ func (le *lessor) Revoke(id LeaseID) error {
 		le.mu.Unlock()
 		return ErrLeaseNotFound
 	}
+	defer close(l.revokec)
 	// unlock before doing external work
 	le.mu.Unlock()
 
@@ -264,18 +276,40 @@ func (le *lessor) Revoke(id LeaseID) error {
 // has expired, an error will be returned.
 func (le *lessor) Renew(id LeaseID) (int64, error) {
 	le.mu.Lock()
-	defer le.mu.Unlock()
 
-	if !le.primary {
+	unlock := func() { le.mu.Unlock() }
+	defer func() { unlock() }()
+
+	if !le.isPrimary() {
 		// forward renew request to primary instead of returning error.
 		return -1, ErrNotPrimary
 	}
 
+	demotec := le.demotec
+
 	l := le.leaseMap[id]
 	if l == nil {
 		return -1, ErrLeaseNotFound
 	}
 
+	if l.expired() {
+		le.mu.Unlock()
+		unlock = func() {}
+		select {
+		// A expired lease might be pending for revoking or going through
+		// quorum to be revoked. To be accurate, renew request must wait for the
+		// deletion to complete.
+		case <-l.revokec:
+			return -1, ErrLeaseNotFound
+		// The expired lease might fail to be revoked if the primary changes.
+		// The caller will retry on ErrNotPrimary.
+		case <-demotec:
+			return -1, ErrNotPrimary
+		case <-le.stopC:
+			return -1, ErrNotPrimary
+		}
+	}
+
 	l.refresh(0)
 	return l.TTL, nil
 }
@@ -290,7 +324,7 @@ func (le *lessor) Promote(extend time.Duration) {
 	le.mu.Lock()
 	defer le.mu.Unlock()
 
-	le.primary = true
+	le.demotec = make(chan struct{})
 
 	// refresh the expiries of all leases.
 	for _, l := range le.leaseMap {
@@ -307,7 +341,10 @@ func (le *lessor) Demote() {
 		l.forever()
 	}
 
-	le.primary = false
+	if le.demotec != nil {
+		close(le.demotec)
+		le.demotec = nil
+	}
 }
 
 // Attach attaches items to the lease with given ID. When the lease
@@ -372,7 +409,7 @@ func (le *lessor) runLoop() {
 		var ls []*Lease
 
 		le.mu.Lock()
-		if le.primary {
+		if le.isPrimary() {
 			ls = le.findExpiredLeases()
 		}
 		le.mu.Unlock()
@@ -401,12 +438,11 @@ func (le *lessor) runLoop() {
 // leases that needed to be revoked.
 func (le *lessor) findExpiredLeases() []*Lease {
 	leases := make([]*Lease, 0, 16)
-	now := time.Now()
 
 	for _, l := range le.leaseMap {
 		// TODO: probably should change to <= 100-500 millisecond to
 		// make up committing latency.
-		if l.expiry.Sub(now) <= 0 {
+		if l.expired() {
 			leases = append(leases, l)
 		}
 	}
@@ -439,6 +475,7 @@ func (le *lessor) initAndRecover() {
 			// set expiry to forever, refresh when promoted
 			itemSet: make(map[LeaseItem]struct{}),
 			expiry:  forever,
+			revokec: make(chan struct{}),
 		}
 	}
 	tx.Unlock()
@@ -452,7 +489,12 @@ type Lease struct {
 
 	itemSet map[LeaseItem]struct{}
 	// expiry time in unixnano
-	expiry time.Time
+	expiry  time.Time
+	revokec chan struct{}
+}
+
+func (l Lease) expired() bool {
+	return l.Remaining() <= 0
 }
 
 func (l Lease) persistTo(b backend.Backend) {

+ 102 - 0
lease/lessor_test.go

@@ -221,6 +221,108 @@ func TestLessorRecover(t *testing.T) {
 	}
 }
 
+func TestLessorExpire(t *testing.T) {
+	dir, be := NewTestBackend(t)
+	defer os.RemoveAll(dir)
+	defer be.Close()
+
+	testMinTTL := int64(1)
+
+	le := newLessor(be, testMinTTL)
+	defer le.Stop()
+
+	le.Promote(1 * time.Second)
+	l, err := le.Grant(1, testMinTTL)
+	if err != nil {
+		t.Fatalf("failed to create lease: %v", err)
+	}
+
+	select {
+	case el := <-le.ExpiredLeasesC():
+		if el[0].ID != l.ID {
+			t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatalf("failed to receive expired lease")
+	}
+
+	donec := make(chan struct{})
+	go func() {
+		// expired lease cannot be renewed
+		if _, err := le.Renew(l.ID); err != ErrLeaseNotFound {
+			t.Fatalf("unexpected renew")
+		}
+		donec <- struct{}{}
+	}()
+
+	select {
+	case <-donec:
+		t.Fatalf("renew finished before lease revocation")
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	// expired lease can be revoked
+	if err := le.Revoke(l.ID); err != nil {
+		t.Fatalf("failed to revoke expired lease: %v", err)
+	}
+
+	select {
+	case <-donec:
+	case <-time.After(10 * time.Second):
+		t.Fatalf("renew has not returned after lease revocation")
+	}
+}
+
+func TestLessorExpireAndDemote(t *testing.T) {
+	dir, be := NewTestBackend(t)
+	defer os.RemoveAll(dir)
+	defer be.Close()
+
+	testMinTTL := int64(1)
+
+	le := newLessor(be, testMinTTL)
+	defer le.Stop()
+
+	le.Promote(1 * time.Second)
+	l, err := le.Grant(1, testMinTTL)
+	if err != nil {
+		t.Fatalf("failed to create lease: %v", err)
+	}
+
+	select {
+	case el := <-le.ExpiredLeasesC():
+		if el[0].ID != l.ID {
+			t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatalf("failed to receive expired lease")
+	}
+
+	donec := make(chan struct{})
+	go func() {
+		// expired lease cannot be renewed
+		if _, err := le.Renew(l.ID); err != ErrNotPrimary {
+			t.Fatalf("unexpected renew: %v", err)
+		}
+		donec <- struct{}{}
+	}()
+
+	select {
+	case <-donec:
+		t.Fatalf("renew finished before demotion")
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	// demote will cause the renew request to fail with ErrNotPrimary
+	le.Demote()
+
+	select {
+	case <-donec:
+	case <-time.After(10 * time.Second):
+		t.Fatalf("renew has not returned after lessor demotion")
+	}
+}
+
 type fakeDeleter struct {
 	deleted []string
 }