Browse Source

compactor: support structured logger

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
f269c42aad
5 changed files with 153 additions and 88 deletions
  1. 12 3
      compactor/compactor.go
  2. 75 49
      compactor/periodic.go
  3. 4 3
      compactor/periodic_test.go
  4. 59 31
      compactor/revision.go
  5. 3 2
      compactor/revision_test.go

+ 12 - 3
compactor/compactor.go

@@ -22,6 +22,8 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
 	"github.com/coreos/pkg/capnslog"
+	"github.com/jonboulle/clockwork"
+	"go.uber.org/zap"
 )
 
 var (
@@ -54,12 +56,19 @@ type RevGetter interface {
 	Rev() int64
 }
 
-func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) {
+// New returns a new Compactor based on given "mode".
+func New(
+	lg *zap.Logger,
+	mode string,
+	retention time.Duration,
+	rg RevGetter,
+	c Compactable,
+) (Compactor, error) {
 	switch mode {
 	case ModePeriodic:
-		return NewPeriodic(retention, rg, c), nil
+		return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
 	case ModeRevision:
-		return NewRevision(int64(retention), rg, c), nil
+		return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
 	default:
 		return nil, fmt.Errorf("unsupported compaction mode %s", mode)
 	}

+ 75 - 49
compactor/periodic.go

@@ -23,11 +23,13 @@ import (
 	"github.com/coreos/etcd/mvcc"
 
 	"github.com/jonboulle/clockwork"
+	"go.uber.org/zap"
 )
 
 // Periodic compacts the log by purging revisions older than
 // the configured retention time.
 type Periodic struct {
+	lg     *zap.Logger
 	clock  clockwork.Clock
 	period time.Duration
 
@@ -43,22 +45,19 @@ type Periodic struct {
 	paused bool
 }
 
-// NewPeriodic creates a new instance of Periodic compactor that purges
+// newPeriodic creates a new instance of Periodic compactor that purges
 // the log older than h Duration.
-func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
-	return newPeriodic(clockwork.NewRealClock(), h, rg, c)
-}
-
-func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
-	t := &Periodic{
+func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
+	pc := &Periodic{
+		lg:     lg,
 		clock:  clock,
 		period: h,
 		rg:     rg,
 		c:      c,
 		revs:   make([]int64, 0),
 	}
-	t.ctx, t.cancel = context.WithCancel(context.Background())
-	return t
+	pc.ctx, pc.cancel = context.WithCancel(context.Background())
+	return pc
 }
 
 /*
@@ -96,50 +95,77 @@ Compaction period 5-sec:
 */
 
 // Run runs periodic compactor.
-func (t *Periodic) Run() {
-	compactInterval := t.getCompactInterval()
-	retryInterval := t.getRetryInterval()
-	retentions := t.getRetentions()
+func (pc *Periodic) Run() {
+	compactInterval := pc.getCompactInterval()
+	retryInterval := pc.getRetryInterval()
+	retentions := pc.getRetentions()
 
 	go func() {
-		lastSuccess := t.clock.Now()
-		baseInterval := t.period
+		lastSuccess := pc.clock.Now()
+		baseInterval := pc.period
 		for {
-			t.revs = append(t.revs, t.rg.Rev())
-			if len(t.revs) > retentions {
-				t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago
+			pc.revs = append(pc.revs, pc.rg.Rev())
+			if len(pc.revs) > retentions {
+				pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago
 			}
 
 			select {
-			case <-t.ctx.Done():
+			case <-pc.ctx.Done():
 				return
-			case <-t.clock.After(retryInterval):
-				t.mu.Lock()
-				p := t.paused
-				t.mu.Unlock()
+			case <-pc.clock.After(retryInterval):
+				pc.mu.Lock()
+				p := pc.paused
+				pc.mu.Unlock()
 				if p {
 					continue
 				}
 			}
 
-			if t.clock.Now().Sub(lastSuccess) < baseInterval {
+			if pc.clock.Now().Sub(lastSuccess) < baseInterval {
 				continue
 			}
 
 			// wait up to initial given period
-			if baseInterval == t.period {
+			if baseInterval == pc.period {
 				baseInterval = compactInterval
 			}
-			rev := t.revs[0]
-
-			plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
-			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
+			rev := pc.revs[0]
+
+			if pc.lg != nil {
+				pc.lg.Info(
+					"starting auto periodic compaction",
+					zap.Int64("revision", rev),
+					zap.Duration("compact-period", pc.period),
+				)
+			} else {
+				plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, pc.period)
+			}
+			_, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})
 			if err == nil || err == mvcc.ErrCompacted {
-				lastSuccess = t.clock.Now()
-				plog.Noticef("Finished auto-compaction at revision %d", rev)
+				if pc.lg != nil {
+					pc.lg.Info(
+						"completed auto periodic compaction",
+						zap.Int64("revision", rev),
+						zap.Duration("compact-period", pc.period),
+						zap.Duration("took", time.Since(lastSuccess)),
+					)
+				} else {
+					plog.Noticef("Finished auto-compaction at revision %d", rev)
+				}
+				lastSuccess = pc.clock.Now()
 			} else {
-				plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
-				plog.Noticef("Retry after %v", retryInterval)
+				if pc.lg != nil {
+					pc.lg.Warn(
+						"failed auto periodic compaction",
+						zap.Int64("revision", rev),
+						zap.Duration("compact-period", pc.period),
+						zap.Duration("retry-interval", retryInterval),
+						zap.Error(err),
+					)
+				} else {
+					plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
+					plog.Noticef("Retry after %v", retryInterval)
+				}
 			}
 		}
 	}()
@@ -149,22 +175,22 @@ func (t *Periodic) Run() {
 // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
 // if given compaction period x is >1-hour, compact every hour.
 // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
-func (t *Periodic) getCompactInterval() time.Duration {
-	itv := t.period
+func (pc *Periodic) getCompactInterval() time.Duration {
+	itv := pc.period
 	if itv > time.Hour {
 		itv = time.Hour
 	}
 	return itv
 }
 
-func (t *Periodic) getRetentions() int {
-	return int(t.period/t.getRetryInterval()) + 1
+func (pc *Periodic) getRetentions() int {
+	return int(pc.period/pc.getRetryInterval()) + 1
 }
 
 const retryDivisor = 10
 
-func (t *Periodic) getRetryInterval() time.Duration {
-	itv := t.period
+func (pc *Periodic) getRetryInterval() time.Duration {
+	itv := pc.period
 	if itv > time.Hour {
 		itv = time.Hour
 	}
@@ -172,20 +198,20 @@ func (t *Periodic) getRetryInterval() time.Duration {
 }
 
 // Stop stops periodic compactor.
-func (t *Periodic) Stop() {
-	t.cancel()
+func (pc *Periodic) Stop() {
+	pc.cancel()
 }
 
 // Pause pauses periodic compactor.
-func (t *Periodic) Pause() {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-	t.paused = true
+func (pc *Periodic) Pause() {
+	pc.mu.Lock()
+	pc.paused = true
+	pc.mu.Unlock()
 }
 
 // Resume resumes periodic compactor.
-func (t *Periodic) Resume() {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-	t.paused = false
+func (pc *Periodic) Resume() {
+	pc.mu.Lock()
+	pc.paused = false
+	pc.mu.Unlock()
 }

+ 4 - 3
compactor/periodic_test.go

@@ -23,6 +23,7 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 
 	"github.com/jonboulle/clockwork"
+	"go.uber.org/zap"
 )
 
 func TestPeriodicHourly(t *testing.T) {
@@ -32,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) {
 	fc := clockwork.NewFakeClock()
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	tb := newPeriodic(fc, retentionDuration, rg, compactable)
+	tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
 
 	tb.Run()
 	defer tb.Stop()
@@ -83,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) {
 	fc := clockwork.NewFakeClock()
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	tb := newPeriodic(fc, retentionDuration, rg, compactable)
+	tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
 
 	tb.Run()
 	defer tb.Stop()
@@ -131,7 +132,7 @@ func TestPeriodicPause(t *testing.T) {
 	retentionDuration := time.Hour
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	tb := newPeriodic(fc, retentionDuration, rg, compactable)
+	tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
 
 	tb.Run()
 	tb.Pause()

+ 59 - 31
compactor/revision.go

@@ -23,11 +23,14 @@ import (
 	"github.com/coreos/etcd/mvcc"
 
 	"github.com/jonboulle/clockwork"
+	"go.uber.org/zap"
 )
 
 // Revision compacts the log by purging revisions older than
 // the configured reivison number. Compaction happens every 5 minutes.
 type Revision struct {
+	lg *zap.Logger
+
 	clock     clockwork.Clock
 	retention int64
 
@@ -41,75 +44,100 @@ type Revision struct {
 	paused bool
 }
 
-// NewRevision creates a new instance of Revisonal compactor that purges
+// newRevision creates a new instance of Revisonal compactor that purges
 // the log older than retention revisions from the current revision.
-func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision {
-	return newRevision(clockwork.NewRealClock(), retention, rg, c)
-}
-
-func newRevision(clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
-	t := &Revision{
+func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
+	rc := &Revision{
+		lg:        lg,
 		clock:     clock,
 		retention: retention,
 		rg:        rg,
 		c:         c,
 	}
-	t.ctx, t.cancel = context.WithCancel(context.Background())
-	return t
+	rc.ctx, rc.cancel = context.WithCancel(context.Background())
+	return rc
 }
 
 const revInterval = 5 * time.Minute
 
 // Run runs revision-based compactor.
-func (t *Revision) Run() {
+func (rc *Revision) Run() {
 	prev := int64(0)
 	go func() {
 		for {
 			select {
-			case <-t.ctx.Done():
+			case <-rc.ctx.Done():
 				return
-			case <-t.clock.After(revInterval):
-				t.mu.Lock()
-				p := t.paused
-				t.mu.Unlock()
+			case <-rc.clock.After(revInterval):
+				rc.mu.Lock()
+				p := rc.paused
+				rc.mu.Unlock()
 				if p {
 					continue
 				}
 			}
 
-			rev := t.rg.Rev() - t.retention
+			rev := rc.rg.Rev() - rc.retention
 			if rev <= 0 || rev == prev {
 				continue
 			}
 
-			plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
-			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
+			now := time.Now()
+			if rc.lg != nil {
+				rc.lg.Info(
+					"starting auto revision compaction",
+					zap.Int64("revision", rev),
+					zap.Int64("revision-compaction-retention", rc.retention),
+				)
+			} else {
+				plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, rc.retention)
+			}
+			_, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev})
 			if err == nil || err == mvcc.ErrCompacted {
 				prev = rev
-				plog.Noticef("Finished auto-compaction at revision %d", rev)
+				if rc.lg != nil {
+					rc.lg.Info(
+						"completed auto revision compaction",
+						zap.Int64("revision", rev),
+						zap.Int64("revision-compaction-retention", rc.retention),
+						zap.Duration("took", time.Since(now)),
+					)
+				} else {
+					plog.Noticef("Finished auto-compaction at revision %d", rev)
+				}
 			} else {
-				plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
-				plog.Noticef("Retry after %v", revInterval)
+				if rc.lg != nil {
+					rc.lg.Warn(
+						"failed auto revision compaction",
+						zap.Int64("revision", rev),
+						zap.Int64("revision-compaction-retention", rc.retention),
+						zap.Duration("retry-interval", revInterval),
+						zap.Error(err),
+					)
+				} else {
+					plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
+					plog.Noticef("Retry after %v", revInterval)
+				}
 			}
 		}
 	}()
 }
 
 // Stop stops revision-based compactor.
-func (t *Revision) Stop() {
-	t.cancel()
+func (rc *Revision) Stop() {
+	rc.cancel()
 }
 
 // Pause pauses revision-based compactor.
-func (t *Revision) Pause() {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-	t.paused = true
+func (rc *Revision) Pause() {
+	rc.mu.Lock()
+	rc.paused = true
+	rc.mu.Unlock()
 }
 
 // Resume resumes revision-based compactor.
-func (t *Revision) Resume() {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-	t.paused = false
+func (rc *Revision) Resume() {
+	rc.mu.Lock()
+	rc.paused = false
+	rc.mu.Unlock()
 }

+ 3 - 2
compactor/revision_test.go

@@ -23,13 +23,14 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 
 	"github.com/jonboulle/clockwork"
+	"go.uber.org/zap"
 )
 
 func TestRevision(t *testing.T) {
 	fc := clockwork.NewFakeClock()
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	tb := newRevision(fc, 10, rg, compactable)
+	tb := newRevision(zap.NewExample(), fc, 10, rg, compactable)
 
 	tb.Run()
 	defer tb.Stop()
@@ -72,7 +73,7 @@ func TestRevisionPause(t *testing.T) {
 	fc := clockwork.NewFakeClock()
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
 	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	tb := newRevision(fc, 10, rg, compactable)
+	tb := newRevision(zap.NewExample(), fc, 10, rg, compactable)
 
 	tb.Run()
 	tb.Pause()