Browse Source

wal: set PageWriter offset in file encoder

Gyu-Ho Lee 9 years ago
parent
commit
f5588526cc
4 changed files with 31 additions and 9 deletions
  1. 12 2
      wal/encoder.go
  2. 1 1
      wal/record_test.go
  3. 16 4
      wal/wal.go
  4. 2 2
      wal/wal_test.go

+ 12 - 2
wal/encoder.go

@@ -18,6 +18,7 @@ import (
 	"encoding/binary"
 	"hash"
 	"io"
+	"os"
 	"sync"
 
 	"github.com/coreos/etcd/pkg/crc"
@@ -39,9 +40,9 @@ type encoder struct {
 	uint64buf []byte
 }
 
-func newEncoder(w io.Writer, prevCrc uint32) *encoder {
+func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
 	return &encoder{
-		bw:  ioutil.NewPageWriter(w, walPageBytes),
+		bw:  ioutil.NewPageWriter(w, walPageBytes, pageOffset),
 		crc: crc.New(prevCrc, crcTable),
 		// 1MB buffer
 		buf:       make([]byte, 1024*1024),
@@ -49,6 +50,15 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder {
 	}
 }
 
+// newFileEncoder creates a new encoder with current file offset for the page writer.
+func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
+	offset, err := f.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return nil, err
+	}
+	return newEncoder(f, prevCrc, int(offset)), nil
+}
+
 func (e *encoder) encode(rec *walpb.Record) error {
 	e.mu.Lock()
 	defer e.mu.Unlock()

+ 1 - 1
wal/record_test.go

@@ -69,7 +69,7 @@ func TestWriteRecord(t *testing.T) {
 	typ := int64(0xABCD)
 	d := []byte("Hello world!")
 	buf := new(bytes.Buffer)
-	e := newEncoder(buf, 0)
+	e := newEncoder(buf, 0, 0)
 	e.encode(&walpb.Record{Type: typ, Data: d})
 	e.flush()
 	decoder := newDecoder(ioutil.NopCloser(buf))

+ 16 - 4
wal/wal.go

@@ -122,7 +122,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 	w := &WAL{
 		dir:      dirpath,
 		metadata: metadata,
-		encoder:  newEncoder(f, 0),
+	}
+	w.encoder, err = newFileEncoder(f.File, 0)
+	if err != nil {
+		return nil, err
 	}
 	w.locks = append(w.locks, f)
 	if err = w.saveCrc(0); err != nil {
@@ -343,7 +346,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 
 	if w.tail() != nil {
 		// create encoder (chain crc with the decoder), enable appending
-		w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
+		w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
+		if err != nil {
+			return
+		}
 	}
 	w.decoder = nil
 
@@ -377,7 +383,10 @@ func (w *WAL) cut() error {
 	// update writer and save the previous crc
 	w.locks = append(w.locks, newTail)
 	prevCrc := w.encoder.crc.Sum32()
-	w.encoder = newEncoder(w.tail(), prevCrc)
+	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
+	if err != nil {
+		return err
+	}
 	if err = w.saveCrc(prevCrc); err != nil {
 		return err
 	}
@@ -416,7 +425,10 @@ func (w *WAL) cut() error {
 	w.locks[len(w.locks)-1] = newTail
 
 	prevCrc = w.encoder.crc.Sum32()
-	w.encoder = newEncoder(w.tail(), prevCrc)
+	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
+	if err != nil {
+		return err
+	}
 
 	plog.Infof("segmented wal file %v is created", fpath)
 	return nil

+ 2 - 2
wal/wal_test.go

@@ -61,7 +61,7 @@ func TestNew(t *testing.T) {
 	}
 
 	var wb bytes.Buffer
-	e := newEncoder(&wb, 0)
+	e := newEncoder(&wb, 0, 0)
 	err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
@@ -528,7 +528,7 @@ func TestSaveEmpty(t *testing.T) {
 	var buf bytes.Buffer
 	var est raftpb.HardState
 	w := WAL{
-		encoder: newEncoder(&buf, 0),
+		encoder: newEncoder(&buf, 0, 0),
 	}
 	if err := w.saveState(&est); err != nil {
 		t.Errorf("err = %v, want nil", err)