Browse Source

Merge pull request #4167 from xiang90/lease_promote

*: expose Lessor Promote and Demote interface
Xiang Li 10 years ago
parent
commit
7de0e9130c
2 changed files with 75 additions and 0 deletions
  1. 8 0
      etcdserver/raft.go
  2. 67 0
      lease/lessor.go

+ 8 - 0
etcdserver/raft.go

@@ -148,13 +148,21 @@ func (r *raftNode) start(s *EtcdServer) {
 					}
 					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
 					if rd.RaftState == raft.StateLeader {
+						// TODO: raft should send server a notification through chan when
+						// it promotes or demotes instead of modifying server directly.
 						syncC = r.s.SyncTicker
+						if r.s.lessor != nil {
+							r.s.lessor.Promote()
+						}
 						// TODO: remove the nil checking
 						// current test utility does not provide the stats
 						if r.s.stats != nil {
 							r.s.stats.BecomeLeader()
 						}
 					} else {
+						if r.s.lessor != nil {
+							r.s.lessor.Demote()
+						}
 						syncC = nil
 					}
 				}

+ 67 - 0
lease/lessor.go

@@ -16,6 +16,7 @@ package lease
 
 import (
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"math"
 	"sync"
@@ -35,6 +36,7 @@ var (
 	minLeaseTerm = 5 * time.Second
 
 	leaseBucketName = []byte("lease")
+	ErrNotPrimary   = errors.New("not a primary lessor")
 )
 
 type LeaseID int64
@@ -56,12 +58,39 @@ type Lessor interface {
 	// given lease will be removed. If the ID does not exist, an error
 	// will be returned.
 	Revoke(id LeaseID) error
+
+	// Promote promotes the lessor to be the primary lessor. Primary lessor manages
+	// the expiration and renew of leases.
+	Promote()
+
+	// Demote demotes the lessor from being the primary lessor.
+	Demote()
+
+	// Renew renews a lease with given ID.  If the ID does not exist, an error
+	// will be returned.
+	Renew(id LeaseID) error
 }
 
 // lessor implements Lessor interface.
 // TODO: use clockwork for testability.
 type lessor struct {
 	mu sync.Mutex
+
+	// primary indicates if this lessor is the primary lessor. The primary
+	// lessor manages lease expiration and renew.
+	//
+	// in etcd, raft leader is the primary. Thus there might be two primary
+	// leaders at the same time (raft allows concurrent leader but with different term)
+	// for at most a leader election timeout.
+	// The old primary leader cannot affect the correctness since its proposal has a
+	// smaller term and will not be committed.
+	//
+	// TODO: raft follower do not forward lease management proposals. There might be a
+	// very small window (within second normally which depends on go scheduling) that
+	// a raft follow is the primary between the raft leader demotion and lessor demotion.
+	// Usually this should not be a problem. Lease should not be that sensitive to timing.
+	primary bool
+
 	// TODO: probably this should be a heap with a secondary
 	// id index.
 	// Now it is O(N) to loop over the leases to find expired ones.
@@ -101,6 +130,8 @@ func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
 	}
 	l.initAndRecover()
 
+	go l.runLoop()
+
 	return l
 }
 
@@ -153,6 +184,10 @@ func (le *lessor) Renew(id LeaseID) error {
 	le.mu.Lock()
 	defer le.mu.Unlock()
 
+	if !le.primary {
+		return ErrNotPrimary
+	}
+
 	l := le.leaseMap[id]
 	if l == nil {
 		return fmt.Errorf("lease: cannot find lease %x", id)
@@ -163,6 +198,20 @@ func (le *lessor) Renew(id LeaseID) error {
 	return nil
 }
 
+func (le *lessor) Promote() {
+	le.mu.Lock()
+	defer le.mu.Unlock()
+
+	le.primary = true
+}
+
+func (le *lessor) Demote() {
+	le.mu.Lock()
+	defer le.mu.Unlock()
+
+	le.primary = false
+}
+
 // Attach attaches items to the lease with given ID. When the lease
 // expires, the attached items will be automatically removed.
 // If the given lease does not exist, an error will be returned.
@@ -192,6 +241,22 @@ func (le *lessor) Recover(b backend.Backend, dr DeleteableRange) {
 	le.initAndRecover()
 }
 
+func (le *lessor) runLoop() {
+	// TODO: stop runLoop
+	for {
+		le.mu.Lock()
+		if le.primary {
+			le.revokeExpiredLeases(le.findExpiredLeases())
+		}
+		le.mu.Unlock()
+		time.Sleep(500 * time.Millisecond)
+	}
+}
+
+func (le *lessor) revokeExpiredLeases(expired []*Lease) {
+	// TODO: send revoke request to these expired lease through raft.
+}
+
 // findExpiredLeases loops all the leases in the leaseMap and returns the expired
 // leases that needed to be revoked.
 func (le *lessor) findExpiredLeases() []*Lease {
@@ -202,6 +267,8 @@ func (le *lessor) findExpiredLeases() []*Lease {
 	now := time.Now()
 
 	for _, l := range le.leaseMap {
+		// TODO: probably should change to <= 100-500 millisecond to
+		// make up committing latency.
 		if l.expiry.Sub(now) <= 0 {
 			leases = append(leases, l)
 		}