Browse Source

wal: pre-create segment files

Pipeline file creation and allocation so it overlaps writes to the log.

Fixes #4773
Anthony Romano 9 years ago
parent
commit
0df732c052
2 changed files with 112 additions and 9 deletions
  1. 95 0
      wal/file_pipeline.go
  2. 17 9
      wal/wal.go

+ 95 - 0
wal/file_pipeline.go

@@ -0,0 +1,95 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+	"fmt"
+	"os"
+	"path"
+
+	"github.com/coreos/etcd/pkg/fileutil"
+)
+
+// filePipeline pipelines allocating disk space
+type filePipeline struct {
+	// dir to put files
+	dir string
+	// size of files to make, in bytes
+	size int64
+	// count number of files generated
+	count int
+
+	filec chan *fileutil.LockedFile
+	errc  chan error
+	donec chan struct{}
+}
+
+func newFilePipeline(dir string, fileSize int64) *filePipeline {
+	fp := &filePipeline{
+		dir:   dir,
+		size:  fileSize,
+		filec: make(chan *fileutil.LockedFile),
+		errc:  make(chan error, 1),
+		donec: make(chan struct{}),
+	}
+	go fp.run()
+	return fp
+}
+
+// Open returns a fresh file for writing
+func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
+	select {
+	case f = <-fp.filec:
+	case err = <-fp.errc:
+	}
+	return
+}
+
+func (fp *filePipeline) Close() error {
+	close(fp.donec)
+	return <-fp.errc
+}
+
+func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
+	fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count))
+	if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, 0600); err != nil {
+		return nil, err
+	}
+	if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
+		plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
+		f.Close()
+		return nil, err
+	}
+	fp.count++
+	return f, nil
+}
+
+func (fp *filePipeline) run() {
+	defer close(fp.errc)
+	for {
+		f, err := fp.alloc()
+		if err != nil {
+			fp.errc <- err
+			return
+		}
+		select {
+		case fp.filec <- f:
+		case <-fp.donec:
+			os.Remove(f.Name())
+			f.Close()
+			return
+		}
+	}
+}

+ 17 - 9
wal/wal.go

@@ -80,6 +80,7 @@ type WAL struct {
 	encoder *encoder // encoder to encode records
 	encoder *encoder // encoder to encode records
 
 
 	locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
 	locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
+	fp    *filePipeline
 }
 }
 
 
 // Create creates a WAL ready for appending records. The given metadata is
 // Create creates a WAL ready for appending records. The given metadata is
@@ -109,6 +110,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 		dir:      dirpath,
 		dir:      dirpath,
 		metadata: metadata,
 		metadata: metadata,
 		encoder:  newEncoder(f, 0),
 		encoder:  newEncoder(f, 0),
+		fp:       newFilePipeline(dirpath, segmentSizeBytes),
 	}
 	}
 	w.locks = append(w.locks, f)
 	w.locks = append(w.locks, f)
 	if err := w.saveCrc(0); err != nil {
 	if err := w.saveCrc(0); err != nil {
@@ -206,6 +208,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
 			plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
 			plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
 			return nil, err
 			return nil, err
 		}
 		}
+		w.fp = newFilePipeline(w.dir, segmentSizeBytes)
 	}
 	}
 
 
 	return w, nil
 	return w, nil
@@ -332,10 +335,9 @@ func (w *WAL) cut() error {
 	}
 	}
 
 
 	fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
 	fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
-	ftpath := fpath + ".tmp"
 
 
 	// create a temp wal file with name sequence + 1, or truncate the existing one
 	// create a temp wal file with name sequence + 1, or truncate the existing one
-	newTail, err := fileutil.LockFile(ftpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+	newTail, err := w.fp.Open()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -357,7 +359,13 @@ func (w *WAL) cut() error {
 	if err = w.sync(); err != nil {
 	if err = w.sync(); err != nil {
 		return err
 		return err
 	}
 	}
-	if err = os.Rename(ftpath, fpath); err != nil {
+
+	off, err = w.tail().Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+
+	if err = os.Rename(newTail.Name(), fpath); err != nil {
 		return err
 		return err
 	}
 	}
 	newTail.Close()
 	newTail.Close()
@@ -365,7 +373,7 @@ func (w *WAL) cut() error {
 	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, 0600); err != nil {
 	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, 0600); err != nil {
 		return err
 		return err
 	}
 	}
-	if _, err = newTail.Seek(0, os.SEEK_END); err != nil {
+	if _, err = newTail.Seek(off, os.SEEK_SET); err != nil {
 		return err
 		return err
 	}
 	}
 
 
@@ -374,11 +382,6 @@ func (w *WAL) cut() error {
 	prevCrc = w.encoder.crc.Sum32()
 	prevCrc = w.encoder.crc.Sum32()
 	w.encoder = newEncoder(w.tail(), prevCrc)
 	w.encoder = newEncoder(w.tail(), prevCrc)
 
 
-	if err = fileutil.Preallocate(w.tail().File, segmentSizeBytes, true); err != nil {
-		plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
-		return err
-	}
-
 	plog.Infof("segmented wal file %v is created", fpath)
 	plog.Infof("segmented wal file %v is created", fpath)
 	return nil
 	return nil
 }
 }
@@ -443,6 +446,11 @@ func (w *WAL) Close() error {
 	w.mu.Lock()
 	w.mu.Lock()
 	defer w.mu.Unlock()
 	defer w.mu.Unlock()
 
 
+	if w.fp != nil {
+		w.fp.Close()
+		w.fp = nil
+	}
+
 	if w.tail() != nil {
 	if w.tail() != nil {
 		if err := w.sync(); err != nil {
 		if err := w.sync(); err != nil {
 			return err
 			return err