Browse Source

compactor: clean up

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
7ce69b256a
5 changed files with 58 additions and 56 deletions
  1. 0 2
      compactor/compactor.go
  2. 23 10
      compactor/periodic.go
  3. 5 14
      compactor/periodic_test.go
  4. 20 11
      compactor/revision.go
  5. 10 19
      compactor/revision_test.go

+ 0 - 2
compactor/compactor.go

@@ -29,8 +29,6 @@ var (
 )
 
 const (
-	checkCompactionInterval = 5 * time.Minute
-
 	ModePeriodic = "periodic"
 	ModeRevision = "revision"
 )

+ 23 - 10
compactor/periodic.go

@@ -46,30 +46,35 @@ type Periodic struct {
 // NewPeriodic creates a new instance of Periodic compactor that purges
 // the log older than h Duration.
 func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
-	return &Periodic{
-		clock:  clockwork.NewRealClock(),
+	return newPeriodic(clockwork.NewRealClock(), h, rg, c)
+}
+
+func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
+	t := &Periodic{
+		clock:  clock,
 		period: h,
 		rg:     rg,
 		c:      c,
+		revs:   make([]int64, 0),
 	}
+	t.ctx, t.cancel = context.WithCancel(context.Background())
+	return t
 }
 
 // periodDivisor divides Periodic.period in into checkCompactInterval duration
 const periodDivisor = 10
 
+// Run runs periodic compactor.
 func (t *Periodic) Run() {
-	t.ctx, t.cancel = context.WithCancel(context.Background())
-	t.revs = make([]int64, 0)
-	clock := t.clock
-	checkCompactInterval := t.period / time.Duration(periodDivisor)
+	interval := t.period / time.Duration(periodDivisor)
 	go func() {
-		last := clock.Now()
+		initialWait := t.clock.Now()
 		for {
 			t.revs = append(t.revs, t.rg.Rev())
 			select {
 			case <-t.ctx.Done():
 				return
-			case <-clock.After(checkCompactInterval):
+			case <-t.clock.After(interval):
 				t.mu.Lock()
 				p := t.paused
 				t.mu.Unlock()
@@ -77,36 +82,44 @@ func (t *Periodic) Run() {
 					continue
 				}
 			}
-			if clock.Now().Sub(last) < t.period {
+
+			// wait up to initial given period
+			if t.clock.Now().Sub(initialWait) < t.period {
 				continue
 			}
+
 			rev, remaining := t.getRev()
 			if rev < 0 {
 				continue
 			}
+
 			plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
 			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
 			if err == nil || err == mvcc.ErrCompacted {
+				// move to next sliding window
 				t.revs = remaining
 				plog.Noticef("Finished auto-compaction at revision %d", rev)
 			} else {
 				plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
-				plog.Noticef("Retry after %v", checkCompactInterval)
+				plog.Noticef("Retry after %v", interval)
 			}
 		}
 	}()
 }
 
+// Stop stops periodic compactor.
 func (t *Periodic) Stop() {
 	t.cancel()
 }
 
+// Pause pauses periodic compactor.
 func (t *Periodic) Pause() {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	t.paused = true
 }
 
+// Resume resumes periodic compactor.
 func (t *Periodic) Resume() {
 	t.mu.Lock()
 	defer t.mu.Unlock()

+ 5 - 14
compactor/periodic_test.go

@@ -21,6 +21,7 @@ import (
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
+
 	"github.com/jonboulle/clockwork"
 )
 
@@ -31,12 +32,7 @@ func TestPeriodic(t *testing.T) {
 	fc := clockwork.NewFakeClock()
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	tb := &Periodic{
-		clock:  fc,
-		period: retentionDuration,
-		rg:     rg,
-		c:      compactable,
-	}
+	tb := newPeriodic(fc, retentionDuration, rg, compactable)
 
 	tb.Run()
 	defer tb.Stop()
@@ -70,15 +66,10 @@ func TestPeriodic(t *testing.T) {
 
 func TestPeriodicPause(t *testing.T) {
 	fc := clockwork.NewFakeClock()
-	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
 	retentionDuration := time.Hour
-	tb := &Periodic{
-		clock:  fc,
-		period: retentionDuration,
-		rg:     rg,
-		c:      compactable,
-	}
+	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
+	compactable := &fakeCompactable{testutil.NewRecorderStream()}
+	tb := newPeriodic(fc, retentionDuration, rg, compactable)
 
 	tb.Run()
 	tb.Pause()

+ 20 - 11
compactor/revision.go

@@ -17,6 +17,7 @@ package compactor
 import (
 	"context"
 	"sync"
+	"time"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc"
@@ -43,25 +44,31 @@ type Revision struct {
 // NewRevision creates a new instance of Revisonal compactor that purges
 // the log older than retention revisions from the current revision.
 func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision {
-	return &Revision{
-		clock:     clockwork.NewRealClock(),
+	return newRevision(clockwork.NewRealClock(), retention, rg, c)
+}
+
+func newRevision(clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
+	t := &Revision{
+		clock:     clock,
 		retention: retention,
 		rg:        rg,
 		c:         c,
 	}
+	t.ctx, t.cancel = context.WithCancel(context.Background())
+	return t
 }
 
-func (t *Revision) Run() {
-	t.ctx, t.cancel = context.WithCancel(context.Background())
-	clock := t.clock
-	previous := int64(0)
+const revInterval = 5 * time.Minute
 
+// Run runs revision-based compactor.
+func (t *Revision) Run() {
+	prev := int64(0)
 	go func() {
 		for {
 			select {
 			case <-t.ctx.Done():
 				return
-			case <-clock.After(checkCompactionInterval):
+			case <-t.clock.After(revInterval):
 				t.mu.Lock()
 				p := t.paused
 				t.mu.Unlock()
@@ -71,34 +78,36 @@ func (t *Revision) Run() {
 			}
 
 			rev := t.rg.Rev() - t.retention
-
-			if rev <= 0 || rev == previous {
+			if rev <= 0 || rev == prev {
 				continue
 			}
 
 			plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
 			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
 			if err == nil || err == mvcc.ErrCompacted {
-				previous = rev
+				prev = rev
 				plog.Noticef("Finished auto-compaction at revision %d", rev)
 			} else {
 				plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
-				plog.Noticef("Retry after %v", checkCompactionInterval)
+				plog.Noticef("Retry after %v", revInterval)
 			}
 		}
 	}()
 }
 
+// Stop stops revision-based compactor.
 func (t *Revision) Stop() {
 	t.cancel()
 }
 
+// Pause pauses revision-based compactor.
 func (t *Revision) Pause() {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	t.paused = true
 }
 
+// Resume resumes revision-based compactor.
 func (t *Revision) Resume() {
 	t.mu.Lock()
 	defer t.mu.Unlock()

+ 10 - 19
compactor/revision_test.go

@@ -21,6 +21,7 @@ import (
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
+
 	"github.com/jonboulle/clockwork"
 )
 
@@ -28,23 +29,18 @@ func TestRevision(t *testing.T) {
 	fc := clockwork.NewFakeClock()
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	tb := &Revision{
-		clock:     fc,
-		retention: 10,
-		rg:        rg,
-		c:         compactable,
-	}
+	tb := newRevision(fc, 10, rg, compactable)
 
 	tb.Run()
 	defer tb.Stop()
 
-	fc.Advance(checkCompactionInterval)
+	fc.Advance(revInterval)
 	rg.Wait(1)
 	// nothing happens
 
 	rg.SetRev(99) // will be 100
 	expectedRevision := int64(90)
-	fc.Advance(checkCompactionInterval)
+	fc.Advance(revInterval)
 	rg.Wait(1)
 	a, err := compactable.Wait(1)
 	if err != nil {
@@ -61,7 +57,7 @@ func TestRevision(t *testing.T) {
 
 	rg.SetRev(199) // will be 200
 	expectedRevision = int64(190)
-	fc.Advance(checkCompactionInterval)
+	fc.Advance(revInterval)
 	rg.Wait(1)
 	a, err = compactable.Wait(1)
 	if err != nil {
@@ -74,22 +70,17 @@ func TestRevision(t *testing.T) {
 
 func TestRevisionPause(t *testing.T) {
 	fc := clockwork.NewFakeClock()
-	compactable := &fakeCompactable{testutil.NewRecorderStream()}
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
-	tb := &Revision{
-		clock:     fc,
-		retention: 10,
-		rg:        rg,
-		c:         compactable,
-	}
+	compactable := &fakeCompactable{testutil.NewRecorderStream()}
+	tb := newRevision(fc, 10, rg, compactable)
 
 	tb.Run()
 	tb.Pause()
 
 	// tb will collect 3 hours of revisions but not compact since paused
-	n := int(time.Hour / checkCompactionInterval)
+	n := int(time.Hour / revInterval)
 	for i := 0; i < 3*n; i++ {
-		fc.Advance(checkCompactionInterval)
+		fc.Advance(revInterval)
 	}
 	// tb ends up waiting for the clock
 
@@ -103,7 +94,7 @@ func TestRevisionPause(t *testing.T) {
 	tb.Resume()
 
 	// unblock clock, will kick off a compaction at hour 3:05
-	fc.Advance(checkCompactionInterval)
+	fc.Advance(revInterval)
 	rg.Wait(1)
 	a, err := compactable.Wait(1)
 	if err != nil {