Browse Source

Merge pull request #2597 from xiang90/wal-repair

wal: fix the unexpectedEOF error in the last wal.
Xiang Li 10 years ago
parent
commit
77a04cda0c
5 changed files with 242 additions and 16 deletions
  1. 31 12
      etcdserver/storage.go
  2. 107 0
      wal/repair.go
  3. 91 0
      wal/repair_test.go
  4. 10 1
      wal/util.go
  5. 3 3
      wal/wal.go

+ 31 - 12
etcdserver/storage.go

@@ -15,6 +15,7 @@
 package etcdserver
 
 import (
+	"io"
 	"log"
 	"os"
 	"path"
@@ -52,15 +53,15 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
 // SaveSnap saves the snapshot to disk and release the locked
 // wal files since they will not be used.
 func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
-	err := st.Snapshotter.SaveSnap(snap)
-	if err != nil {
-		return err
-	}
 	walsnap := walpb.Snapshot{
 		Index: snap.Metadata.Index,
 		Term:  snap.Metadata.Term,
 	}
-	err = st.WAL.SaveSnapshot(walsnap)
+	err := st.WAL.SaveSnapshot(walsnap)
+	if err != nil {
+		return err
+	}
+	err = st.Snapshotter.SaveSnap(snap)
 	if err != nil {
 		return err
 	}
@@ -72,13 +73,31 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
 }
 
 func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
-	var err error
-	if w, err = wal.Open(waldir, snap); err != nil {
-		log.Fatalf("etcdserver: open wal error: %v", err)
-	}
-	var wmetadata []byte
-	if wmetadata, st, ents, err = w.ReadAll(); err != nil {
-		log.Fatalf("etcdserver: read wal error: %v", err)
+	var (
+		err       error
+		wmetadata []byte
+	)
+
+	repaired := false
+	for {
+		if w, err = wal.Open(waldir, snap); err != nil {
+			log.Fatalf("etcdserver: open wal error: %v", err)
+		}
+		if wmetadata, st, ents, err = w.ReadAll(); err != nil {
+			w.Close()
+			// we can only repair ErrUnexpectedEOF and we never repair twice.
+			if repaired || err != io.ErrUnexpectedEOF {
+				log.Fatalf("etcdserver: read wal error (%v) and cannot be repaired", err)
+			}
+			if !wal.Repair(waldir) {
+				log.Fatalf("etcdserver: WAL error (%v) cannot be repaired", err)
+			} else {
+				log.Printf("etcdserver: repaired WAL error (%v)", err)
+				repaired = true
+			}
+			continue
+		}
+		break
 	}
 	var metadata pb.Metadata
 	pbutil.MustUnmarshal(&metadata, wmetadata)

+ 107 - 0
wal/repair.go

@@ -0,0 +1,107 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+	"io"
+	"log"
+	"os"
+	"path"
+
+	"github.com/coreos/etcd/pkg/fileutil"
+	"github.com/coreos/etcd/wal/walpb"
+)
+
+// Repair tries to repair the unexpectedEOF error in the
+// last wal file by truncating.
+func Repair(dirpath string) bool {
+	f, err := openLast(dirpath)
+	if err != nil {
+		return false
+	}
+	defer f.Close()
+
+	n := 0
+	rec := &walpb.Record{}
+
+	decoder := newDecoder(f)
+	defer decoder.close()
+	for {
+		err := decoder.decode(rec)
+		switch err {
+		case nil:
+			n += 8 + rec.Size()
+			// update crc of the decoder when necessary
+			switch rec.Type {
+			case crcType:
+				crc := decoder.crc.Sum32()
+				// current crc of decoder must match the crc of the record.
+				// do no need to match 0 crc, since the decoder is a new one at this case.
+				if crc != 0 && rec.Validate(crc) != nil {
+					return false
+				}
+				decoder.updateCRC(rec.Crc)
+			}
+			continue
+		case io.EOF:
+			return true
+		case io.ErrUnexpectedEOF:
+			log.Printf("wal: repairing %v", f.Name())
+			bf, bferr := os.Create(f.Name() + ".broken")
+			if bferr != nil {
+				log.Printf("wal: could not repair %v, failed to create backup file", f.Name())
+				return false
+			}
+			defer bf.Close()
+
+			if _, err = f.Seek(0, os.SEEK_SET); err != nil {
+				log.Printf("wal: could not repair %v, failed to read file", f.Name())
+				return false
+			}
+
+			if _, err = io.Copy(bf, f); err != nil {
+				log.Printf("wal: could not repair %v, failed to copy file", f.Name())
+				return false
+			}
+
+			if err = f.Truncate(int64(n)); err != nil {
+				log.Printf("wal: could not repair %v, failed to truncate file", f.Name())
+				return false
+			}
+			if err = f.Sync(); err != nil {
+				log.Printf("wal: could not repair %v, failed to sync file", f.Name())
+				return false
+			}
+			return true
+		default:
+			log.Printf("wal: could not repair error (%v)", err)
+			return false
+		}
+	}
+}
+
+// openLast opens the last wal file for read and write.
+func openLast(dirpath string) (*os.File, error) {
+	names, err := fileutil.ReadDir(dirpath)
+	if err != nil {
+		return nil, err
+	}
+	names = checkWalNames(names)
+	if len(names) == 0 {
+		return nil, ErrFileNotFound
+	}
+	last := path.Join(dirpath, names[len(names)-1])
+	return os.OpenFile(last, os.O_RDWR, 0)
+}

+ 91 - 0
wal/repair_test.go

@@ -0,0 +1,91 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+	"io"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/wal/walpb"
+)
+
+func TestRepair(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(p)
+	// create WAL
+	w, err := Create(p, nil)
+	defer w.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	n := 10
+	for i := 1; i <= n; i++ {
+		es := []raftpb.Entry{{Index: uint64(i)}}
+		if err = w.Save(raftpb.HardState{}, es); err != nil {
+			t.Fatal(err)
+		}
+	}
+	w.Close()
+
+	// break the wal.
+	f, err := openLast(p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	offset, err := f.Seek(-4, os.SEEK_END)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = f.Truncate(offset)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// verify we have broke the wal
+	w, err = Open(p, walpb.Snapshot{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, _, _, err = w.ReadAll()
+	if err != io.ErrUnexpectedEOF {
+		t.Fatalf("err = %v, want %v", err, io.ErrUnexpectedEOF)
+	}
+	w.Close()
+
+	// repair the wal
+	ok := Repair(p)
+	if !ok {
+		t.Fatalf("fix = %t, want %t", ok, true)
+	}
+
+	w, err = Open(p, walpb.Snapshot{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, _, ents, err := w.ReadAll()
+	if err != nil {
+		t.Fatalf("err = %v, want %v", err, nil)
+	}
+	if len(ents) != n-1 {
+		t.Fatalf("len(ents) = %d, want %d", len(ents), n-1)
+	}
+}

+ 10 - 1
wal/util.go

@@ -15,12 +15,18 @@
 package wal
 
 import (
+	"errors"
 	"fmt"
 	"log"
+	"strings"
 
 	"github.com/coreos/etcd/pkg/fileutil"
 )
 
+var (
+	badWalName = errors.New("bad wal name")
+)
+
 func Exist(dirpath string) bool {
 	names, err := fileutil.ReadDir(dirpath)
 	if err != nil {
@@ -76,8 +82,11 @@ func checkWalNames(names []string) []string {
 }
 
 func parseWalName(str string) (seq, index uint64, err error) {
+	if !strings.HasSuffix(str, ".wal") {
+		return 0, 0, badWalName
+	}
 	_, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
-	return
+	return seq, index, err
 }
 
 func walName(seq, index uint64) string {

+ 3 - 3
wal/wal.go

@@ -414,14 +414,14 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 	}
 
 	// TODO(xiangli): no more reference operator
-	if err := w.saveState(&st); err != nil {
-		return err
-	}
 	for i := range ents {
 		if err := w.saveEntry(&ents[i]); err != nil {
 			return err
 		}
 	}
+	if err := w.saveState(&st); err != nil {
+		return err
+	}
 
 	fstat, err := w.f.Stat()
 	if err != nil {