Browse Source

Merge pull request #5498 from heyitsanthony/wal-tmpfile-fixes

wal: improve tmp file handling
Anthony Romano 9 years ago
parent
commit
310ebdd3e1
6 changed files with 34 additions and 27 deletions
  1. 9 8
      e2e/ctl_v2_test.go
  2. 4 6
      e2e/etcd_test.go
  3. 4 2
      wal/file_pipeline.go
  4. 1 5
      wal/repair.go
  5. 15 1
      wal/util.go
  6. 1 5
      wal/wal.go

+ 9 - 8
e2e/ctl_v2_test.go

@@ -15,7 +15,7 @@
 package e2e
 package e2e
 
 
 import (
 import (
-	"fmt"
+	"io/ioutil"
 	"os"
 	"os"
 	"strings"
 	"strings"
 	"testing"
 	"testing"
@@ -232,10 +232,12 @@ func TestCtlV2RoleList(t *testing.T) {
 func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issues/5360
 func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issues/5360
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
 
 
-	var (
-		backupDirPrefix = "testbackup"
-		backupDir       = fmt.Sprintf("%s0.etcd", backupDirPrefix)
-	)
+	backupDir, err := ioutil.TempDir("", "testbakcup0.etcd")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(backupDir)
+
 	epc1 := setupEtcdctlTest(t, &configNoTLS, false)
 	epc1 := setupEtcdctlTest(t, &configNoTLS, false)
 	if err := etcdctlSet(epc1, "foo1", "bar"); err != nil {
 	if err := etcdctlSet(epc1, "foo1", "bar"); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
@@ -244,7 +246,6 @@ func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issue
 	if err := etcdctlBackup(epc1, epc1.procs[0].cfg.dataDirPath, backupDir); err != nil {
 	if err := etcdctlBackup(epc1, epc1.procs[0].cfg.dataDirPath, backupDir); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	defer os.RemoveAll(backupDir)
 
 
 	if err := epc1.Close(); err != nil {
 	if err := epc1.Close(); err != nil {
 		t.Fatalf("error closing etcd processes (%v)", err)
 		t.Fatalf("error closing etcd processes (%v)", err)
@@ -252,7 +253,7 @@ func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issue
 
 
 	// restart from the backup directory
 	// restart from the backup directory
 	cfg2 := configNoTLS
 	cfg2 := configNoTLS
-	cfg2.dataDirPathPrefix = backupDirPrefix
+	cfg2.dataDirPath = backupDir
 	cfg2.keepDataDir = true
 	cfg2.keepDataDir = true
 	cfg2.forceNewCluster = true
 	cfg2.forceNewCluster = true
 	epc2 := setupEtcdctlTest(t, &cfg2, false)
 	epc2 := setupEtcdctlTest(t, &cfg2, false)
@@ -383,7 +384,7 @@ func etcdctlAuthEnable(clus *etcdProcessCluster) error {
 
 
 func etcdctlBackup(clus *etcdProcessCluster, dataDir, backupDir string) error {
 func etcdctlBackup(clus *etcdProcessCluster, dataDir, backupDir string) error {
 	cmdArgs := append(etcdctlPrefixArgs(clus), "backup", "--data-dir", dataDir, "--backup-dir", backupDir)
 	cmdArgs := append(etcdctlPrefixArgs(clus), "backup", "--data-dir", dataDir, "--backup-dir", backupDir)
-	return spawnWithExpect(cmdArgs, "wal: ignored file")
+	return spawnWithExpects(cmdArgs)
 }
 }
 
 
 func mustEtcdctl(t *testing.T) {
 func mustEtcdctl(t *testing.T) {

+ 4 - 6
e2e/etcd_test.go

@@ -137,8 +137,8 @@ type etcdProcessConfig struct {
 }
 }
 
 
 type etcdProcessClusterConfig struct {
 type etcdProcessClusterConfig struct {
-	dataDirPathPrefix string
-	keepDataDir       bool
+	dataDirPath string
+	keepDataDir bool
 
 
 	clusterSize       int
 	clusterSize       int
 	basePort          int
 	basePort          int
@@ -226,10 +226,8 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
 
 
 		purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
 		purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
 		name := fmt.Sprintf("testname%d", i)
 		name := fmt.Sprintf("testname%d", i)
-		var dataDirPath string
-		if cfg.dataDirPathPrefix != "" {
-			dataDirPath = fmt.Sprintf("%s%d.etcd", cfg.dataDirPathPrefix, i)
-		} else {
+		dataDirPath := cfg.dataDirPath
+		if cfg.dataDirPath == "" {
 			var derr error
 			var derr error
 			dataDirPath, derr = ioutil.TempDir("", name+".etcd")
 			dataDirPath, derr = ioutil.TempDir("", name+".etcd")
 			if derr != nil {
 			if derr != nil {

+ 4 - 2
wal/file_pipeline.go

@@ -48,7 +48,8 @@ func newFilePipeline(dir string, fileSize int64) *filePipeline {
 	return fp
 	return fp
 }
 }
 
 
-// Open returns a fresh file for writing
+// Open returns a fresh file for writing. Rename the file before calling
+// Open again or there will be file collisions.
 func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
 func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
 	select {
 	select {
 	case f = <-fp.filec:
 	case f = <-fp.filec:
@@ -63,7 +64,8 @@ func (fp *filePipeline) Close() error {
 }
 }
 
 
 func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
 func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
-	fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count))
+	// count % 2 so this file isn't the same as the one last published
+	fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
 	if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, 0600); err != nil {
 	if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, 0600); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 1 - 5
wal/repair.go

@@ -90,14 +90,10 @@ func Repair(dirpath string) bool {
 
 
 // openLast opens the last wal file for read and write.
 // openLast opens the last wal file for read and write.
 func openLast(dirpath string) (*fileutil.LockedFile, error) {
 func openLast(dirpath string) (*fileutil.LockedFile, error) {
-	names, err := fileutil.ReadDir(dirpath)
+	names, err := readWalNames(dirpath)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	names = checkWalNames(names)
-	if len(names) == 0 {
-		return nil, ErrFileNotFound
-	}
 	last := path.Join(dirpath, names[len(names)-1])
 	last := path.Join(dirpath, names[len(names)-1])
 	return fileutil.LockFile(last, os.O_RDWR, 0600)
 	return fileutil.LockFile(last, os.O_RDWR, 0600)
 }
 }

+ 15 - 1
wal/util.go

@@ -67,12 +67,26 @@ func isValidSeq(names []string) bool {
 	}
 	}
 	return true
 	return true
 }
 }
+func readWalNames(dirpath string) ([]string, error) {
+	names, err := fileutil.ReadDir(dirpath)
+	if err != nil {
+		return nil, err
+	}
+	wnames := checkWalNames(names)
+	if len(wnames) == 0 {
+		return nil, ErrFileNotFound
+	}
+	return wnames, nil
+}
 
 
 func checkWalNames(names []string) []string {
 func checkWalNames(names []string) []string {
 	wnames := make([]string, 0)
 	wnames := make([]string, 0)
 	for _, name := range names {
 	for _, name := range names {
 		if _, _, err := parseWalName(name); err != nil {
 		if _, _, err := parseWalName(name); err != nil {
-			plog.Warningf("ignored file %v in wal", name)
+			// don't complain about left over tmp files
+			if !strings.HasSuffix(name, ".tmp") {
+				plog.Warningf("ignored file %v in wal", name)
+			}
 			continue
 			continue
 		}
 		}
 		wnames = append(wnames, name)
 		wnames = append(wnames, name)

+ 1 - 5
wal/wal.go

@@ -156,14 +156,10 @@ func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
 }
 }
 
 
 func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
 func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
-	names, err := fileutil.ReadDir(dirpath)
+	names, err := readWalNames(dirpath)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	names = checkWalNames(names)
-	if len(names) == 0 {
-		return nil, ErrFileNotFound
-	}
 
 
 	nameIndex, ok := searchIndex(names, snap.Index)
 	nameIndex, ok := searchIndex(names, snap.Index)
 	if !ok || !isValidSeq(names[nameIndex:]) {
 	if !ok || !isValidSeq(names[nameIndex:]) {