Browse Source

*: revoke expired leases

Xiang Li 10 years ago
parent
commit
2566699a48
3 changed files with 65 additions and 17 deletions
  1. 20 5
      etcdserver/server.go
  2. 40 10
      lease/lessor.go
  3. 5 2
      lease/lessor_test.go

+ 20 - 5
etcdserver/server.go

@@ -553,11 +553,26 @@ func (s *EtcdServer) run() {
 		<-appdonec
 		<-appdonec
 	}()
 	}()
 
 
-	select {
-	case err := <-s.errorc:
-		plog.Errorf("%s", err)
-		plog.Infof("the data-dir used by this member must be removed.")
-	case <-s.stop:
+	var expiredLeaseC <-chan []*lease.Lease
+	if s.lessor != nil {
+		expiredLeaseC = s.lessor.ExpiredLeasesC()
+	}
+
+	for {
+		select {
+		case leases := <-expiredLeaseC:
+			go func() {
+				for _, l := range leases {
+					s.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: int64(l.ID)})
+				}
+			}()
+		case err := <-s.errorc:
+			plog.Errorf("%s", err)
+			plog.Infof("the data-dir used by this member must be removed.")
+			return
+		case <-s.stop:
+			return
+		}
 	}
 	}
 }
 }
 
 

+ 40 - 10
lease/lessor.go

@@ -36,7 +36,9 @@ var (
 	minLeaseTerm = 5 * time.Second
 	minLeaseTerm = 5 * time.Second
 
 
 	leaseBucketName = []byte("lease")
 	leaseBucketName = []byte("lease")
-	ErrNotPrimary   = errors.New("not a primary lessor")
+	forever         = time.Unix(math.MaxInt64, 0)
+
+	ErrNotPrimary = errors.New("not a primary lessor")
 )
 )
 
 
 type LeaseID int64
 type LeaseID int64
@@ -69,6 +71,9 @@ type Lessor interface {
 	// Renew renews a lease with given ID.  If the ID does not exist, an error
 	// Renew renews a lease with given ID.  If the ID does not exist, an error
 	// will be returned.
 	// will be returned.
 	Renew(id LeaseID) error
 	Renew(id LeaseID) error
+
+	// ExpiredLeasesC returens a chan that is used to receive expired leases.
+	ExpiredLeasesC() <-chan []*Lease
 }
 }
 
 
 // lessor implements Lessor interface.
 // lessor implements Lessor interface.
@@ -108,6 +113,8 @@ type lessor struct {
 	// The leased items can be recovered by iterating all the keys in kv.
 	// The leased items can be recovered by iterating all the keys in kv.
 	b backend.Backend
 	b backend.Backend
 
 
+	expiredC chan []*Lease
+
 	idgen *idutil.Generator
 	idgen *idutil.Generator
 }
 }
 
 
@@ -126,6 +133,8 @@ func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
 		leaseMap: make(map[LeaseID]*Lease),
 		leaseMap: make(map[LeaseID]*Lease),
 		b:        b,
 		b:        b,
 		dr:       dr,
 		dr:       dr,
+		// expiredC is a small buffered chan to avoid unncessary blocking.
+		expiredC: make(chan []*Lease, 16),
 		idgen:    idutil.NewGenerator(lessorID, time.Now()),
 		idgen:    idutil.NewGenerator(lessorID, time.Now()),
 	}
 	}
 	l.initAndRecover()
 	l.initAndRecover()
@@ -203,12 +212,22 @@ func (le *lessor) Promote() {
 	defer le.mu.Unlock()
 	defer le.mu.Unlock()
 
 
 	le.primary = true
 	le.primary = true
+
+	// refresh the expiries of all leases.
+	for _, l := range le.leaseMap {
+		l.expiry = minExpiry(time.Now(), time.Now().Add(time.Duration(l.TTL)*time.Second))
+	}
 }
 }
 
 
 func (le *lessor) Demote() {
 func (le *lessor) Demote() {
 	le.mu.Lock()
 	le.mu.Lock()
 	defer le.mu.Unlock()
 	defer le.mu.Unlock()
 
 
+	// set the expiries of all leases to forever
+	for _, l := range le.leaseMap {
+		l.expiry = forever
+	}
+
 	le.primary = false
 	le.primary = false
 }
 }
 
 
@@ -241,28 +260,38 @@ func (le *lessor) Recover(b backend.Backend, dr DeleteableRange) {
 	le.initAndRecover()
 	le.initAndRecover()
 }
 }
 
 
+func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
+	return le.expiredC
+}
+
 func (le *lessor) runLoop() {
 func (le *lessor) runLoop() {
 	// TODO: stop runLoop
 	// TODO: stop runLoop
 	for {
 	for {
+		var ls []*Lease
+
 		le.mu.Lock()
 		le.mu.Lock()
 		if le.primary {
 		if le.primary {
-			le.revokeExpiredLeases(le.findExpiredLeases())
+			ls = le.findExpiredLeases()
 		}
 		}
 		le.mu.Unlock()
 		le.mu.Unlock()
+
+		if len(ls) != 0 {
+			select {
+			case le.expiredC <- ls:
+			default:
+				// the receiver of expiredC is probably busy handling
+				// other stuff
+				// let's try this next time after 500ms
+			}
+		}
+
 		time.Sleep(500 * time.Millisecond)
 		time.Sleep(500 * time.Millisecond)
 	}
 	}
 }
 }
 
 
-func (le *lessor) revokeExpiredLeases(expired []*Lease) {
-	// TODO: send revoke request to these expired lease through raft.
-}
-
 // findExpiredLeases loops all the leases in the leaseMap and returns the expired
 // findExpiredLeases loops all the leases in the leaseMap and returns the expired
 // leases that needed to be revoked.
 // leases that needed to be revoked.
 func (le *lessor) findExpiredLeases() []*Lease {
 func (le *lessor) findExpiredLeases() []*Lease {
-	le.mu.Lock()
-	defer le.mu.Unlock()
-
 	leases := make([]*Lease, 0, 16)
 	leases := make([]*Lease, 0, 16)
 	now := time.Now()
 	now := time.Now()
 
 
@@ -306,7 +335,8 @@ func (le *lessor) initAndRecover() {
 			TTL: lpb.TTL,
 			TTL: lpb.TTL,
 
 
 			// itemSet will be filled in when recover key-value pairs
 			// itemSet will be filled in when recover key-value pairs
-			expiry: minExpiry(time.Now(), time.Now().Add(time.Second*time.Duration(lpb.TTL))),
+			// set expiry to forever, refresh when promoted
+			expiry: forever,
 		}
 		}
 	}
 	}
 	tx.Unlock()
 	tx.Unlock()

+ 5 - 2
lease/lessor_test.go

@@ -114,12 +114,15 @@ func TestLessorRenew(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer os.RemoveAll(dir)
 
 
 	le := newLessor(1, be, &fakeDeleteable{})
 	le := newLessor(1, be, &fakeDeleteable{})
+	le.Promote()
 	l := le.Grant(5)
 	l := le.Grant(5)
 
 
 	// manually change the ttl field
 	// manually change the ttl field
 	l.TTL = 10
 	l.TTL = 10
-
-	le.Renew(l.ID)
+	err := le.Renew(l.ID)
+	if err != nil {
+		t.Fatalf("failed to renew lease (%v)", err)
+	}
 	l = le.get(l.ID)
 	l = le.get(l.ID)
 
 
 	if l.expiry.Sub(time.Now()) < 9*time.Second {
 	if l.expiry.Sub(time.Now()) < 9*time.Second {