فهرست منبع

Merge pull request #4188 from xiang90/lease_stop

*: stop lessor when etcdserver is stopped
Xiang Li 10 سال پیش
والد
کامیت
3aa5cee1f3
2فایلهای تغییر یافته به همراه31 افزوده شده و 4 حذف شده
  1. 4 1
      etcdserver/server.go
  2. 27 3
      lease/lessor.go

+ 4 - 1
etcdserver/server.go

@@ -541,8 +541,11 @@ func (s *EtcdServer) run() {
 
 	defer func() {
 		s.r.stop()
-		// kv and backend can be nil if running without v3 enabled
+		// kv, lessor and backend can be nil if running without v3 enabled
 		// or running unit tests.
+		if s.lessor != nil {
+			s.lessor.Stop()
+		}
 		if s.kv != nil {
 			s.kv.Close()
 		}

+ 27 - 3
lease/lessor.go

@@ -83,8 +83,12 @@ type Lessor interface {
 	// an error will be returned.
 	Renew(id LeaseID) (int64, error)
 
-	// ExpiredLeasesC returens a chan that is used to receive expired leases.
+	// ExpiredLeasesC returns a chan that is used to receive expired leases.
 	ExpiredLeasesC() <-chan []*Lease
+
+	// Stop stops the lessor for managing leases. The behavior of calling Stop multiple
+	// times is undefined.
+	Stop()
 }
 
 // lessor implements Lessor interface.
@@ -124,6 +128,10 @@ type lessor struct {
 	b backend.Backend
 
 	expiredC chan []*Lease
+	// stopC is a channel whose closure indicates that the lessor should be stopped.
+	stopC chan struct{}
+	// doneC is a channel whose closure indicates that the lessor is stopped.
+	doneC chan struct{}
 
 	idgen *idutil.Generator
 }
@@ -144,6 +152,8 @@ func newLessor(lessorID uint8, b backend.Backend) *lessor {
 		b:        b,
 		// expiredC is a small buffered chan to avoid unncessary blocking.
 		expiredC: make(chan []*Lease, 16),
+		stopC:    make(chan struct{}),
+		doneC:    make(chan struct{}),
 		idgen:    idutil.NewGenerator(lessorID, time.Now()),
 	}
 	l.initAndRecover()
@@ -284,8 +294,14 @@ func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
 	return le.expiredC
 }
 
+func (le *lessor) Stop() {
+	close(le.stopC)
+	<-le.doneC
+}
+
 func (le *lessor) runLoop() {
-	// TODO: stop runLoop
+	defer close(le.doneC)
+
 	for {
 		var ls []*Lease
 
@@ -297,6 +313,8 @@ func (le *lessor) runLoop() {
 
 		if len(ls) != 0 {
 			select {
+			case <-le.stopC:
+				return
 			case le.expiredC <- ls:
 			default:
 				// the receiver of expiredC is probably busy handling
@@ -305,7 +323,11 @@ func (le *lessor) runLoop() {
 			}
 		}
 
-		time.Sleep(500 * time.Millisecond)
+		select {
+		case <-time.After(500 * time.Millisecond):
+		case <-le.stopC:
+			return
+		}
 	}
 }
 
@@ -441,3 +463,5 @@ func (fl *FakeLessor) Demote() {}
 func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }
 
 func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }
+
+func (fl *FakeLessor) Stop() {}