Browse Source

wal: repair torn writes

Fixes #5230
Anthony Romano 9 years ago
parent
commit
774030e1b2
5 changed files with 195 additions and 26 deletions
  1. 61 3
      wal/decoder.go
  2. 8 1
      wal/doc.go
  3. 18 1
      wal/encoder.go
  4. 2 4
      wal/repair.go
  5. 106 17
      wal/repair_test.go

+ 61 - 3
wal/decoder.go

@@ -27,6 +27,8 @@ import (
 	"github.com/coreos/etcd/wal/walpb"
 	"github.com/coreos/etcd/wal/walpb"
 )
 )
 
 
+// minSectorSize is the smallest sector size assumed when record data is split
+// on sector boundaries to look for torn writes.
+const minSectorSize = 512
+
 type decoder struct {
 type decoder struct {
 	mu  sync.Mutex
 	mu  sync.Mutex
 	brs []*bufio.Reader
 	brs []*bufio.Reader
@@ -73,7 +75,9 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
 		return err
 		return err
 	}
 	}
 
 
-	data := make([]byte, l)
+	recBytes, padBytes := decodeFrameSize(l)
+
+	data := make([]byte, recBytes+padBytes)
 	if _, err = io.ReadFull(d.brs[0], data); err != nil {
 	if _, err = io.ReadFull(d.brs[0], data); err != nil {
 		// ReadFull returns io.EOF only if no bytes were read
 		// ReadFull returns io.EOF only if no bytes were read
 		// the decoder should treat this as an ErrUnexpectedEOF instead.
 		// the decoder should treat this as an ErrUnexpectedEOF instead.
@@ -82,7 +86,10 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
 		}
 		}
 		return err
 		return err
 	}
 	}
-	if err := rec.Unmarshal(data); err != nil {
+	if err := rec.Unmarshal(data[:recBytes]); err != nil {
+		if d.isTornEntry(data) {
+			return io.ErrUnexpectedEOF
+		}
 		return err
 		return err
 	}
 	}
 
 
@@ -90,14 +97,65 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
 	if rec.Type != crcType {
 	if rec.Type != crcType {
 		d.crc.Write(rec.Data)
 		d.crc.Write(rec.Data)
 		if err := rec.Validate(d.crc.Sum32()); err != nil {
 		if err := rec.Validate(d.crc.Sum32()); err != nil {
+			if d.isTornEntry(data) {
+				return io.ErrUnexpectedEOF
+			}
 			return err
 			return err
 		}
 		}
 	}
 	}
 	// record decoded as valid; point last valid offset to end of record
 	// record decoded as valid; point last valid offset to end of record
-	d.lastValidOff += l + 8
+	d.lastValidOff += recBytes + padBytes + 8
 	return nil
 	return nil
 }
 }
 
 
+func decodeFrameSize(lenField int64) (recBytes int64, padBytes int64) {
+	// the record size is stored in the lower 56 bits of the 64-bit length
+	recBytes = int64(uint64(lenField) & ^(uint64(0xff) << 56))
+	// non-zero padding is indicated by set MSb / a negative length
+	if lenField < 0 {
+		// padding is stored in lower 3 bits of length MSB
+		padBytes = int64((uint64(lenField) >> 56) & 0x7)
+	}
+	return
+}
+
+// isTornEntry reports whether data, the bytes of the entry that failed to
+// decode, looks like the result of a torn write. The check is only made when
+// exactly one reader remains in d.brs; otherwise it returns false. The data is
+// walked in pieces that end on minSectorSize boundaries of the file, and a
+// piece consisting solely of zero bytes is taken as evidence of a torn write.
+func (d *decoder) isTornEntry(data []byte) bool {
+	if len(d.brs) != 1 {
+		return false
+	}
+
+	// file offset of data's first byte: the entry follows its 8-byte length field
+	fileOff := d.lastValidOff + 8
+	for pos := 0; pos < len(data); {
+		// this piece runs up to the next sector boundary or the end of data
+		end := pos + int(minSectorSize-(fileOff%minSectorSize))
+		if end > len(data) {
+			end = len(data)
+		}
+
+		allZero := true
+		for _, b := range data[pos:end] {
+			if b != 0 {
+				allZero = false
+				break
+			}
+		}
+		if allZero {
+			// a sector's worth of entry data that is all 0 means a torn write
+			return true
+		}
+
+		fileOff += int64(end - pos)
+		pos = end
+	}
+	return false
+}
+
 func (d *decoder) updateCRC(prevCrc uint32) {
 func (d *decoder) updateCRC(prevCrc uint32) {
 	d.crc = crc.New(prevCrc, crcTable)
 	d.crc = crc.New(prevCrc, crcTable)
 }
 }

+ 8 - 1
wal/doc.go

@@ -34,6 +34,13 @@ When a user has finished using a WAL it must be closed:
 
 
 	w.Close()
 	w.Close()
 
 
+Each WAL file is a stream of WAL records. A WAL record is a length field followed by a wal record
+protobuf. The record protobuf contains a CRC, a type, and a data payload. The length field is a
+64-bit packed structure holding the length of the remaining logical record data in its lower
+56 bits and its physical padding in the lower three bits of the most significant byte; the most
+significant bit is set whenever the padding is non-zero. Each record is 8-byte aligned so that the
+length field is never torn. The CRC contains the CRC32 value of all record protobufs preceding the current record.
+
 WAL files are placed inside of the directory in the following format:
 WAL files are placed inside of the directory in the following format:
 $seq-$index.wal
 $seq-$index.wal
 
 
@@ -41,7 +48,7 @@ The first WAL file to be created will be 0000000000000000-0000000000000000.wal
 indicating an initial sequence of 0 and an initial raft index of 0. The first
 indicating an initial sequence of 0 and an initial raft index of 0. The first
 entry written to WAL MUST have raft index 0.
 entry written to WAL MUST have raft index 0.
 
 
-WAL will cuts its current wal files if its size exceeds 8MB. This will increment an internal
+WAL will cut its current tail wal file if its size exceeds 64MB. This will increment an internal
 sequence number and cause a new file to be created. If the last raft index saved
 sequence number and cause a new file to be created. If the last raft index saved
 was 0x20 and this is the first time cut has been called on this WAL then the sequence will
 was 0x20 and this is the first time cut has been called on this WAL then the sequence will
 increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal.
 increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal.

+ 18 - 1
wal/encoder.go

@@ -68,13 +68,30 @@ func (e *encoder) encode(rec *walpb.Record) error {
 		}
 		}
 		data = e.buf[:n]
 		data = e.buf[:n]
 	}
 	}
-	if err = writeInt64(e.bw, int64(len(data)), e.uint64buf); err != nil {
+
+	lenField, padBytes := encodeFrameSize(len(data))
+	if err = writeInt64(e.bw, int64(lenField), e.uint64buf); err != nil {
 		return err
 		return err
 	}
 	}
+
+	if padBytes != 0 {
+		data = append(data, make([]byte, padBytes)...)
+	}
 	_, err = e.bw.Write(data)
 	_, err = e.bw.Write(data)
 	return err
 	return err
 }
 }
 
 
+// encodeFrameSize packs a record size into the 64-bit length field of a WAL
+// frame. It returns the length field and the number of padding bytes needed to
+// bring dataBytes up to a multiple of 8. When padding is needed, the top bit
+// of the length field is set and the pad count is stored in the top byte.
+func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
+	lenField = uint64(dataBytes)
+
+	rem := dataBytes % 8
+	if rem == 0 {
+		// already 8-byte aligned: plain length, no padding
+		return lenField, 0
+	}
+
+	// force 8 byte alignment so length never gets a torn write
+	padBytes = 8 - rem
+	lenField |= uint64(0x80|padBytes) << 56
+	return lenField, padBytes
+}
+
 func (e *encoder) flush() error {
 func (e *encoder) flush() error {
 	e.mu.Lock()
 	e.mu.Lock()
 	defer e.mu.Unlock()
 	defer e.mu.Unlock()

+ 2 - 4
wal/repair.go

@@ -32,15 +32,13 @@ func Repair(dirpath string) bool {
 	}
 	}
 	defer f.Close()
 	defer f.Close()
 
 
-	n := 0
 	rec := &walpb.Record{}
 	rec := &walpb.Record{}
-
 	decoder := newDecoder(f)
 	decoder := newDecoder(f)
 	for {
 	for {
+		lastOffset := decoder.lastOffset()
 		err := decoder.decode(rec)
 		err := decoder.decode(rec)
 		switch err {
 		switch err {
 		case nil:
 		case nil:
-			n += 8 + rec.Size()
 			// update crc of the decoder when necessary
 			// update crc of the decoder when necessary
 			switch rec.Type {
 			switch rec.Type {
 			case crcType:
 			case crcType:
@@ -74,7 +72,7 @@ func Repair(dirpath string) bool {
 				return false
 				return false
 			}
 			}
 
 
-			if err = f.Truncate(int64(n)); err != nil {
+			if err = f.Truncate(int64(lastOffset)); err != nil {
 				plog.Errorf("could not repair %v, failed to truncate file", f.Name())
 				plog.Errorf("could not repair %v, failed to truncate file", f.Name())
 				return false
 				return false
 			}
 			}

+ 106 - 17
wal/repair_test.go

@@ -15,6 +15,7 @@
 package wal
 package wal
 
 
 import (
 import (
+	"fmt"
 	"io"
 	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"os"
 	"os"
@@ -24,7 +25,25 @@ import (
 	"github.com/coreos/etcd/wal/walpb"
 	"github.com/coreos/etcd/wal/walpb"
 )
 )
 
 
-func TestRepair(t *testing.T) {
+// corruptFunc damages the WAL stored under the given directory path; the int64
+// is the tail file's write offset as recorded just before the WAL was closed.
+type corruptFunc func(string, int64) error
+
+// TestRepairTruncate ensures a truncated file can be repaired
+func TestRepairTruncate(t *testing.T) {
+	// corruptf chops the last 4 bytes off the final WAL file so that its last
+	// record is incomplete.
+	corruptf := func(p string, offset int64) error {
+		f, err := openLast(p)
+		if err != nil {
+			return err
+		}
+		// release the handle once the file has been damaged; leaving it open
+		// leaks a descriptor for the remainder of the test
+		defer f.Close()
+		return f.Truncate(offset - 4)
+	}
+
+	// 10 entries are written; the truncated one is dropped by the repair
+	testRepair(t, makeEnts(10), corruptf, 9)
+}
+
+func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expectedEnts int) {
 	p, err := ioutil.TempDir(os.TempDir(), "waltest")
 	p, err := ioutil.TempDir(os.TempDir(), "waltest")
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
@@ -37,30 +56,24 @@ func TestRepair(t *testing.T) {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	n := 10
-	for i := 1; i <= n; i++ {
-		es := []raftpb.Entry{{Index: uint64(i)}}
+	for _, es := range ents {
 		if err = w.Save(raftpb.HardState{}, es); err != nil {
 		if err = w.Save(raftpb.HardState{}, es); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
 	}
 	}
+
 	offset, err := w.tail().Seek(0, os.SEEK_CUR)
 	offset, err := w.tail().Seek(0, os.SEEK_CUR)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	w.Close()
 	w.Close()
 
 
-	// break the wal.
-	f, err := openLast(p)
-	if err != nil {
-		t.Fatal(err)
-	}
-	err = f.Truncate(offset - 4)
+	err = corrupt(p, offset)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	// verify we have broke the wal
+	// verify we broke the wal
 	w, err = Open(p, walpb.Snapshot{})
 	w, err = Open(p, walpb.Snapshot{})
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
@@ -72,20 +85,96 @@ func TestRepair(t *testing.T) {
 	w.Close()
 	w.Close()
 
 
 	// repair the wal
 	// repair the wal
-	ok := Repair(p)
-	if !ok {
+	if ok := Repair(p); !ok {
 		t.Fatalf("fix = %t, want %t", ok, true)
 		t.Fatalf("fix = %t, want %t", ok, true)
 	}
 	}
 
 
+	// read it back
 	w, err = Open(p, walpb.Snapshot{})
 	w, err = Open(p, walpb.Snapshot{})
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	_, _, ents, err := w.ReadAll()
+	_, _, walEnts, err := w.ReadAll()
 	if err != nil {
 	if err != nil {
-		t.Fatalf("err = %v, want %v", err, nil)
+		t.Fatal(err)
+	}
+	if len(walEnts) != expectedEnts {
+		t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts)
+	}
+
+	// write some more entries to repaired log
+	for i := 1; i <= 10; i++ {
+		es := []raftpb.Entry{{Index: uint64(expectedEnts + i)}}
+		if err = w.Save(raftpb.HardState{}, es); err != nil {
+			t.Fatal(err)
+		}
+	}
+	w.Close()
+
+	// read back entries following repair, ensure it's all there
+	w, err = Open(p, walpb.Snapshot{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, _, walEnts, err = w.ReadAll()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(walEnts) != expectedEnts+10 {
+		t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts+10)
+	}
+}
+
+// makeEnts builds ents batches of one raft entry each; the entries carry the
+// indexes 1 through ents in order. It returns nil when ents is not positive.
+func makeEnts(ents int) (ret [][]raftpb.Entry) {
+	for n := 0; n < ents; n++ {
+		batch := []raftpb.Entry{{Index: uint64(n + 1)}}
+		ret = append(ret, batch)
+	}
+	return ret
+}
+
+// TestRepairWriteTearLast repairs the WAL in case the last record is a torn write
+// that straddled two sectors.
+func TestRepairWriteTearLast(t *testing.T) {
+	// corruptf cuts the final WAL file back to 1024 bytes and then extends it
+	// to its original length again, discarding everything past byte 1024.
+	corruptf := func(p string, offset int64) error {
+		f, err := openLast(p)
+		if err != nil {
+			return err
+		}
+		// release the handle once the file has been damaged; leaving it open
+		// leaks a descriptor for the remainder of the test
+		defer f.Close()
+		// 512 bytes perfectly aligns the last record, so use 1024.
+		// The file must be strictly longer than that, otherwise the
+		// truncate/extend pair below would not discard anything.
+		if offset <= 1024 {
+			return fmt.Errorf("got offset %d, expected >1024", offset)
+		}
+		if terr := f.Truncate(1024); terr != nil {
+			return terr
+		}
+		return f.Truncate(offset)
+	}
+	testRepair(t, makeEnts(50), corruptf, 40)
+}
+
+// TestRepairWriteTearMiddle repairs the WAL when there is write tearing
+// in the middle of a record.
+func TestRepairWriteTearMiddle(t *testing.T) {
+	corruptf := func(p string, offset int64) error {
+		f, err := openLast(p)
+		if err != nil {
+			return err
+		}
+		// corrupt middle of 2nd record
+		_, werr := f.WriteAt(make([]byte, 512), 4096+512)
+		return werr
+	}
+	ents := makeEnts(5)
+	// 4096 bytes of data so a middle sector is easy to corrupt
+	dat := make([]byte, 4096)
+	for i := range dat {
+		dat[i] = byte(i)
 	}
 	}
-	if len(ents) != n-1 {
-		t.Fatalf("len(ents) = %d, want %d", len(ents), n-1)
+	for i := range ents {
+		ents[i][0].Data = dat
 	}
 	}
+	testRepair(t, ents, corruptf, 1)
 }
 }