Prechádzať zdrojové kódy

Merge pull request #4785 from heyitsanthony/gce-fallocate

wal: extend WAL file to segment size on fallocate
Anthony Romano 9 rokov pred
rodič
commit
adebd91114

+ 21 - 16
pkg/fileutil/preallocate.go

@@ -12,31 +12,36 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// +build linux
-
 package fileutil
 
-import (
-	"os"
-	"syscall"
-)
+import "os"
 
 // Preallocate tries to allocate the space for given
 // file. This operation is only supported on linux by a
 // few filesystems (btrfs, ext4, etc.).
 // If the operation is unsupported, no error will be returned.
 // Otherwise, the error encountered will be returned.
-func Preallocate(f *os.File, sizeInBytes int) error {
-	// use mode = 1 to keep size
-	// see FALLOC_FL_KEEP_SIZE
-	err := syscall.Fallocate(int(f.Fd()), 1, 0, int64(sizeInBytes))
+func Preallocate(f *os.File, sizeInBytes int64, extendFile bool) error {
+	if extendFile {
+		return preallocExtend(f, sizeInBytes) // extend the file to sizeInBytes and surface any failure
+	}
+	return preallocFixed(f, sizeInBytes) // reserve space only; the file size is left unchanged
+}
+
+func preallocExtendTrunc(f *os.File, sizeInBytes int64) error { // sets f's length with Seek+Truncate instead of fallocate
+	curOff, err := f.Seek(0, os.SEEK_CUR) // remember the caller's offset so it can be restored below
+	if err != nil {
+		return err
+	}
+	size, err := f.Seek(sizeInBytes, os.SEEK_END) // size is the offset sizeInBytes past the current end of file
 	if err != nil {
-		errno, ok := err.(syscall.Errno)
-		// treat not support as nil error
-		if ok && errno == syscall.ENOTSUP {
-			return nil
-		}
 		return err
 	}
-	return nil
+	if _, err = f.Seek(curOff, os.SEEK_SET); err != nil { // put the file offset back where the caller left it
+		return err
+	}
+	if sizeInBytes > size { // NOTE(review): size already includes sizeInBytes, so this looks unreachable for sizes >= 0 — confirm intent
+		return nil
+	}
+	return f.Truncate(sizeInBytes) // sets the file length to exactly sizeInBytes
 }

+ 43 - 0
pkg/fileutil/preallocate_darwin.go

@@ -0,0 +1,43 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build darwin
+
+package fileutil
+
+import (
+	"os"
+	"syscall"
+	"unsafe"
+)
+
+func preallocExtend(f *os.File, sizeInBytes int64) error { // darwin: reserve the space first, then set the file length
+	if err := preallocFixed(f, sizeInBytes); err != nil {
+		return err
+	}
+	return preallocExtendTrunc(f, sizeInBytes) // length change is done via seek+truncate
+}
+
+func preallocFixed(f *os.File, sizeInBytes int64) error { // darwin: issues fcntl(F_PREALLOCATE) for sizeInBytes
+	fstore := &syscall.Fstore_t{
+		Flags:   syscall.F_ALLOCATEALL,
+		Posmode: syscall.F_PEOFPOSMODE,
+		Length:  sizeInBytes}
+	p := unsafe.Pointer(fstore) // the raw syscall takes the struct address as a uintptr argument
+	_, _, errno := syscall.Syscall(syscall.SYS_FCNTL, f.Fd(), uintptr(syscall.F_PREALLOCATE), uintptr(p))
+	if errno == 0 || errno == syscall.ENOTSUP { // success and "not supported" are both reported as nil
+		return nil
+	}
+	return errno
+}

+ 27 - 13
pkg/fileutil/preallocate_test.go

@@ -17,29 +17,29 @@ package fileutil
 import (
 	"io/ioutil"
 	"os"
-	"runtime"
 	"testing"
 )
 
-func TestPreallocate(t *testing.T) {
-	if runtime.GOOS != "linux" {
-		t.Skipf("skip testPreallocate, OS = %s", runtime.GOOS)
-	}
-
-	p, err := ioutil.TempDir(os.TempDir(), "preallocateTest")
-	if err != nil {
+func TestPreallocateExtend(t *testing.T) { runPreallocTest(t, testPreallocateExtend) }
+func testPreallocateExtend(t *testing.T, f *os.File) {
+	size := int64(64 * 1000)
+	if err := Preallocate(f, size, true); err != nil {
 		t.Fatal(err)
 	}
-	defer os.RemoveAll(p)
 
-	f, err := ioutil.TempFile(p, "")
+	stat, err := f.Stat()
 	if err != nil {
 		t.Fatal(err)
 	}
+	if stat.Size() != size {
+		t.Errorf("size = %d, want %d", stat.Size(), size)
+	}
+}
 
-	size := 64 * 1000
-	err = Preallocate(f, size)
-	if err != nil {
+func TestPreallocateFixed(t *testing.T) { runPreallocTest(t, testPreallocateFixed) }
+func testPreallocateFixed(t *testing.T, f *os.File) {
+	size := int64(64 * 1000)
+	if err := Preallocate(f, size, false); err != nil {
 		t.Fatal(err)
 	}
 
@@ -51,3 +51,17 @@ func TestPreallocate(t *testing.T) {
 		t.Errorf("size = %d, want %d", stat.Size(), 0)
 	}
 }
+
+func runPreallocTest(t *testing.T, test func(*testing.T, *os.File)) { // runs test against a freshly created temp file
+	p, err := ioutil.TempDir(os.TempDir(), "preallocateTest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(p) // removes the temp directory together with the file created inside it
+
+	f, err := ioutil.TempFile(p, "")
+	if err != nil {
+		t.Fatal(err)
+	}
+	test(t, f) // NOTE(review): f is never closed in this helper — confirm whether that is intentional
+}

+ 48 - 0
pkg/fileutil/preallocate_unix.go

@@ -0,0 +1,48 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+package fileutil
+
+import (
+	"os"
+	"syscall"
+)
+
+func preallocExtend(f *os.File, sizeInBytes int64) error {
+	// use mode = 0 so that fallocate also changes the file size
+	err := syscall.Fallocate(int(f.Fd()), 0, 0, sizeInBytes)
+	if err != nil {
+		errno, ok := err.(syscall.Errno)
+		// fallocate is not supported here: fall back to extending via seek+truncate
+		if ok && errno == syscall.ENOTSUP {
+			return preallocExtendTrunc(f, sizeInBytes)
+		}
+	}
+	return err // nil on success, otherwise the fallocate error
+}
+
+func preallocFixed(f *os.File, sizeInBytes int64) error {
+	// use mode = 1 to keep size; see FALLOC_FL_KEEP_SIZE
+	err := syscall.Fallocate(int(f.Fd()), 1, 0, sizeInBytes)
+	if err != nil {
+		errno, ok := err.(syscall.Errno)
+		// an unsupported operation (ENOTSUP) is reported as nil rather than as a failure
+		if ok && errno == syscall.ENOTSUP {
+			return nil
+		}
+	}
+	return err // nil on success, otherwise the fallocate error
+}

+ 5 - 8
pkg/fileutil/perallocate_unsupported.go → pkg/fileutil/preallocate_unsupported.go

@@ -12,17 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// +build !linux
+// +build !linux,!darwin
 
 package fileutil
 
 import "os"
 
-// Preallocate tries to allocate the space for given
-// file. This operation is only supported on linux by a
-// few filesystems (btrfs, ext4, etc.).
-// If the operation is unsupported, no error will be returned.
-// Otherwise, the error encountered will be returned.
-func Preallocate(f *os.File, sizeInBytes int) error {
-	return nil
+func preallocExtend(f *os.File, sizeInBytes int64) error { // platforms other than linux/darwin: extend via seek+truncate only
+	return preallocExtendTrunc(f, sizeInBytes)
 }
+
+func preallocFixed(f *os.File, sizeInBytes int64) error { return nil } // no-op on platforms other than linux/darwin

+ 47 - 12
wal/decoder.go

@@ -28,30 +28,58 @@ import (
 )
 
 type decoder struct {
-	mu sync.Mutex
-	br *bufio.Reader
+	mu  sync.Mutex
+	brs []*bufio.Reader
 
-	crc hash.Hash32
+	// lastValidOff file offset following the last valid decoded record
+	lastValidOff int64
+	crc          hash.Hash32
 }
 
-func newDecoder(r io.Reader) *decoder {
+func newDecoder(r ...io.Reader) *decoder { // builds a decoder that reads the given readers one after another
+	readers := make([]*bufio.Reader, len(r))
+	for i := range r {
+		readers[i] = bufio.NewReader(r[i]) // each input gets its own buffered reader
+	}
 	return &decoder{
-		br:  bufio.NewReader(r),
+		brs: readers,
 		crc: crc.New(0, crcTable),
 	}
 }
 
 func (d *decoder) decode(rec *walpb.Record) error {
+	rec.Reset() // clear rec before decoding the next record into it
 	d.mu.Lock()
 	defer d.mu.Unlock()
+	return d.decodeRecord(rec) // runs with d.mu held; decodeRecord recurses when it advances to the next reader
+}
+
+func (d *decoder) decodeRecord(rec *walpb.Record) error {
+	if len(d.brs) == 0 {
+		return io.EOF
+	}
+
+	l, err := readInt64(d.brs[0])
+	if err == io.EOF {
+		d.brs = d.brs[1:]
+		d.lastValidOff = 0
+		return d.decodeRecord(rec)
+	}
 
-	rec.Reset()
-	l, err := readInt64(d.br)
 	if err != nil {
 		return err
 	}
+	if l == 0 {
+		// hit preallocated space
+		d.brs = d.brs[1:]
+		if len(d.brs) == 0 {
+			return io.EOF
+		}
+		d.lastValidOff = 0
+		return d.decodeRecord(rec)
+	}
 	data := make([]byte, l)
-	if _, err = io.ReadFull(d.br, data); err != nil {
+	if _, err = io.ReadFull(d.brs[0], data); err != nil {
 		// ReadFull returns io.EOF only if no bytes were read
 		// the decoder should treat this as an ErrUnexpectedEOF instead.
 		if err == io.EOF {
@@ -62,12 +90,17 @@ func (d *decoder) decode(rec *walpb.Record) error {
 	if err := rec.Unmarshal(data); err != nil {
 		return err
 	}
+
 	// skip crc checking if the record type is crcType
-	if rec.Type == crcType {
-		return nil
+	if rec.Type != crcType {
+		d.crc.Write(rec.Data)
+		if err := rec.Validate(d.crc.Sum32()); err != nil {
+			return err
+		}
 	}
-	d.crc.Write(rec.Data)
-	return rec.Validate(d.crc.Sum32())
+	// record decoded as valid; point last valid offset to end of record
+	d.lastValidOff += l + 8
+	return nil
 }
 
 func (d *decoder) updateCRC(prevCrc uint32) {
@@ -78,6 +111,8 @@ func (d *decoder) lastCRC() uint32 {
 	return d.crc.Sum32()
 }
 
+func (d *decoder) lastOffset() int64 { return d.lastValidOff } // offset just past the last valid record; reset to 0 when the decoder moves to the next reader
+
 func mustUnmarshalEntry(d []byte) raftpb.Entry {
 	var e raftpb.Entry
 	pbutil.MustUnmarshal(&e, d)

+ 95 - 0
wal/file_pipeline.go

@@ -0,0 +1,95 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+	"fmt"
+	"os"
+	"path"
+
+	"github.com/coreos/etcd/pkg/fileutil"
+)
+
+// filePipeline allocates preallocated files in a background goroutine so they are ready before they are requested
+type filePipeline struct {
+	// dir is the directory the temporary files are created in
+	dir string
+	// size of files to make, in bytes
+	size int64
+	// count is the number of files generated so far; it also names the next temp file
+	count int
+
+	filec chan *fileutil.LockedFile // hands allocated files from run to Open
+	errc  chan error // carries an allocation error from run; closed when run exits
+	donec chan struct{} // closed by Close to tell run to stop
+}
+
+func newFilePipeline(dir string, fileSize int64) *filePipeline { // creates the pipeline and starts its allocator goroutine
+	fp := &filePipeline{
+		dir:   dir,
+		size:  fileSize,
+		filec: make(chan *fileutil.LockedFile), // unbuffered: run blocks until a file is taken
+		errc:  make(chan error, 1), // capacity 1 so run can report an error without a waiting receiver
+		donec: make(chan struct{}),
+	}
+	go fp.run() // runs until an allocation fails or donec is closed
+	return fp
+}
+
+// Open returns a fresh file for writing, or the error that stopped allocation
+func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
+	select {
+	case f = <-fp.filec: // a preallocated file is ready
+	case err = <-fp.errc: // NOTE(review): once errc is closed this yields (nil, nil) — confirm callers cope
+	}
+	return
+}
+
+func (fp *filePipeline) Close() error { // stops run and waits for it to finish
+	close(fp.donec) // NOTE(review): a second Close would close donec twice — callers must call this once
+	return <-fp.errc // yields run's pending error, or nil once run has closed errc
+}
+
+func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) { // creates, locks and preallocates one temp file
+	fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count)) // temp name is derived from the running file count
+	if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, 0600); err != nil {
+		return nil, err
+	}
+	if err = fileutil.Preallocate(f.File, fp.size, true); err != nil { // true: extend the file to fp.size
+		plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
+		f.Close() // NOTE(review): the file itself is left on disk here — confirm that is intended
+		return nil, err
+	}
+	fp.count++ // only successful allocations advance the counter
+	return f, nil
+}
+
+func (fp *filePipeline) run() { // allocator loop; exits on an allocation error or when donec is closed
+	defer close(fp.errc) // closing errc unblocks a Close waiting on it
+	for {
+		f, err := fp.alloc()
+		if err != nil {
+			fp.errc <- err // errc has capacity 1 and this is the only send, so it does not block
+			return
+		}
+		select {
+		case fp.filec <- f: // handed to an Open call; loop to prepare the next file
+		case <-fp.donec:
+			os.Remove(f.Name()) // discard the file that was never handed out
+			f.Close()
+			return
+		}
+	}
+}

+ 0 - 45
wal/multi_readcloser.go

@@ -1,45 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package wal
-
-import "io"
-
-type multiReadCloser struct {
-	closers []io.Closer
-	reader  io.Reader
-}
-
-func (mc *multiReadCloser) Close() error {
-	var err error
-	for i := range mc.closers {
-		err = mc.closers[i].Close()
-	}
-	return err
-}
-
-func (mc *multiReadCloser) Read(p []byte) (int, error) {
-	return mc.reader.Read(p)
-}
-
-func MultiReadCloser(readClosers ...io.ReadCloser) io.ReadCloser {
-	cs := make([]io.Closer, len(readClosers))
-	rs := make([]io.Reader, len(readClosers))
-	for i := range readClosers {
-		cs[i] = readClosers[i]
-		rs[i] = readClosers[i]
-	}
-	r := io.MultiReader(rs...)
-	return &multiReadCloser{cs, r}
-}

+ 3 - 0
wal/repair.go

@@ -55,6 +55,9 @@ func Repair(dirpath string) bool {
 			continue
 		case io.EOF:
 			return true
+		case ErrZeroTrailer:
+			plog.Noticef("found zero trailer in %v", f.Name())
+			fallthrough
 		case io.ErrUnexpectedEOF:
 			plog.Noticef("repairing %v", f.Name())
 			bf, bferr := os.Create(f.Name() + ".broken")

+ 6 - 6
wal/repair_test.go

@@ -44,6 +44,10 @@ func TestRepair(t *testing.T) {
 			t.Fatal(err)
 		}
 	}
+	offset, err := w.tail().Seek(0, os.SEEK_CUR)
+	if err != nil {
+		t.Fatal(err)
+	}
 	w.Close()
 
 	// break the wal.
@@ -51,11 +55,7 @@ func TestRepair(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	offset, err := f.Seek(-4, os.SEEK_END)
-	if err != nil {
-		t.Fatal(err)
-	}
-	err = f.Truncate(offset)
+	err = f.Truncate(offset - 4)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -67,7 +67,7 @@ func TestRepair(t *testing.T) {
 	}
 	_, _, _, err = w.ReadAll()
 	if err != io.ErrUnexpectedEOF {
-		t.Fatalf("err = %v, want %v", err, io.ErrUnexpectedEOF)
+		t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF)
 	}
 	w.Close()
 

+ 69 - 28
wal/wal.go

@@ -57,6 +57,7 @@ var (
 	ErrCRCMismatch      = errors.New("wal: crc mismatch")
 	ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
 	ErrSnapshotNotFound = errors.New("wal: snapshot not found")
+	ErrZeroTrailer      = errors.New("wal: zero trailer")
 	crcTable            = crc32.MakeTable(crc32.Castagnoli)
 )
 
@@ -72,13 +73,14 @@ type WAL struct {
 
 	start     walpb.Snapshot // snapshot to start reading
 	decoder   *decoder       // decoder to decode records
-	readClose io.Closer      // closer for decode reader
+	readClose func() error   // closer for decode reader
 
 	mu      sync.Mutex
 	enti    uint64   // index of the last entry saved to the wal
 	encoder *encoder // encoder to encode records
 
 	locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
+	fp    *filePipeline
 }
 
 // Create creates a WAL ready for appending records. The given metadata is
@@ -93,15 +95,22 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 	}
 
 	p := path.Join(dirpath, walName(0, 0))
-	f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
+	f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, 0600)
 	if err != nil {
 		return nil, err
 	}
+	if _, err := f.Seek(0, os.SEEK_END); err != nil {
+		return nil, err
+	}
+	if err := fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
+		return nil, err
+	}
 
 	w := &WAL{
 		dir:      dirpath,
 		metadata: metadata,
 		encoder:  newEncoder(f, 0),
+		fp:       newFilePipeline(dirpath, segmentSizeBytes),
 	}
 	w.locks = append(w.locks, f)
 	if err := w.saveCrc(0); err != nil {
@@ -149,13 +158,14 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
 
 	// open the wal files
 	rcs := make([]io.ReadCloser, 0)
+	rs := make([]io.Reader, 0)
 	ls := make([]*fileutil.LockedFile, 0)
 	for _, name := range names[nameIndex:] {
 		p := path.Join(dirpath, name)
 		if write {
 			l, err := fileutil.TryLockFile(p, os.O_RDWR, 0600)
 			if err != nil {
-				MultiReadCloser(rcs...).Close()
+				closeAll(rcs...)
 				return nil, err
 			}
 			ls = append(ls, l)
@@ -163,40 +173,42 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
 		} else {
 			rf, err := os.OpenFile(p, os.O_RDONLY, 0600)
 			if err != nil {
+				closeAll(rcs...)
 				return nil, err
 			}
 			ls = append(ls, nil)
 			rcs = append(rcs, rf)
 		}
+		rs = append(rs, rcs[len(rcs)-1])
 	}
 
-	rc := MultiReadCloser(rcs...)
-	c := rc
-	if write {
-		// write reuses the file descriptors from read; don't close so
-		// WAL can append without dropping the file lock
-		c = nil
-	}
+	closer := func() error { return closeAll(rcs...) }
 
 	// create a WAL ready for reading
 	w := &WAL{
 		dir:       dirpath,
 		start:     snap,
-		decoder:   newDecoder(rc),
-		readClose: c,
+		decoder:   newDecoder(rs...),
+		readClose: closer,
 		locks:     ls,
 	}
 
 	if write {
+		// write reuses the file descriptors from read; don't close so
+		// WAL can append without dropping the file lock
+		w.readClose = nil
+
 		if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
-			rc.Close()
+			closer()
 			return nil, err
 		}
-		if err := fileutil.Preallocate(w.tail().File, segmentSizeBytes); err != nil {
-			rc.Close()
+		// don't resize file for preallocation in case tail is corrupted
+		if err := fileutil.Preallocate(w.tail().File, segmentSizeBytes, false); err != nil {
+			closer()
 			plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
 			return nil, err
 		}
+		w.fp = newFilePipeline(w.dir, segmentSizeBytes)
 	}
 
 	return w, nil
@@ -261,6 +273,9 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 		}
 	}
 
+	if err == ErrZeroTrailer {
+		err = io.EOF
+	}
 	switch w.tail() {
 	case nil:
 		// We do not have to read out all entries in read mode.
@@ -285,7 +300,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 
 	// close decoder, disable reading
 	if w.readClose != nil {
-		w.readClose.Close()
+		w.readClose()
 		w.readClose = nil
 	}
 	w.start = walpb.Snapshot{}
@@ -294,6 +309,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 
 	if w.tail() != nil {
 		// create encoder (chain crc with the decoder), enable appending
+		_, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET)
 		w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
 		lastIndexSaved.Set(float64(w.enti))
 	}
@@ -306,16 +322,22 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 // cut first creates a temp wal file and writes necessary headers into it.
 // Then cut atomically rename temp wal file to a wal file.
 func (w *WAL) cut() error {
-	// close old wal file
+	// close old wal file; truncate to avoid wasting space if an early cut
+	off, serr := w.tail().Seek(0, os.SEEK_CUR)
+	if serr != nil {
+		return serr
+	}
+	if err := w.tail().Truncate(off); err != nil {
+		return err
+	}
 	if err := w.sync(); err != nil {
 		return err
 	}
 
 	fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
-	ftpath := fpath + ".tmp"
 
 	// create a temp wal file with name sequence + 1, or truncate the existing one
-	newTail, err := fileutil.LockFile(ftpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+	newTail, err := w.fp.Open()
 	if err != nil {
 		return err
 	}
@@ -337,24 +359,29 @@ func (w *WAL) cut() error {
 	if err = w.sync(); err != nil {
 		return err
 	}
-	if err = os.Rename(ftpath, fpath); err != nil {
+
+	off, err = w.tail().Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+
+	if err = os.Rename(newTail.Name(), fpath); err != nil {
 		return err
 	}
 	newTail.Close()
 
-	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY|os.O_APPEND, 0600); err != nil {
+	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, 0600); err != nil {
+		return err
+	}
+	if _, err = newTail.Seek(off, os.SEEK_SET); err != nil {
 		return err
 	}
+
 	w.locks[len(w.locks)-1] = newTail
 
 	prevCrc = w.encoder.crc.Sum32()
 	w.encoder = newEncoder(w.tail(), prevCrc)
 
-	if err = fileutil.Preallocate(w.tail().File, segmentSizeBytes); err != nil {
-		plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
-		return err
-	}
-
 	plog.Infof("segmented wal file %v is created", fpath)
 	return nil
 }
@@ -419,6 +446,11 @@ func (w *WAL) Close() error {
 	w.mu.Lock()
 	defer w.mu.Unlock()
 
+	if w.fp != nil {
+		w.fp.Close()
+		w.fp = nil
+	}
+
 	if w.tail() != nil {
 		if err := w.sync(); err != nil {
 			return err
@@ -478,11 +510,11 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 		return err
 	}
 
-	fstat, err := w.tail().Stat()
+	curOff, err := w.tail().Seek(0, os.SEEK_CUR)
 	if err != nil {
 		return err
 	}
-	if fstat.Size() < segmentSizeBytes {
+	if curOff < segmentSizeBytes {
 		if mustSync {
 			return w.sync()
 		}
@@ -544,3 +576,12 @@ func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
 	}
 	return false
 }
+
+func closeAll(rcs ...io.ReadCloser) (err error) { // closes every reader in rcs and returns the first Close error, if any
+	for _, f := range rcs {
+		if cerr := f.Close(); cerr != nil && err == nil {
+			err = cerr // remember the first failure but keep closing the rest so no descriptor is leaked
+		}
+	}
+	return err
+}

+ 13 - 1
wal/wal_test.go

@@ -16,6 +16,7 @@ package wal
 
 import (
 	"bytes"
+	"io"
 	"io/ioutil"
 	"os"
 	"path"
@@ -42,8 +43,19 @@ func TestNew(t *testing.T) {
 		t.Errorf("name = %+v, want %+v", g, walName(0, 0))
 	}
 	defer w.Close()
-	gd, err := ioutil.ReadFile(w.tail().Name())
+
+	// file is preallocated to segment size; only read data written by wal
+	off, err := w.tail().Seek(0, os.SEEK_CUR)
+	if err != nil {
+		t.Fatal(err)
+	}
+	gd := make([]byte, off)
+	f, err := os.Open(w.tail().Name())
 	if err != nil {
+		t.Fatal(err)
+	}
+	defer f.Close()
+	if _, err = io.ReadFull(f, gd); err != nil {
 		t.Fatalf("err = %v, want nil", err)
 	}