Browse Source

Merge pull request #6310 from heyitsanthony/wal-page-write

wal: use page buffered writer for writing records
Anthony Romano 9 years ago
parent
commit
e29c79c54c
3 changed files with 211 additions and 3 deletions
  1. 103 0
      pkg/ioutil/pagewriter.go
  2. 100 0
      pkg/ioutil/pagewriter_test.go
  3. 8 3
      wal/encoder.go

+ 103 - 0
pkg/ioutil/pagewriter.go

@@ -0,0 +1,103 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ioutil
+
+import (
+	"io"
+)
+
+// defaultBufferBytes is the number of bytes NewPageWriter uses as the buffer
+// watermark of every PageWriter it creates. It is a variable rather than a
+// constant; the tests in this package reassign it to exercise smaller buffers.
+var defaultBufferBytes = 128 * 1024
+
+// PageWriter implements the io.Writer interface so that writes will
+// either be in page chunks or from flushing.
+type PageWriter struct {
+	// w is the underlying writer that receives flushed buffer contents and
+	// directly written whole pages.
+	w io.Writer
+	// pageOffset tracks the page offset of the base of the buffer; Flush
+	// advances it by the number of flushed bytes, modulo pageBytes.
+	pageOffset int
+	// pageBytes is the number of bytes per page
+	pageBytes int
+	// bufferedBytes counts the number of bytes pending for write in the buffer
+	bufferedBytes int
+	// buf holds the write buffer
+	buf []byte
+	// bufWatermarkBytes is the number of bytes the buffer can hold before it needs
+	// to be flushed. It is less than len(buf) so there is space for slack writes
+	// to bring the writer to page alignment.
+	bufWatermarkBytes int
+}
+
+// NewPageWriter creates a PageWriter that buffers writes destined for w.
+// pageBytes is the number of bytes per page and must be greater than zero.
+// The buffer is sized to defaultBufferBytes plus one extra page, leaving room
+// for the slack bytes needed to reach page alignment once the watermark
+// (defaultBufferBytes) is exceeded.
+//
+// NewPageWriter panics if pageBytes is not positive: Write and Flush compute
+// offsets modulo pageBytes, so a zero page size would otherwise fail later
+// with an integer divide-by-zero, after data had already been accepted.
+func NewPageWriter(w io.Writer, pageBytes int) *PageWriter {
+	if pageBytes <= 0 {
+		panic("ioutil: NewPageWriter requires pageBytes > 0")
+	}
+	return &PageWriter{
+		w:                 w,
+		pageBytes:         pageBytes,
+		buf:               make([]byte, defaultBufferBytes+pageBytes),
+		bufWatermarkBytes: defaultBufferBytes,
+	}
+}
+
+// Write buffers p and returns the number of bytes of p it consumed. While the
+// buffered data stays at or below the watermark, p is only copied into the
+// buffer. Once a write would exceed the watermark, the buffer is first topped
+// up to the next page boundary and flushed, whole pages of the remainder go
+// straight to the underlying writer, and the leftover tail is buffered again.
+// If p is too short to reach the page boundary it is kept in the buffer's
+// slack space and nothing is flushed. On error, n is the count accumulated up
+// to the failure.
+func (pw *PageWriter) Write(p []byte) (n int, err error) {
+	if len(p)+pw.bufferedBytes <= pw.bufWatermarkBytes {
+		// no overflow
+		copy(pw.buf[pw.bufferedBytes:], p)
+		pw.bufferedBytes += len(p)
+		return len(p), nil
+	}
+	// complete the slack page in the buffer if unaligned
+	slack := pw.pageBytes - ((pw.pageOffset + pw.bufferedBytes) % pw.pageBytes)
+	// slack == pageBytes means the buffered data already ends on a page
+	// boundary, so there is nothing to top up.
+	if slack != pw.pageBytes {
+		partial := slack > len(p)
+		if partial {
+			// not enough data to complete the slack page
+			slack = len(p)
+		}
+		// special case: writing to slack page in buffer
+		copy(pw.buf[pw.bufferedBytes:], p[:slack])
+		pw.bufferedBytes += slack
+		n = slack
+		p = p[slack:]
+		if partial {
+			// avoid forcing an unaligned flush
+			return n, nil
+		}
+	}
+	// buffer contents are now page-aligned; clear out
+	if err = pw.Flush(); err != nil {
+		return n, err
+	}
+	// directly write all complete pages without copying
+	// (a remainder of exactly one page is not written here; it falls through
+	// to the buffered path below)
+	if len(p) > pw.pageBytes {
+		pages := len(p) / pw.pageBytes
+		c, werr := pw.w.Write(p[:pages*pw.pageBytes])
+		n += c
+		if werr != nil {
+			return n, werr
+		}
+		p = p[pages*pw.pageBytes:]
+	}
+	// write remaining tail to buffer
+	// (the flush above left the buffer empty and the tail is at most one
+	// page long)
+	c, werr := pw.Write(p)
+	n += c
+	return n, werr
+}
+
+// Flush hands any buffered bytes to the underlying writer and empties the
+// buffer. The page offset is advanced and the buffer is reset even when the
+// underlying write fails; that error is returned to the caller. Flushing an
+// empty buffer is a no-op.
+func (pw *PageWriter) Flush() error {
+	pending := pw.bufferedBytes
+	if pending == 0 {
+		return nil
+	}
+	_, err := pw.w.Write(pw.buf[:pending])
+	pw.bufferedBytes = 0
+	pw.pageOffset = (pw.pageOffset + pending) % pw.pageBytes
+	return err
+}

+ 100 - 0
pkg/ioutil/pagewriter_test.go

@@ -0,0 +1,100 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ioutil
+
+import (
+	"math/rand"
+	"testing"
+)
+
+// TestPageWriterRandom issues writes of random length and checks that the
+// backing writer only ever sees page-multiple writes (enforced by
+// checkPageWriter) and that the number of bytes still buffered at the end
+// stays within the buffer's capacity.
+func TestPageWriterRandom(t *testing.T) {
+	// smaller buffer for stress testing; restore the package default on exit
+	// so other tests are not affected by the override
+	oldBufferBytes := defaultBufferBytes
+	defer func() { defaultBufferBytes = oldBufferBytes }()
+	defaultBufferBytes = 8 * 1024
+	pageBytes := 128
+	buf := make([]byte, 4*defaultBufferBytes)
+	cw := &checkPageWriter{pageBytes: pageBytes, t: t}
+	w := NewPageWriter(cw, pageBytes)
+	n := 0
+	for i := 0; i < 4096; i++ {
+		c, err := w.Write(buf[:rand.Intn(len(buf))])
+		if err != nil {
+			t.Fatal(err)
+		}
+		n += c
+	}
+	if cw.writeBytes > n {
+		t.Fatalf("wrote %d bytes to io.Writer, but only wrote %d bytes", cw.writeBytes, n)
+	}
+	// Bytes accepted by the PageWriter but not yet passed on are
+	// n-cw.writeBytes. The buffer holds at most the watermark plus one page
+	// of slack, so that is the bound to check.
+	maxPendingBytes := pageBytes + defaultBufferBytes
+	if pending := n - cw.writeBytes; pending > maxPendingBytes {
+		t.Fatalf("got %d bytes pending, expected less than %d bytes", pending, maxPendingBytes)
+	}
+	t.Logf("total writes: %d", cw.writes)
+	t.Logf("total write bytes: %d (of %d)", cw.writeBytes, n)
+}
+
+// TestPageWriterPartialSlack tests the case where a write overflows the buffer
+// but there is not enough data to complete the slack write.
+func TestPageWriterPartialSlack(t *testing.T) {
+	// shrink the buffer for this test only; restore the package default on
+	// exit so other tests are not affected by the override
+	oldBufferBytes := defaultBufferBytes
+	defer func() { defaultBufferBytes = oldBufferBytes }()
+	defaultBufferBytes = 1024
+	pageBytes := 128
+	buf := make([]byte, defaultBufferBytes)
+	// The checker accepts multiples of 64 rather than 128 because the first
+	// explicit Flush below deliberately emits a half-page (64-byte) write.
+	cw := &checkPageWriter{pageBytes: 64, t: t}
+	w := NewPageWriter(cw, pageBytes)
+	// put writer in non-zero page offset
+	if _, err := w.Write(buf[:64]); err != nil {
+		t.Fatal(err)
+	}
+	if err := w.Flush(); err != nil {
+		t.Fatal(err)
+	}
+	if cw.writes != 1 {
+		t.Fatalf("got %d writes, expected 1", cw.writes)
+	}
+	// nearly fill buffer
+	if _, err := w.Write(buf[:1022]); err != nil {
+		t.Fatal(err)
+	}
+	// overflow buffer, but without enough to write as aligned
+	if _, err := w.Write(buf[:8]); err != nil {
+		t.Fatal(err)
+	}
+	if cw.writes != 1 {
+		t.Fatalf("got %d writes, expected 1", cw.writes)
+	}
+	// finish writing slack space
+	if _, err := w.Write(buf[:128]); err != nil {
+		t.Fatal(err)
+	}
+	if cw.writes != 2 {
+		t.Fatalf("got %d writes, expected 2", cw.writes)
+	}
+}
+
+// checkPageWriter implements an io.Writer that fails a test on unaligned writes.
+type checkPageWriter struct {
+	// pageBytes is the required granularity: every Write length must be a
+	// multiple of it.
+	pageBytes int
+	// writes counts the calls to Write.
+	writes int
+	// writeBytes is the total number of bytes passed to Write.
+	writeBytes int
+	// t is the test that is failed when an unaligned write is seen.
+	t *testing.T
+}
+
+// Write records the call and the size of p, failing the test when the length
+// of p is not a whole multiple of cw.pageBytes. It always reports the full
+// length as written and never returns an error.
+func (cw *checkPageWriter) Write(p []byte) (int, error) {
+	size := len(p)
+	if aligned := size%cw.pageBytes == 0; !aligned {
+		cw.t.Fatalf("got write len(p) = %d, expected len(p) == k*cw.pageBytes", size)
+	}
+	cw.writeBytes += size
+	cw.writes++
+	return size, nil
+}

+ 8 - 3
wal/encoder.go

@@ -15,19 +15,24 @@
 package wal
 
 import (
-	"bufio"
 	"encoding/binary"
 	"hash"
 	"io"
 	"sync"
 
 	"github.com/coreos/etcd/pkg/crc"
+	"github.com/coreos/etcd/pkg/ioutil"
 	"github.com/coreos/etcd/wal/walpb"
 )
 
+// walPageBytes is the alignment for flushing records to the backing Writer.
+// It should be a multiple of the minimum sector size so that WAL repair can
+// safely distinguish between torn writes and ordinary data corruption.
+const walPageBytes = 8 * minSectorSize
+
 type encoder struct {
 	mu sync.Mutex
-	bw *bufio.Writer
+	bw *ioutil.PageWriter
 
 	crc       hash.Hash32
 	buf       []byte
@@ -36,7 +41,7 @@ type encoder struct {
 
 func newEncoder(w io.Writer, prevCrc uint32) *encoder {
 	return &encoder{
-		bw:  bufio.NewWriter(w),
+		bw:  ioutil.NewPageWriter(w, walPageBytes),
 		crc: crc.New(prevCrc, crcTable),
 		// 1MB buffer
 		buf:       make([]byte, 1024*1024),