Преглед на файлове

Merge pull request #9418 from mgates/use_heap_to_track_lease_expirations

lease: Add a heap to optimize lease expiration checks
Gyuho Lee преди 7 години
родител
ревизия
2aa3decc38
променени са 4 файла, в които са добавени 205 реда и са изтрити 6 реда
  1. 1 0
      CHANGELOG-3.4.md
  2. 51 0
      lease/lease_queue.go
  3. 32 6
      lease/lessor.go
  4. 121 0
      lease/lessor_bench_test.go

+ 1 - 0
CHANGELOG-3.4.md

@@ -28,6 +28,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
   - The retention window of compaction period moves for every given compaction period or hour.
   - For instance, when hourly writes are 100 and `--auto-compaction-mode=periodic --auto-compaction-retention=24h`, `v3.2.x`, `v3.3.0`, `v3.3.1`, and `v3.3.2` compact revision 2400, 2640, and 2880 for every 2.4-hour, while `v3.3.3` *or later* compacts revision 2400, 2500, 2600 for every 1-hour.
   - Futhermore, when `--auto-compaction-mode=periodic --auto-compaction-retention=30m` and writes per minute are about 1000, `v3.3.0`, `v3.3.1`, and `v3.3.2` compact revision 30000, 33000, and 36000, for every 3-minute, while `v3.3.3` *or later* compacts revision 30000, 60000, and 90000, for every 30-minute.
+- Improve [lease expire/revoke operation performance](https://github.com/coreos/etcd/pull/9418), address [lease scalability issue](https://github.com/coreos/etcd/issues/9496).
 - Make [Lease `Lookup` non-blocking with concurrent `Grant`/`Revoke`](https://github.com/coreos/etcd/pull/9229).
 
 ### Breaking Changes

+ 51 - 0
lease/lease_queue.go

@@ -0,0 +1,51 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package lease
+
+type LeaseWithTime struct {
+	leaseId    LeaseID
+	expiration int64
+	index      int
+}
+
+type LeaseQueue []*LeaseWithTime
+
+func (pq LeaseQueue) Len() int { return len(pq) }
+
+func (pq LeaseQueue) Less(i, j int) bool {
+	return pq[i].expiration < pq[j].expiration
+}
+
+func (pq LeaseQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+	pq[i].index = i
+	pq[j].index = j
+}
+
+func (pq *LeaseQueue) Push(x interface{}) {
+	n := len(*pq)
+	item := x.(*LeaseWithTime)
+	item.index = n
+	*pq = append(*pq, item)
+}
+
+func (pq *LeaseQueue) Pop() interface{} {
+	old := *pq
+	n := len(old)
+	item := old[n-1]
+	item.index = -1 // for safety
+	*pq = old[0 : n-1]
+	return item
+}

+ 32 - 6
lease/lessor.go

@@ -15,6 +15,7 @@
 package lease
 
 import (
+	"container/heap"
 	"encoding/binary"
 	"errors"
 	"math"
@@ -128,9 +129,9 @@ type lessor struct {
 	// We want to make Grant, Revoke, and findExpiredLeases all O(logN) and
 	// Renew O(1).
 	// findExpiredLeases and Renew should be the most frequent operations.
-	leaseMap map[LeaseID]*Lease
-
-	itemMap map[LeaseItem]LeaseID
+	leaseMap  map[LeaseID]*Lease
+	leaseHeap LeaseQueue
+	itemMap   map[LeaseItem]LeaseID
 
 	// When a lease expires, the lessor will delete the
 	// leased range (or key) by the RangeDeleter.
@@ -159,6 +160,7 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
 	l := &lessor{
 		leaseMap:    make(map[LeaseID]*Lease),
 		itemMap:     make(map[LeaseItem]LeaseID),
+		leaseHeap:   make(LeaseQueue, 0),
 		b:           b,
 		minLeaseTTL: minLeaseTTL,
 		// expiredC is a small buffered chan to avoid unnecessary blocking.
@@ -233,6 +235,8 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
 	}
 
 	le.leaseMap[id] = l
+	item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
+	heap.Push(&le.leaseHeap, item)
 	l.persistTo(le.b)
 
 	return l, nil
@@ -315,6 +319,8 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
 	}
 
 	l.refresh(0)
+	item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
+	heap.Push(&le.leaseHeap, item)
 	return l.ttl, nil
 }
 
@@ -349,6 +355,8 @@ func (le *lessor) Promote(extend time.Duration) {
 	// refresh the expiries of all leases.
 	for _, l := range le.leaseMap {
 		l.refresh(extend)
+		item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
+		heap.Push(&le.leaseHeap, item)
 	}
 
 	if len(le.leaseMap) < leaseRevokeRate {
@@ -384,6 +392,8 @@ func (le *lessor) Promote(extend time.Duration) {
 		delay := time.Duration(rateDelay)
 		nextWindow = baseWindow + delay
 		l.refresh(delay + extend)
+		item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
+		heap.Push(&le.leaseHeap, item)
 	}
 }
 
@@ -516,9 +526,24 @@ func (le *lessor) runLoop() {
 func (le *lessor) findExpiredLeases(limit int) []*Lease {
 	leases := make([]*Lease, 0, 16)
 
-	for _, l := range le.leaseMap {
-		// TODO: probably should change to <= 100-500 millisecond to
-		// make up committing latency.
+	for {
+		if le.leaseHeap.Len() == 0 {
+			break
+		}
+
+		item := heap.Pop(&le.leaseHeap).(*LeaseWithTime)
+		l := le.leaseMap[item.leaseId]
+		if l == nil {
+			// lease has expired or been revoked, continue
+			continue
+		}
+		if time.Now().UnixNano() < item.expiration {
+			// Candidate expirations are caught up, reinsert this item
+			heap.Push(&le.leaseHeap, item)
+			break
+		}
+		// if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap
+
 		if l.expired() {
 			leases = append(leases, l)
 
@@ -560,6 +585,7 @@ func (le *lessor) initAndRecover() {
 			revokec: make(chan struct{}),
 		}
 	}
+	heap.Init(&le.leaseHeap)
 	tx.Unlock()
 
 	le.b.ForceCommit()

+ 121 - 0
lease/lessor_bench_test.go

@@ -0,0 +1,121 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package lease
+
+import (
+	"os"
+	"testing"
+
+	"github.com/coreos/etcd/mvcc/backend"
+)
+
+func BenchmarkLessorFindExpired1(b *testing.B)       { benchmarkLessorFindExpired(1, b) }
+func BenchmarkLessorFindExpired10(b *testing.B)      { benchmarkLessorFindExpired(10, b) }
+func BenchmarkLessorFindExpired100(b *testing.B)     { benchmarkLessorFindExpired(100, b) }
+func BenchmarkLessorFindExpired1000(b *testing.B)    { benchmarkLessorFindExpired(1000, b) }
+func BenchmarkLessorFindExpired10000(b *testing.B)   { benchmarkLessorFindExpired(10000, b) }
+func BenchmarkLessorFindExpired100000(b *testing.B)  { benchmarkLessorFindExpired(100000, b) }
+func BenchmarkLessorFindExpired1000000(b *testing.B) { benchmarkLessorFindExpired(1000000, b) }
+
+func BenchmarkLessorGrant1(b *testing.B)       { benchmarkLessorGrant(1, b) }
+func BenchmarkLessorGrant10(b *testing.B)      { benchmarkLessorGrant(10, b) }
+func BenchmarkLessorGrant100(b *testing.B)     { benchmarkLessorGrant(100, b) }
+func BenchmarkLessorGrant1000(b *testing.B)    { benchmarkLessorGrant(1000, b) }
+func BenchmarkLessorGrant10000(b *testing.B)   { benchmarkLessorGrant(10000, b) }
+func BenchmarkLessorGrant100000(b *testing.B)  { benchmarkLessorGrant(100000, b) }
+func BenchmarkLessorGrant1000000(b *testing.B) { benchmarkLessorGrant(1000000, b) }
+
+func BenchmarkLessorRenew1(b *testing.B)       { benchmarkLessorRenew(1, b) }
+func BenchmarkLessorRenew10(b *testing.B)      { benchmarkLessorRenew(10, b) }
+func BenchmarkLessorRenew100(b *testing.B)     { benchmarkLessorRenew(100, b) }
+func BenchmarkLessorRenew1000(b *testing.B)    { benchmarkLessorRenew(1000, b) }
+func BenchmarkLessorRenew10000(b *testing.B)   { benchmarkLessorRenew(10000, b) }
+func BenchmarkLessorRenew100000(b *testing.B)  { benchmarkLessorRenew(100000, b) }
+func BenchmarkLessorRenew1000000(b *testing.B) { benchmarkLessorRenew(1000000, b) }
+
+func BenchmarkLessorRevoke1(b *testing.B)       { benchmarkLessorRevoke(1, b) }
+func BenchmarkLessorRevoke10(b *testing.B)      { benchmarkLessorRevoke(10, b) }
+func BenchmarkLessorRevoke100(b *testing.B)     { benchmarkLessorRevoke(100, b) }
+func BenchmarkLessorRevoke1000(b *testing.B)    { benchmarkLessorRevoke(1000, b) }
+func BenchmarkLessorRevoke10000(b *testing.B)   { benchmarkLessorRevoke(10000, b) }
+func BenchmarkLessorRevoke100000(b *testing.B)  { benchmarkLessorRevoke(100000, b) }
+func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) }
+
+func benchmarkLessorFindExpired(size int, b *testing.B) {
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	le := newLessor(be, minLeaseTTL)
+	defer le.Stop()
+	defer cleanup(be, tmpPath)
+	le.Promote(0)
+	for i := 0; i < size; i++ {
+		le.Grant(LeaseID(i), int64(100+i))
+	}
+	le.mu.Lock() //Stop the findExpiredLeases call in the runloop
+	defer le.mu.Unlock()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		le.findExpiredLeases(1000)
+	}
+}
+
+func benchmarkLessorGrant(size int, b *testing.B) {
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	le := newLessor(be, minLeaseTTL)
+	defer le.Stop()
+	defer cleanup(be, tmpPath)
+	for i := 0; i < size; i++ {
+		le.Grant(LeaseID(i), int64(100+i))
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		le.Grant(LeaseID(i+size), int64(100+i+size))
+	}
+}
+
+func benchmarkLessorRevoke(size int, b *testing.B) {
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	le := newLessor(be, minLeaseTTL)
+	defer le.Stop()
+	defer cleanup(be, tmpPath)
+	for i := 0; i < size; i++ {
+		le.Grant(LeaseID(i), int64(100+i))
+	}
+	for i := 0; i < b.N; i++ {
+		le.Grant(LeaseID(i+size), int64(100+i+size))
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		le.Revoke(LeaseID(i + size))
+	}
+}
+
+func benchmarkLessorRenew(size int, b *testing.B) {
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	le := newLessor(be, minLeaseTTL)
+	defer le.Stop()
+	defer cleanup(be, tmpPath)
+	for i := 0; i < size; i++ {
+		le.Grant(LeaseID(i), int64(100+i))
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		le.Renew(LeaseID(i))
+	}
+}
+
+func cleanup(b backend.Backend, path string) {
+	b.Close()
+	os.Remove(path)
+}