Bläddra i källkod

Merge pull request #4621 from xiang90/auto-compaction

*: support time based auto compaction.
Xiang Li 9 år sedan
förälder
incheckning
9e493ccb14
8 ändrade filer med 289 tillägg och 23 borttagningar
  1. 133 0
      compactor/compactor.go
  2. 111 0
      compactor/compactor_test.go
  3. 4 2
      etcdmain/config.go
  4. 21 20
      etcdmain/etcd.go
  5. 2 0
      etcdmain/help.go
  6. 2 1
      etcdserver/config.go
  7. 6 0
      etcdserver/raft.go
  8. 10 0
      etcdserver/server.go

+ 133 - 0
compactor/compactor.go

@@ -0,0 +1,133 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compactor
+
+import (
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/storage"
+)
+
+var (
+	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver")
+)
+
+const (
+	checkCompactionInterval = 5 * time.Minute
+)
+
+type Compactable interface {
+	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
+}
+
+type RevGetter interface {
+	Rev() int64
+}
+
+type Periodic struct {
+	clock        clockwork.Clock
+	periodInHour int
+
+	rg RevGetter
+	c  Compactable
+
+	revs   []int64
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	mu     sync.Mutex
+	paused bool
+}
+
+func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
+	return &Periodic{
+		clock:        clockwork.NewRealClock(),
+		periodInHour: h,
+		rg:           rg,
+		c:            c,
+	}
+}
+
+func (t *Periodic) Run() {
+	t.ctx, t.cancel = context.WithCancel(context.Background())
+	t.revs = make([]int64, 0)
+	clock := t.clock
+
+	go func() {
+		last := clock.Now()
+		for {
+			t.revs = append(t.revs, t.rg.Rev())
+			select {
+			case <-t.ctx.Done():
+				return
+			case <-clock.After(checkCompactionInterval):
+				t.mu.Lock()
+				p := t.paused
+				t.mu.Unlock()
+				if p {
+					continue
+				}
+			}
+			if clock.Now().Sub(last) < time.Duration(t.periodInHour)*time.Hour {
+				continue
+			}
+
+			rev := t.getRev(t.periodInHour)
+			if rev < 0 {
+				continue
+			}
+
+			plog.Noticef("Starting auto-compaction at revision %d", rev)
+			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
+			if err == nil || err == storage.ErrCompacted {
+				t.revs = make([]int64, 0)
+				last = clock.Now()
+				plog.Noticef("Finished auto-compaction at revision %d", rev)
+			} else {
+				plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
+				plog.Noticef("Retry after %v", checkCompactionInterval)
+			}
+		}
+	}()
+}
+
+func (t *Periodic) Stop() {
+	t.cancel()
+}
+
+func (t *Periodic) Pause() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.paused = true
+}
+
+func (t *Periodic) Resume() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.paused = false
+}
+
+func (t *Periodic) getRev(h int) int64 {
+	i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
+	if i < 0 {
+		return -1
+	}
+	return t.revs[i]
+}

+ 111 - 0
compactor/compactor_test.go

@@ -0,0 +1,111 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compactor
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+func TestPeriodic(t *testing.T) {
+	fc := clockwork.NewFakeClock()
+	compactable := &fakeCompactable{testutil.NewRecorderStream()}
+	tb := &Periodic{
+		clock:        fc,
+		periodInHour: 1,
+		rg:           &fakeRevGetter{},
+		c:            compactable,
+	}
+
+	tb.Run()
+	defer tb.Stop()
+
+	n := int(time.Hour / checkCompactionInterval)
+	for i := 0; i < 3; i++ {
+		for j := 0; j < n; j++ {
+			time.Sleep(5 * time.Millisecond)
+			fc.Advance(checkCompactionInterval)
+		}
+
+		a, err := compactable.Wait(1)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1}) {
+			t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1})
+		}
+	}
+}
+
+func TestPeriodicPause(t *testing.T) {
+	fc := clockwork.NewFakeClock()
+	compactable := &fakeCompactable{testutil.NewRecorderStream()}
+	tb := &Periodic{
+		clock:        fc,
+		periodInHour: 1,
+		rg:           &fakeRevGetter{},
+		c:            compactable,
+	}
+
+	tb.Run()
+	tb.Pause()
+
+	n := int(time.Hour / checkCompactionInterval)
+	for i := 0; i < 3*n; i++ {
+		time.Sleep(5 * time.Millisecond)
+		fc.Advance(checkCompactionInterval)
+	}
+
+	select {
+	case a := <-compactable.Chan():
+		t.Fatal("unexpected action %v", a)
+	case <-time.After(10 * time.Millisecond):
+	}
+
+	tb.Resume()
+	fc.Advance(checkCompactionInterval)
+
+	a, err := compactable.Wait(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(2*n) + 2}) {
+		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(2*n) + 2})
+	}
+}
+
+type fakeCompactable struct {
+	testutil.Recorder
+}
+
+func (fc *fakeCompactable) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
+	fc.Record(testutil.Action{Name: "c", Params: []interface{}{r}})
+	return &pb.CompactionResponse{}, nil
+}
+
+type fakeRevGetter struct {
+	rev int64
+}
+
+func (fr *fakeRevGetter) Rev() int64 {
+	fr.rev++
+	return fr.rev
+}

+ 4 - 2
etcdmain/config.go

@@ -121,8 +121,9 @@ type config struct {
 
 	printVersion bool
 
-	v3demo   bool
-	gRPCAddr string
+	v3demo                  bool
+	gRPCAddr                string
+	autoCompactionRetention int
 
 	enablePprof bool
 
@@ -224,6 +225,7 @@ func NewConfig() *config {
 	// demo flag
 	fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API.")
 	fs.StringVar(&cfg.gRPCAddr, "experimental-gRPC-addr", "127.0.0.1:2378", "gRPC address for experimental v3 demo API.")
+	fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0, "Auto compaction retention in hour. 0 means disable auto compaction.")
 
 	// backwards-compatibility with v0.4.6
 	fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.")

+ 21 - 20
etcdmain/etcd.go

@@ -274,26 +274,27 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 	}
 
 	srvcfg := &etcdserver.ServerConfig{
-		Name:                cfg.name,
-		ClientURLs:          cfg.acurls,
-		PeerURLs:            cfg.apurls,
-		DataDir:             cfg.dir,
-		DedicatedWALDir:     cfg.walDir,
-		SnapCount:           cfg.snapCount,
-		MaxSnapFiles:        cfg.maxSnapFiles,
-		MaxWALFiles:         cfg.maxWalFiles,
-		InitialPeerURLsMap:  urlsmap,
-		InitialClusterToken: token,
-		DiscoveryURL:        cfg.durl,
-		DiscoveryProxy:      cfg.dproxy,
-		NewCluster:          cfg.isNewCluster(),
-		ForceNewCluster:     cfg.forceNewCluster,
-		PeerTLSInfo:         cfg.peerTLSInfo,
-		TickMs:              cfg.TickMs,
-		ElectionTicks:       cfg.electionTicks(),
-		V3demo:              cfg.v3demo,
-		StrictReconfigCheck: cfg.strictReconfigCheck,
-		EnablePprof:         cfg.enablePprof,
+		Name:                    cfg.name,
+		ClientURLs:              cfg.acurls,
+		PeerURLs:                cfg.apurls,
+		DataDir:                 cfg.dir,
+		DedicatedWALDir:         cfg.walDir,
+		SnapCount:               cfg.snapCount,
+		MaxSnapFiles:            cfg.maxSnapFiles,
+		MaxWALFiles:             cfg.maxWalFiles,
+		InitialPeerURLsMap:      urlsmap,
+		InitialClusterToken:     token,
+		DiscoveryURL:            cfg.durl,
+		DiscoveryProxy:          cfg.dproxy,
+		NewCluster:              cfg.isNewCluster(),
+		ForceNewCluster:         cfg.forceNewCluster,
+		PeerTLSInfo:             cfg.peerTLSInfo,
+		TickMs:                  cfg.TickMs,
+		ElectionTicks:           cfg.electionTicks(),
+		V3demo:                  cfg.v3demo,
+		AutoCompactionRetention: cfg.autoCompactionRetention,
+		StrictReconfigCheck:     cfg.strictReconfigCheck,
+		EnablePprof:             cfg.enablePprof,
 	}
 	var s *etcdserver.EtcdServer
 	s, err = etcdserver.NewServer(srvcfg)

+ 2 - 0
etcdmain/help.go

@@ -137,6 +137,8 @@ experimental flags:
 
 	--experimental-v3demo 'false'
 		enable experimental v3 demo API.
+	--experimental-auto-compaction-retention '0'
+		auto compaction retention in hour. 0 means disable auto compaction.
 	--experimental-gRPC-addr '127.0.0.1:2378'
 		gRPC address for experimental v3 demo API.
 

+ 2 - 1
etcdserver/config.go

@@ -50,7 +50,8 @@ type ServerConfig struct {
 	ElectionTicks    int
 	BootstrapTimeout time.Duration
 
-	V3demo bool
+	V3demo                  bool
+	AutoCompactionRetention int
 
 	StrictReconfigCheck bool
 

+ 6 - 0
etcdserver/raft.go

@@ -159,10 +159,16 @@ func (r *raftNode) start(s *EtcdServer) {
 						if r.s.stats != nil {
 							r.s.stats.BecomeLeader()
 						}
+						if r.s.compactor != nil {
+							r.s.compactor.Resume()
+						}
 					} else {
 						if r.s.lessor != nil {
 							r.s.lessor.Demote()
 						}
+						if r.s.compactor != nil {
+							r.s.compactor.Pause()
+						}
 						syncC = nil
 					}
 				}

+ 10 - 0
etcdserver/server.go

@@ -30,6 +30,7 @@ import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/compactor"
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -179,6 +180,8 @@ type EtcdServer struct {
 	lstats *stats.LeaderStats
 
 	SyncTicker <-chan time.Time
+	// compactor is used to auto-compact the KV.
+	compactor *compactor.Periodic
 
 	// consistent index used to hold the offset of current executing entry
 	// It is initialized to 0 before executing any entry.
@@ -368,6 +371,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
 		srv.lessor = lease.NewLessor(srv.be)
 		srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
+		if h := cfg.AutoCompactionRetention; h != 0 {
+			srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
+			srv.compactor.Run()
+		}
 	}
 
 	// TODO: move transport initialization near the definition of remote
@@ -518,6 +525,9 @@ func (s *EtcdServer) run() {
 		if s.be != nil {
 			s.be.Close()
 		}
+		if s.compactor != nil {
+			s.compactor.Stop()
+		}
 		close(s.done)
 	}()