Bläddra i källkod

*: support purging old wal/snap files

Xiang Li 11 år sedan
förälder
incheckning
d3db010190
8 ändrade filer med 145 tillägg och 18 borttagningar
  1. 4 0
      etcdmain/etcd.go
  2. 2 0
      etcdserver/config.go
  3. 24 0
      etcdserver/server.go
  4. 14 0
      pkg/fileutil/fileutil.go
  5. 46 0
      pkg/fileutil/purge.go
  6. 50 0
      pkg/fileutil/purge_test.go
  7. 3 17
      wal/util.go
  8. 2 1
      wal/wal.go

+ 4 - 0
etcdmain/etcd.go

@@ -63,6 +63,8 @@ var (
 	snapCount       = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
 	printVersion    = fs.Bool("version", false, "Print the version and exit")
 	forceNewCluster = fs.Bool("force-new-cluster", false, "Force to create a new one member cluster")
+	maxSnapFiles    = fs.Uint("max-snapshots", 5, "Maximum number of snapshot files to retain (0 is unlimited)")
+	maxWalFiles     = fs.Uint("max-wals", 5, "Maximum number of wal files to retain (0 is unlimited)")
 
 	initialCluster      = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping")
 	initialClusterToken = fs.String("initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap")
@@ -280,6 +282,8 @@ func startEtcd() (<-chan struct{}, error) {
 		PeerURLs:        apurls,
 		DataDir:         *dir,
 		SnapCount:       *snapCount,
+		MaxSnapFiles:    *maxSnapFiles,
+		MaxWALFiles:     *maxWalFiles,
 		Cluster:         cls,
 		DiscoveryURL:    *durl,
 		DiscoveryProxy:  *dproxy,

+ 2 - 0
etcdserver/config.go

@@ -37,6 +37,8 @@ type ServerConfig struct {
 	PeerURLs        types.URLs
 	DataDir         string
 	SnapCount       uint64
+	MaxSnapFiles    uint
+	MaxWALFiles     uint
 	Cluster         *Cluster
 	NewCluster      bool
 	ForceNewCluster bool

+ 24 - 0
etcdserver/server.go

@@ -37,6 +37,7 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/migrate"
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/wait"
@@ -59,6 +60,8 @@ const (
 
 	StoreAdminPrefix = "/0"
 	StoreKeysPrefix  = "/1"
+
+	purgeFileInterval = 30 * time.Second
 )
 
 var (
@@ -157,6 +160,7 @@ type RaftTimer interface {
 
 // EtcdServer is the production implementation of the Server interface
 type EtcdServer struct {
+	cfg        *ServerConfig
 	w          wait.Wait
 	done       chan struct{}
 	stop       chan struct{}
@@ -301,6 +305,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	lstats := stats.NewLeaderStats(id.String())
 
 	srv := &EtcdServer{
+		cfg:         cfg,
 		store:       st,
 		node:        n,
 		raftStorage: s,
@@ -327,6 +332,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 func (s *EtcdServer) Start() {
 	s.start()
 	go s.publish(defaultPublishRetryInterval)
+	go s.purgeFile()
 }
 
 // start prepares and starts server in a new goroutine. It is no longer safe to
@@ -346,6 +352,24 @@ func (s *EtcdServer) start() {
 	go s.run()
 }
 
+func (s *EtcdServer) purgeFile() {
+	var serrc, werrc <-chan error
+	if s.cfg.MaxSnapFiles > 0 {
+		serrc = fileutil.PurgeFile(s.cfg.SnapDir(), "snap", s.cfg.MaxSnapFiles, purgeFileInterval, s.done)
+	}
+	if s.cfg.MaxWALFiles > 0 {
+		werrc = fileutil.PurgeFile(s.cfg.WALDir(), "wal", s.cfg.MaxWALFiles, purgeFileInterval, s.done)
+	}
+	select {
+	case e := <-werrc:
+		log.Fatalf("etcdserver: failed to purge wal file %v", e)
+	case e := <-serrc:
+		log.Fatalf("etcdserver: failed to purge snap file %v", e)
+	case <-s.done:
+		return
+	}
+}
+
 func (s *EtcdServer) ID() types.ID { return s.id }
 
 func (s *EtcdServer) SenderFinder() rafthttp.SenderFinder { return s.sendhub }

+ 14 - 0
pkg/fileutil/fileutil.go

@@ -35,3 +35,17 @@ func IsDirWriteable(dir string) error {
 	}
 	return os.Remove(f)
 }
+
+// ReadDir returns the filenames in the given directory.
+func ReadDir(dirpath string) ([]string, error) {
+	dir, err := os.Open(dirpath)
+	if err != nil {
+		return nil, err
+	}
+	defer dir.Close()
+	names, err := dir.Readdirnames(-1)
+	if err != nil {
+		return nil, err
+	}
+	return names, nil
+}

+ 46 - 0
pkg/fileutil/purge.go

@@ -0,0 +1,46 @@
+package fileutil
+
+import (
+	"log"
+	"os"
+	"path"
+	"sort"
+	"strings"
+	"time"
+)
+
+func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error {
+	errC := make(chan error, 1)
+	go func() {
+		for {
+			fnames, err := ReadDir(dirname)
+			if err != nil {
+				errC <- err
+				return
+			}
+			newfnames := make([]string, 0)
+			for _, fname := range fnames {
+				if strings.HasSuffix(fname, suffix) {
+					newfnames = append(newfnames, fname)
+				}
+			}
+			sort.Strings(newfnames)
+			for len(newfnames) > int(max) {
+				f := path.Join(dirname, newfnames[0])
+				err := os.Remove(f)
+				if err != nil {
+					errC <- err
+					return
+				}
+				log.Printf("filePurge: successfully remvoed file %s", f)
+				newfnames = newfnames[1:]
+			}
+			select {
+			case <-time.After(interval):
+			case <-stop:
+				return
+			}
+		}
+	}()
+	return errC
+}

+ 50 - 0
pkg/fileutil/purge_test.go

@@ -0,0 +1,50 @@
+package fileutil
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestPurgeFile(t *testing.T) {
+	dir, err := ioutil.TempDir("", "purgefile")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	for i := 0; i < 5; i++ {
+		_, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	stop := make(chan struct{})
+	errch := PurgeFile(dir, "test", 3, time.Millisecond, stop)
+	for i := 5; i < 10; i++ {
+		_, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
+		if err != nil {
+			t.Fatal(err)
+		}
+		time.Sleep(time.Millisecond)
+	}
+	fnames, err := ReadDir(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	wnames := []string{"7.test", "8.test", "9.test"}
+	if !reflect.DeepEqual(fnames, wnames) {
+		t.Errorf("filenames = %v, want %v", fnames, wnames)
+	}
+	select {
+	case err := <-errch:
+		t.Errorf("unexpected purge error %v", err)
+	case <-time.After(time.Millisecond):
+	}
+	close(stop)
+}

+ 3 - 17
wal/util.go

@@ -19,9 +19,9 @@ package wal
 import (
 	"fmt"
 	"log"
-	"os"
 	"path"
 
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/types"
 )
 
@@ -36,7 +36,7 @@ const (
 )
 
 func DetectVersion(dirpath string) WalVersion {
-	names, err := readDir(dirpath)
+	names, err := fileutil.ReadDir(dirpath)
 	if err != nil || len(names) == 0 {
 		return WALNotExist
 	}
@@ -56,7 +56,7 @@ func DetectVersion(dirpath string) WalVersion {
 }
 
 func Exist(dirpath string) bool {
-	names, err := readDir(dirpath)
+	names, err := fileutil.ReadDir(dirpath)
 	if err != nil {
 		return false
 	}
@@ -97,20 +97,6 @@ func isValidSeq(names []string) bool {
 	return true
 }
 
-// readDir returns the filenames in wal directory.
-func readDir(dirpath string) ([]string, error) {
-	dir, err := os.Open(dirpath)
-	if err != nil {
-		return nil, err
-	}
-	defer dir.Close()
-	names, err := dir.Readdirnames(-1)
-	if err != nil {
-		return nil, err
-	}
-	return names, nil
-}
-
 func checkWalNames(names []string) []string {
 	wnames := make([]string, 0)
 	for _, name := range names {

+ 2 - 1
wal/wal.go

@@ -26,6 +26,7 @@ import (
 	"reflect"
 	"sort"
 
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
@@ -110,7 +111,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 // index. The WAL cannot be appended to before reading out all of its
 // previous records.
 func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
-	names, err := readDir(dirpath)
+	names, err := fileutil.ReadDir(dirpath)
 	if err != nil {
 		return nil, err
 	}