Browse Source

wal: preallocate WAL files with initial size equal to segment size

Avoids having to update file size metadata during fdatasync on common path.

Fixes #4755
Anthony Romano 9 years ago
parent
commit
24b806d2ee
6 changed files with 122 additions and 84 deletions
  1. 47 12
      wal/decoder.go
  2. 0 45
      wal/multi_readcloser.go
  3. 3 0
      wal/repair.go
  4. 6 6
      wal/repair_test.go
  5. 53 20
      wal/wal.go
  6. 13 1
      wal/wal_test.go

+ 47 - 12
wal/decoder.go

@@ -28,30 +28,58 @@ import (
 )
 )
 
 
 type decoder struct {
 type decoder struct {
-	mu sync.Mutex
-	br *bufio.Reader
+	mu  sync.Mutex
+	brs []*bufio.Reader
 
 
-	crc hash.Hash32
+	// lastValidOff file offset following the last valid decoded record
+	lastValidOff int64
+	crc          hash.Hash32
 }
 }
 
 
-func newDecoder(r io.Reader) *decoder {
+func newDecoder(r ...io.Reader) *decoder {
+	readers := make([]*bufio.Reader, len(r))
+	for i := range r {
+		readers[i] = bufio.NewReader(r[i])
+	}
 	return &decoder{
 	return &decoder{
-		br:  bufio.NewReader(r),
+		brs: readers,
 		crc: crc.New(0, crcTable),
 		crc: crc.New(0, crcTable),
 	}
 	}
 }
 }
 
 
 func (d *decoder) decode(rec *walpb.Record) error {
 func (d *decoder) decode(rec *walpb.Record) error {
+	rec.Reset()
 	d.mu.Lock()
 	d.mu.Lock()
 	defer d.mu.Unlock()
 	defer d.mu.Unlock()
+	return d.decodeRecord(rec)
+}
+
+func (d *decoder) decodeRecord(rec *walpb.Record) error {
+	if len(d.brs) == 0 {
+		return io.EOF
+	}
+
+	l, err := readInt64(d.brs[0])
+	if err == io.EOF {
+		d.brs = d.brs[1:]
+		d.lastValidOff = 0
+		return d.decodeRecord(rec)
+	}
 
 
-	rec.Reset()
-	l, err := readInt64(d.br)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
+	if l == 0 {
+		// hit preallocated space
+		d.brs = d.brs[1:]
+		if len(d.brs) == 0 {
+			return io.EOF
+		}
+		d.lastValidOff = 0
+		return d.decodeRecord(rec)
+	}
 	data := make([]byte, l)
 	data := make([]byte, l)
-	if _, err = io.ReadFull(d.br, data); err != nil {
+	if _, err = io.ReadFull(d.brs[0], data); err != nil {
 		// ReadFull returns io.EOF only if no bytes were read
 		// ReadFull returns io.EOF only if no bytes were read
 		// the decoder should treat this as an ErrUnexpectedEOF instead.
 		// the decoder should treat this as an ErrUnexpectedEOF instead.
 		if err == io.EOF {
 		if err == io.EOF {
@@ -62,12 +90,17 @@ func (d *decoder) decode(rec *walpb.Record) error {
 	if err := rec.Unmarshal(data); err != nil {
 	if err := rec.Unmarshal(data); err != nil {
 		return err
 		return err
 	}
 	}
+
 	// skip crc checking if the record type is crcType
 	// skip crc checking if the record type is crcType
-	if rec.Type == crcType {
-		return nil
+	if rec.Type != crcType {
+		d.crc.Write(rec.Data)
+		if err := rec.Validate(d.crc.Sum32()); err != nil {
+			return err
+		}
 	}
 	}
-	d.crc.Write(rec.Data)
-	return rec.Validate(d.crc.Sum32())
+	// record decoded as valid; point last valid offset to end of record
+	d.lastValidOff += l + 8
+	return nil
 }
 }
 
 
 func (d *decoder) updateCRC(prevCrc uint32) {
 func (d *decoder) updateCRC(prevCrc uint32) {
@@ -78,6 +111,8 @@ func (d *decoder) lastCRC() uint32 {
 	return d.crc.Sum32()
 	return d.crc.Sum32()
 }
 }
 
 
+func (d *decoder) lastOffset() int64 { return d.lastValidOff }
+
 func mustUnmarshalEntry(d []byte) raftpb.Entry {
 func mustUnmarshalEntry(d []byte) raftpb.Entry {
 	var e raftpb.Entry
 	var e raftpb.Entry
 	pbutil.MustUnmarshal(&e, d)
 	pbutil.MustUnmarshal(&e, d)

+ 0 - 45
wal/multi_readcloser.go

@@ -1,45 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package wal
-
-import "io"
-
-type multiReadCloser struct {
-	closers []io.Closer
-	reader  io.Reader
-}
-
-func (mc *multiReadCloser) Close() error {
-	var err error
-	for i := range mc.closers {
-		err = mc.closers[i].Close()
-	}
-	return err
-}
-
-func (mc *multiReadCloser) Read(p []byte) (int, error) {
-	return mc.reader.Read(p)
-}
-
-func MultiReadCloser(readClosers ...io.ReadCloser) io.ReadCloser {
-	cs := make([]io.Closer, len(readClosers))
-	rs := make([]io.Reader, len(readClosers))
-	for i := range readClosers {
-		cs[i] = readClosers[i]
-		rs[i] = readClosers[i]
-	}
-	r := io.MultiReader(rs...)
-	return &multiReadCloser{cs, r}
-}

+ 3 - 0
wal/repair.go

@@ -55,6 +55,9 @@ func Repair(dirpath string) bool {
 			continue
 			continue
 		case io.EOF:
 		case io.EOF:
 			return true
 			return true
+		case ErrZeroTrailer:
+			plog.Noticef("found zero trailer in %v", f.Name())
+			fallthrough
 		case io.ErrUnexpectedEOF:
 		case io.ErrUnexpectedEOF:
 			plog.Noticef("repairing %v", f.Name())
 			plog.Noticef("repairing %v", f.Name())
 			bf, bferr := os.Create(f.Name() + ".broken")
 			bf, bferr := os.Create(f.Name() + ".broken")

+ 6 - 6
wal/repair_test.go

@@ -44,6 +44,10 @@ func TestRepair(t *testing.T) {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
 	}
 	}
+	offset, err := w.tail().Seek(0, os.SEEK_CUR)
+	if err != nil {
+		t.Fatal(err)
+	}
 	w.Close()
 	w.Close()
 
 
 	// break the wal.
 	// break the wal.
@@ -51,11 +55,7 @@ func TestRepair(t *testing.T) {
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	offset, err := f.Seek(-4, os.SEEK_END)
-	if err != nil {
-		t.Fatal(err)
-	}
-	err = f.Truncate(offset)
+	err = f.Truncate(offset - 4)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -67,7 +67,7 @@ func TestRepair(t *testing.T) {
 	}
 	}
 	_, _, _, err = w.ReadAll()
 	_, _, _, err = w.ReadAll()
 	if err != io.ErrUnexpectedEOF {
 	if err != io.ErrUnexpectedEOF {
-		t.Fatalf("err = %v, want %v", err, io.ErrUnexpectedEOF)
+		t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF)
 	}
 	}
 	w.Close()
 	w.Close()
 
 

+ 53 - 20
wal/wal.go

@@ -57,6 +57,7 @@ var (
 	ErrCRCMismatch      = errors.New("wal: crc mismatch")
 	ErrCRCMismatch      = errors.New("wal: crc mismatch")
 	ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
 	ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
 	ErrSnapshotNotFound = errors.New("wal: snapshot not found")
 	ErrSnapshotNotFound = errors.New("wal: snapshot not found")
+	ErrZeroTrailer      = errors.New("wal: zero trailer")
 	crcTable            = crc32.MakeTable(crc32.Castagnoli)
 	crcTable            = crc32.MakeTable(crc32.Castagnoli)
 )
 )
 
 
@@ -72,7 +73,7 @@ type WAL struct {
 
 
 	start     walpb.Snapshot // snapshot to start reading
 	start     walpb.Snapshot // snapshot to start reading
 	decoder   *decoder       // decoder to decode records
 	decoder   *decoder       // decoder to decode records
-	readClose io.Closer      // closer for decode reader
+	readClose func() error   // closer for decode reader
 
 
 	mu      sync.Mutex
 	mu      sync.Mutex
 	enti    uint64   // index of the last entry saved to the wal
 	enti    uint64   // index of the last entry saved to the wal
@@ -93,10 +94,16 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 	}
 	}
 
 
 	p := path.Join(dirpath, walName(0, 0))
 	p := path.Join(dirpath, walName(0, 0))
-	f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
+	f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, 0600)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+	if _, err := f.Seek(0, os.SEEK_END); err != nil {
+		return nil, err
+	}
+	if err := fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
+		return nil, err
+	}
 
 
 	w := &WAL{
 	w := &WAL{
 		dir:      dirpath,
 		dir:      dirpath,
@@ -149,13 +156,14 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
 
 
 	// open the wal files
 	// open the wal files
 	rcs := make([]io.ReadCloser, 0)
 	rcs := make([]io.ReadCloser, 0)
+	rs := make([]io.Reader, 0)
 	ls := make([]*fileutil.LockedFile, 0)
 	ls := make([]*fileutil.LockedFile, 0)
 	for _, name := range names[nameIndex:] {
 	for _, name := range names[nameIndex:] {
 		p := path.Join(dirpath, name)
 		p := path.Join(dirpath, name)
 		if write {
 		if write {
 			l, err := fileutil.TryLockFile(p, os.O_RDWR, 0600)
 			l, err := fileutil.TryLockFile(p, os.O_RDWR, 0600)
 			if err != nil {
 			if err != nil {
-				MultiReadCloser(rcs...).Close()
+				closeAll(rcs...)
 				return nil, err
 				return nil, err
 			}
 			}
 			ls = append(ls, l)
 			ls = append(ls, l)
@@ -163,37 +171,38 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
 		} else {
 		} else {
 			rf, err := os.OpenFile(p, os.O_RDONLY, 0600)
 			rf, err := os.OpenFile(p, os.O_RDONLY, 0600)
 			if err != nil {
 			if err != nil {
+				closeAll(rcs...)
 				return nil, err
 				return nil, err
 			}
 			}
 			ls = append(ls, nil)
 			ls = append(ls, nil)
 			rcs = append(rcs, rf)
 			rcs = append(rcs, rf)
 		}
 		}
+		rs = append(rs, rcs[len(rcs)-1])
 	}
 	}
 
 
-	rc := MultiReadCloser(rcs...)
-	c := rc
-	if write {
-		// write reuses the file descriptors from read; don't close so
-		// WAL can append without dropping the file lock
-		c = nil
-	}
+	closer := func() error { return closeAll(rcs...) }
 
 
 	// create a WAL ready for reading
 	// create a WAL ready for reading
 	w := &WAL{
 	w := &WAL{
 		dir:       dirpath,
 		dir:       dirpath,
 		start:     snap,
 		start:     snap,
-		decoder:   newDecoder(rc),
-		readClose: c,
+		decoder:   newDecoder(rs...),
+		readClose: closer,
 		locks:     ls,
 		locks:     ls,
 	}
 	}
 
 
 	if write {
 	if write {
+		// write reuses the file descriptors from read; don't close so
+		// WAL can append without dropping the file lock
+		w.readClose = nil
+
 		if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
 		if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
-			rc.Close()
+			closer()
 			return nil, err
 			return nil, err
 		}
 		}
+		// don't resize file for preallocation in case tail is corrupted
 		if err := fileutil.Preallocate(w.tail().File, segmentSizeBytes, false); err != nil {
 		if err := fileutil.Preallocate(w.tail().File, segmentSizeBytes, false); err != nil {
-			rc.Close()
+			closer()
 			plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
 			plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
 			return nil, err
 			return nil, err
 		}
 		}
@@ -261,6 +270,9 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 		}
 		}
 	}
 	}
 
 
+	if err == ErrZeroTrailer {
+		err = io.EOF
+	}
 	switch w.tail() {
 	switch w.tail() {
 	case nil:
 	case nil:
 		// We do not have to read out all entries in read mode.
 		// We do not have to read out all entries in read mode.
@@ -285,7 +297,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 
 
 	// close decoder, disable reading
 	// close decoder, disable reading
 	if w.readClose != nil {
 	if w.readClose != nil {
-		w.readClose.Close()
+		w.readClose()
 		w.readClose = nil
 		w.readClose = nil
 	}
 	}
 	w.start = walpb.Snapshot{}
 	w.start = walpb.Snapshot{}
@@ -294,6 +306,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 
 
 	if w.tail() != nil {
 	if w.tail() != nil {
 		// create encoder (chain crc with the decoder), enable appending
 		// create encoder (chain crc with the decoder), enable appending
+		_, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET)
 		w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
 		w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
 		lastIndexSaved.Set(float64(w.enti))
 		lastIndexSaved.Set(float64(w.enti))
 	}
 	}
@@ -306,7 +319,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 // cut first creates a temp wal file and writes necessary headers into it.
 // cut first creates a temp wal file and writes necessary headers into it.
 // Then cut atomically rename temp wal file to a wal file.
 // Then cut atomically rename temp wal file to a wal file.
 func (w *WAL) cut() error {
 func (w *WAL) cut() error {
-	// close old wal file
+	// close old wal file; truncate to avoid wasting space if an early cut
+	off, serr := w.tail().Seek(0, os.SEEK_CUR)
+	if serr != nil {
+		return serr
+	}
+	if err := w.tail().Truncate(off); err != nil {
+		return err
+	}
 	if err := w.sync(); err != nil {
 	if err := w.sync(); err != nil {
 		return err
 		return err
 	}
 	}
@@ -342,15 +362,19 @@ func (w *WAL) cut() error {
 	}
 	}
 	newTail.Close()
 	newTail.Close()
 
 
-	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY|os.O_APPEND, 0600); err != nil {
+	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, 0600); err != nil {
 		return err
 		return err
 	}
 	}
+	if _, err = newTail.Seek(0, os.SEEK_END); err != nil {
+		return err
+	}
+
 	w.locks[len(w.locks)-1] = newTail
 	w.locks[len(w.locks)-1] = newTail
 
 
 	prevCrc = w.encoder.crc.Sum32()
 	prevCrc = w.encoder.crc.Sum32()
 	w.encoder = newEncoder(w.tail(), prevCrc)
 	w.encoder = newEncoder(w.tail(), prevCrc)
 
 
-	if err = fileutil.Preallocate(w.tail().File, segmentSizeBytes, false); err != nil {
+	if err = fileutil.Preallocate(w.tail().File, segmentSizeBytes, true); err != nil {
 		plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
 		plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
 		return err
 		return err
 	}
 	}
@@ -478,11 +502,11 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 		return err
 		return err
 	}
 	}
 
 
-	fstat, err := w.tail().Stat()
+	curOff, err := w.tail().Seek(0, os.SEEK_CUR)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	if fstat.Size() < segmentSizeBytes {
+	if curOff < segmentSizeBytes {
 		if mustSync {
 		if mustSync {
 			return w.sync()
 			return w.sync()
 		}
 		}
@@ -544,3 +568,12 @@ func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
 	}
 	}
 	return false
 	return false
 }
 }
+
+func closeAll(rcs ...io.ReadCloser) error {
+	for _, f := range rcs {
+		if err := f.Close(); err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 13 - 1
wal/wal_test.go

@@ -16,6 +16,7 @@ package wal
 
 
 import (
 import (
 	"bytes"
 	"bytes"
+	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"os"
 	"os"
 	"path"
 	"path"
@@ -42,8 +43,19 @@ func TestNew(t *testing.T) {
 		t.Errorf("name = %+v, want %+v", g, walName(0, 0))
 		t.Errorf("name = %+v, want %+v", g, walName(0, 0))
 	}
 	}
 	defer w.Close()
 	defer w.Close()
-	gd, err := ioutil.ReadFile(w.tail().Name())
+
+	// file is preallocated to segment size; only read data written by wal
+	off, err := w.tail().Seek(0, os.SEEK_CUR)
+	if err != nil {
+		t.Fatal(err)
+	}
+	gd := make([]byte, off)
+	f, err := os.Open(w.tail().Name())
 	if err != nil {
 	if err != nil {
+		t.Fatal(err)
+	}
+	defer f.Close()
+	if _, err = io.ReadFull(f, gd); err != nil {
 		t.Fatalf("err = %v, want nil", err)
 		t.Fatalf("err = %v, want nil", err)
 	}
 	}