Browse Source

*: add Revision compactor

Iwasaki Yudai 8 years ago
parent
commit
a3f8f47422

+ 27 - 95
compactor/compactor.go

@@ -15,14 +15,13 @@
 package compactor
 
 import (
-	"sync"
+	"fmt"
 	"time"
 
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/pkg/capnslog"
-	"github.com/jonboulle/clockwork"
 	"golang.org/x/net/context"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
 var (
@@ -32,8 +31,24 @@ var (
 const (
 	checkCompactionInterval   = 5 * time.Minute
 	executeCompactionInterval = time.Hour
+
+	ModePeriodic = "periodic"
+	ModeRevision = "revision"
 )
 
+// Compactor purges old log from the storage periodically.
+type Compactor interface {
+	// Run starts the main loop of the compactor in background.
+	// Use Stop() to halt the loop and release the resource.
+	Run()
+	// Stop halts the main loop of the compactor.
+	Stop()
+	// Pause temporally suspend the compactor not to run compaction. Resume() to unpose.
+	Pause()
+	// Resume restarts the compactor suspended by Pause().
+	Resume()
+}
+
 type Compactable interface {
 	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
 }
@@ -42,96 +57,13 @@ type RevGetter interface {
 	Rev() int64
 }
 
-// Periodic compacts the log by purging revisions older than
-// the configured retention time. Compaction happens hourly.
-type Periodic struct {
-	clock        clockwork.Clock
-	periodInHour int
-
-	rg RevGetter
-	c  Compactable
-
-	revs   []int64
-	ctx    context.Context
-	cancel context.CancelFunc
-
-	mu     sync.Mutex
-	paused bool
-}
-
-func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
-	return &Periodic{
-		clock:        clockwork.NewRealClock(),
-		periodInHour: h,
-		rg:           rg,
-		c:            c,
-	}
-}
-
-func (t *Periodic) Run() {
-	t.ctx, t.cancel = context.WithCancel(context.Background())
-	t.revs = make([]int64, 0)
-	clock := t.clock
-
-	go func() {
-		last := clock.Now()
-		for {
-			t.revs = append(t.revs, t.rg.Rev())
-			select {
-			case <-t.ctx.Done():
-				return
-			case <-clock.After(checkCompactionInterval):
-				t.mu.Lock()
-				p := t.paused
-				t.mu.Unlock()
-				if p {
-					continue
-				}
-			}
-
-			if clock.Now().Sub(last) < executeCompactionInterval {
-				continue
-			}
-
-			rev, remaining := t.getRev(t.periodInHour)
-			if rev < 0 {
-				continue
-			}
-
-			plog.Noticef("Starting auto-compaction at revision %d", rev)
-			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
-			if err == nil || err == mvcc.ErrCompacted {
-				t.revs = remaining
-				last = clock.Now()
-				plog.Noticef("Finished auto-compaction at revision %d", rev)
-			} else {
-				plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
-				plog.Noticef("Retry after %v", checkCompactionInterval)
-			}
-		}
-	}()
-}
-
-func (t *Periodic) Stop() {
-	t.cancel()
-}
-
-func (t *Periodic) Pause() {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-	t.paused = true
-}
-
-func (t *Periodic) Resume() {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-	t.paused = false
-}
-
-func (t *Periodic) getRev(h int) (int64, []int64) {
-	i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
-	if i < 0 {
-		return -1, t.revs
+func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) {
+	switch mode {
+	case ModePeriodic:
+		return NewPeriodic(retention, rg, c), nil
+	case ModeRevision:
+		return NewRevision(int64(retention), rg, c), nil
+	default:
+		return nil, fmt.Errorf("unsupported compaction mode %s", mode)
 	}
-	return t.revs[i], t.revs[i+1:]
 }

+ 0 - 97
compactor/compactor_test.go

@@ -15,108 +15,11 @@
 package compactor
 
 import (
-	"reflect"
-	"testing"
-	"time"
-
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
-	"github.com/jonboulle/clockwork"
 	"golang.org/x/net/context"
 )
 
-func TestPeriodic(t *testing.T) {
-	retentionHours := 2
-
-	fc := clockwork.NewFakeClock()
-	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
-	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	tb := &Periodic{
-		clock:        fc,
-		periodInHour: retentionHours,
-		rg:           rg,
-		c:            compactable,
-	}
-
-	tb.Run()
-	defer tb.Stop()
-
-	n := int(time.Hour / checkCompactionInterval)
-	// collect 5 hours of revisions
-	for i := 0; i < 5; i++ {
-		// advance one hour, one revision for each interval
-		for j := 0; j < n; j++ {
-			rg.Wait(1)
-			fc.Advance(checkCompactionInterval)
-		}
-
-		// compaction doesn't happen til 2 hours elapses
-		if i+1 < retentionHours {
-			continue
-		}
-
-		a, err := compactable.Wait(1)
-		if err != nil {
-			t.Fatal(err)
-		}
-		expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
-		if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
-			t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
-		}
-	}
-
-	// unblock the rev getter, so we can stop the compactor routine.
-	_, err := rg.Wait(1)
-	if err != nil {
-		t.Fatal(err)
-	}
-}
-
-func TestPeriodicPause(t *testing.T) {
-	fc := clockwork.NewFakeClock()
-	compactable := &fakeCompactable{testutil.NewRecorderStream()}
-	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
-	tb := &Periodic{
-		clock:        fc,
-		periodInHour: 1,
-		rg:           rg,
-		c:            compactable,
-	}
-
-	tb.Run()
-	tb.Pause()
-
-	// tb will collect 3 hours of revisions but not compact since paused
-	n := int(time.Hour / checkCompactionInterval)
-	for i := 0; i < 3*n; i++ {
-		rg.Wait(1)
-		fc.Advance(checkCompactionInterval)
-	}
-	// tb ends up waiting for the clock
-
-	select {
-	case a := <-compactable.Chan():
-		t.Fatalf("unexpected action %v", a)
-	case <-time.After(10 * time.Millisecond):
-	}
-
-	// tb resumes to being blocked on the clock
-	tb.Resume()
-
-	// unblock clock, will kick off a compaction at hour 3:05
-	rg.Wait(1)
-	fc.Advance(checkCompactionInterval)
-	a, err := compactable.Wait(1)
-	if err != nil {
-		t.Fatal(err)
-	}
-	// compact the revision from hour 2:05
-	wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
-	if !reflect.DeepEqual(a[0].Params[0], wreq) {
-		t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
-	}
-}
-
 type fakeCompactable struct {
 	testutil.Recorder
 }

+ 122 - 0
compactor/periodic.go

@@ -0,0 +1,122 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compactor
+
+import (
+	"sync"
+	"time"
+
+	"github.com/jonboulle/clockwork"
+	"golang.org/x/net/context"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/mvcc"
+)
+
+// Periodic compacts the log by purging revisions older than
+// the configured retention time. Compaction happens hourly.
+type Periodic struct {
+	clock        clockwork.Clock
+	periodInHour int
+
+	rg RevGetter
+	c  Compactable
+
+	revs   []int64
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	mu     sync.Mutex
+	paused bool
+}
+
+// NewPeriodic creates a new instance of Periodic compactor that purges
+// the log older than h hours.
+func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
+	return &Periodic{
+		clock:        clockwork.NewRealClock(),
+		periodInHour: h,
+		rg:           rg,
+		c:            c,
+	}
+}
+
+func (t *Periodic) Run() {
+	t.ctx, t.cancel = context.WithCancel(context.Background())
+	t.revs = make([]int64, 0)
+	clock := t.clock
+
+	go func() {
+		last := clock.Now()
+		for {
+			t.revs = append(t.revs, t.rg.Rev())
+			select {
+			case <-t.ctx.Done():
+				return
+			case <-clock.After(checkCompactionInterval):
+				t.mu.Lock()
+				p := t.paused
+				t.mu.Unlock()
+				if p {
+					continue
+				}
+			}
+
+			if clock.Now().Sub(last) < executeCompactionInterval {
+				continue
+			}
+
+			rev, remaining := t.getRev(t.periodInHour)
+			if rev < 0 {
+				continue
+			}
+
+			plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour)
+			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
+			if err == nil || err == mvcc.ErrCompacted {
+				t.revs = remaining
+				last = clock.Now()
+				plog.Noticef("Finished auto-compaction at revision %d", rev)
+			} else {
+				plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
+				plog.Noticef("Retry after %v", checkCompactionInterval)
+			}
+		}
+	}()
+}
+
+func (t *Periodic) Stop() {
+	t.cancel()
+}
+
+func (t *Periodic) Pause() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.paused = true
+}
+
+func (t *Periodic) Resume() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.paused = false
+}
+
+func (t *Periodic) getRev(h int) (int64, []int64) {
+	i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
+	if i < 0 {
+		return -1, t.revs
+	}
+	return t.revs[i], t.revs[i+1:]
+}

+ 117 - 0
compactor/periodic_test.go

@@ -0,0 +1,117 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compactor
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/jonboulle/clockwork"
+)
+
+func TestPeriodic(t *testing.T) {
+	retentionHours := 2
+
+	fc := clockwork.NewFakeClock()
+	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
+	compactable := &fakeCompactable{testutil.NewRecorderStream()}
+	tb := &Periodic{
+		clock:        fc,
+		periodInHour: retentionHours,
+		rg:           rg,
+		c:            compactable,
+	}
+
+	tb.Run()
+	defer tb.Stop()
+
+	n := int(time.Hour / checkCompactionInterval)
+	// collect 5 hours of revisions
+	for i := 0; i < 5; i++ {
+		// advance one hour, one revision for each interval
+		for j := 0; j < n; j++ {
+			rg.Wait(1)
+			fc.Advance(checkCompactionInterval)
+		}
+
+		// compaction doesn't happen til 2 hours elapses
+		if i+1 < retentionHours {
+			continue
+		}
+
+		a, err := compactable.Wait(1)
+		if err != nil {
+			t.Fatal(err)
+		}
+		expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
+		if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
+			t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
+		}
+	}
+
+	// unblock the rev getter, so we can stop the compactor routine.
+	_, err := rg.Wait(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestPeriodicPause(t *testing.T) {
+	fc := clockwork.NewFakeClock()
+	compactable := &fakeCompactable{testutil.NewRecorderStream()}
+	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
+	tb := &Periodic{
+		clock:        fc,
+		periodInHour: 1,
+		rg:           rg,
+		c:            compactable,
+	}
+
+	tb.Run()
+	tb.Pause()
+
+	// tb will collect 3 hours of revisions but not compact since paused
+	n := int(time.Hour / checkCompactionInterval)
+	for i := 0; i < 3*n; i++ {
+		rg.Wait(1)
+		fc.Advance(checkCompactionInterval)
+	}
+	// tb ends up waiting for the clock
+
+	select {
+	case a := <-compactable.Chan():
+		t.Fatalf("unexpected action %v", a)
+	case <-time.After(10 * time.Millisecond):
+	}
+
+	// tb resumes to being blocked on the clock
+	tb.Resume()
+
+	// unblock clock, will kick off a compaction at hour 3:05
+	rg.Wait(1)
+	fc.Advance(checkCompactionInterval)
+	a, err := compactable.Wait(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// compact the revision from hour 2:05
+	wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
+	if !reflect.DeepEqual(a[0].Params[0], wreq) {
+		t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
+	}
+}

+ 106 - 0
compactor/revision.go

@@ -0,0 +1,106 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compactor
+
+import (
+	"sync"
+
+	"github.com/jonboulle/clockwork"
+	"golang.org/x/net/context"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/mvcc"
+)
+
+// Revision compacts the log by purging revisions older than
+// the configured reivison number. Compaction happens every 5 minutes.
+type Revision struct {
+	clock     clockwork.Clock
+	retention int64
+
+	rg RevGetter
+	c  Compactable
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	mu     sync.Mutex
+	paused bool
+}
+
+// NewRevision creates a new instance of Revisonal compactor that purges
+// the log older than retention revisions from the current revision.
+func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision {
+	return &Revision{
+		clock:     clockwork.NewRealClock(),
+		retention: retention,
+		rg:        rg,
+		c:         c,
+	}
+}
+
+func (t *Revision) Run() {
+	t.ctx, t.cancel = context.WithCancel(context.Background())
+	clock := t.clock
+	previous := int64(0)
+
+	go func() {
+		for {
+			select {
+			case <-t.ctx.Done():
+				return
+			case <-clock.After(checkCompactionInterval):
+				t.mu.Lock()
+				p := t.paused
+				t.mu.Unlock()
+				if p {
+					continue
+				}
+			}
+
+			rev := t.rg.Rev() - t.retention
+
+			if rev <= 0 || rev == previous {
+				continue
+			}
+
+			plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
+			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
+			if err == nil || err == mvcc.ErrCompacted {
+				previous = rev
+				plog.Noticef("Finished auto-compaction at revision %d", rev)
+			} else {
+				plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
+				plog.Noticef("Retry after %v", checkCompactionInterval)
+			}
+		}
+	}()
+}
+
+func (t *Revision) Stop() {
+	t.cancel()
+}
+
+func (t *Revision) Pause() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.paused = true
+}
+
+func (t *Revision) Resume() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.paused = false
+}

+ 117 - 0
compactor/revision_test.go

@@ -0,0 +1,117 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compactor
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/jonboulle/clockwork"
+)
+
+func TestRevision(t *testing.T) {
+	fc := clockwork.NewFakeClock()
+	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
+	compactable := &fakeCompactable{testutil.NewRecorderStream()}
+	tb := &Revision{
+		clock:     fc,
+		retention: 10,
+		rg:        rg,
+		c:         compactable,
+	}
+
+	tb.Run()
+	defer tb.Stop()
+
+	fc.Advance(checkCompactionInterval)
+	rg.Wait(1)
+	// nothing happens
+
+	rg.rev = 99 // will be 100
+	expectedRevision := int64(90)
+	fc.Advance(checkCompactionInterval)
+	rg.Wait(1)
+	a, err := compactable.Wait(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
+		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
+	}
+
+	// skip the same revision
+	rg.rev = 99 // will be 100
+	expectedRevision = int64(90)
+	rg.Wait(1)
+	// nothing happens
+
+	rg.rev = 199 // will be 200
+	expectedRevision = int64(190)
+	fc.Advance(checkCompactionInterval)
+	rg.Wait(1)
+	a, err = compactable.Wait(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
+		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
+	}
+}
+
+func TestRevisionPause(t *testing.T) {
+	fc := clockwork.NewFakeClock()
+	compactable := &fakeCompactable{testutil.NewRecorderStream()}
+	rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
+	tb := &Revision{
+		clock:     fc,
+		retention: 10,
+		rg:        rg,
+		c:         compactable,
+	}
+
+	tb.Run()
+	tb.Pause()
+
+	// tb will collect 3 hours of revisions but not compact since paused
+	n := int(time.Hour / checkCompactionInterval)
+	for i := 0; i < 3*n; i++ {
+		fc.Advance(checkCompactionInterval)
+	}
+	// tb ends up waiting for the clock
+
+	select {
+	case a := <-compactable.Chan():
+		t.Fatalf("unexpected action %v", a)
+	case <-time.After(10 * time.Millisecond):
+	}
+
+	// tb resumes to being blocked on the clock
+	tb.Resume()
+
+	// unblock clock, will kick off a compaction at hour 3:05
+	fc.Advance(checkCompactionInterval)
+	rg.Wait(1)
+	a, err := compactable.Wait(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	wreq := &pb.CompactionRequest{Revision: int64(90)}
+	if !reflect.DeepEqual(a[0].Params[0], wreq) {
+		t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
+	}
+}

+ 1 - 0
embed/config.go

@@ -80,6 +80,7 @@ type Config struct {
 	Name                    string `json:"name"`
 	SnapCount               uint64 `json:"snapshot-count"`
 	AutoCompactionRetention int    `json:"auto-compaction-retention"`
+	AutoCompactionMode      string `json:"auto-compaction-mode"`
 
 	// TickMs is the number of milliseconds between heartbeat ticks.
 	// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).

+ 1 - 0
embed/etcd.go

@@ -138,6 +138,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		TickMs:                  cfg.TickMs,
 		ElectionTicks:           cfg.ElectionTicks(),
 		AutoCompactionRetention: cfg.AutoCompactionRetention,
+		AutoCompactionMode:      cfg.AutoCompactionMode,
 		QuotaBackendBytes:       cfg.QuotaBackendBytes,
 		MaxTxnOps:               cfg.MaxTxnOps,
 		MaxRequestBytes:         cfg.MaxRequestBytes,

+ 2 - 1
etcdmain/config.go

@@ -199,7 +199,8 @@ func newConfig() *config {
 	// version
 	fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")
 
-	fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store in hour. 0 means disable auto compaction.")
+	fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
+	fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "Interpret 'auto-compaction-retention' as hours when 'periodic', as revision numbers when 'revision'.")
 
 	// pprof profiler via HTTP
 	fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")

+ 3 - 1
etcdmain/help.go

@@ -97,7 +97,9 @@ clustering flags:
 	--strict-reconfig-check
 		reject reconfiguration requests that would cause quorum loss.
 	--auto-compaction-retention '0'
-		auto compaction retention in hour. 0 means disable auto compaction.
+		auto compaction retention length. 0 means disable auto compaction.
+	--auto-compaction-mode 'periodic'
+		'periodic' means hours, 'revision' means revision numbers to retain by auto compaction
 	--enable-v2
 		Accept etcd V2 client requests.
 

+ 1 - 0
etcdserver/config.go

@@ -53,6 +53,7 @@ type ServerConfig struct {
 	BootstrapTimeout time.Duration
 
 	AutoCompactionRetention int
+	AutoCompactionMode      string
 	QuotaBackendBytes       int64
 	MaxTxnOps               uint
 

+ 6 - 3
etcdserver/server.go

@@ -221,7 +221,7 @@ type EtcdServer struct {
 
 	SyncTicker *time.Ticker
 	// compactor is used to auto-compact the KV.
-	compactor *compactor.Periodic
+	compactor compactor.Compactor
 
 	// peerRt used to send requests (version, lease) to peers.
 	peerRt   http.RoundTripper
@@ -469,8 +469,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 		return nil, err
 	}
 	srv.authStore = auth.NewAuthStore(srv.be, tp)
-	if h := cfg.AutoCompactionRetention; h != 0 {
-		srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
+	if num := cfg.AutoCompactionRetention; num != 0 {
+		srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv)
+		if err != nil {
+			return nil, err
+		}
 		srv.compactor.Run()
 	}