소스 검색

snap: handle snapshot file error robustly

Xiang Li 11년 전
부모
커밋
f664c02a40
2개의 파일이 변경되었으며, 42줄이 추가되고 7줄이 삭제되었습니다
  1. +35 -5
      snap/snapshotter.go
  2. +7 -2
      snap/snapshotter_test.go

+ 35 - 5
snap/snapshotter.go

@@ -10,10 +10,15 @@ import (
 	"os"
 	"path"
 	"sort"
+	"strings"
 
 	"github.com/coreos/etcd/raft"
 )
 
+const (
+	snapSuffix = ".snap"
+)
+
 var (
 	ErrNoSnapshot  = errors.New("snap: no available snapshot")
 	ErrCRCMismatch = errors.New("snap: crc mismatch")
@@ -31,7 +36,7 @@ func New(dir string) *Snapshotter {
 }
 
 func (s *Snapshotter) Save(snapshot *raft.Snapshot) error {
-	fname := fmt.Sprintf("%016x-%016x-%016x.snap", snapshot.ClusterId, snapshot.Term, snapshot.Index)
+	fname := fmt.Sprintf("%016x-%016x-%016x%s", snapshot.ClusterId, snapshot.Term, snapshot.Index, snapSuffix)
 	// TODO(xiangli): make raft.Snapshot a protobuf type
 	b, err := json.Marshal(snapshot)
 	if err != nil {
@@ -55,23 +60,28 @@ func (s *Snapshotter) Load() (*raft.Snapshot, error) {
 	var serializedSnap Snapshot
 	var b []byte
 	for _, name := range names {
-		b, err = ioutil.ReadFile(path.Join(s.dir, name))
+		fpath := path.Join(s.dir, name)
+		b, err = ioutil.ReadFile(fpath)
 		if err != nil {
 			log.Printf("Snapshotter cannot read file %v: %v", name, err)
+			renameBroken(fpath)
 			continue
 		}
 		if err = serializedSnap.Unmarshal(b); err != nil {
 			log.Printf("Corrupted snapshot file %v: %v", name, err)
+			renameBroken(fpath)
 			continue
 		}
 		crc := crc32.Update(0, crcTable, serializedSnap.Data)
 		if crc != serializedSnap.Crc {
 			log.Printf("Corrupted snapshot file %v: crc mismatch", name)
+			renameBroken(fpath)
 			err = ErrCRCMismatch
 			continue
 		}
 		if err = json.Unmarshal(serializedSnap.Data, &snap); err != nil {
 			log.Printf("Corrupted snapshot file %v: %v", name, err)
+			renameBroken(fpath)
 			continue
 		}
 		break
@@ -94,9 +104,29 @@ func (s *Snapshotter) snapNames() ([]string, error) {
 	if err != nil {
 		return nil, err
 	}
-	if len(names) == 0 {
+	snaps := checkSuffix(names)
+	if len(snaps) == 0 {
 		return nil, ErrNoSnapshot
 	}
-	sort.Sort(sort.Reverse(sort.StringSlice(names)))
-	return names, nil
+	sort.Sort(sort.Reverse(sort.StringSlice(snaps)))
+	return snaps, nil
+}
+
+func checkSuffix(names []string) []string {
+	snaps := []string{}
+	for i := range names {
+		if strings.HasSuffix(names[i], snapSuffix) {
+			snaps = append(snaps, names[i])
+		} else {
+			log.Printf("Unexpected non-snap file %v", names[i])
+		}
+	}
+	return snaps
+}
+
+func renameBroken(path string) {
+	brokenPath := path + ".broken"
+	if err := os.Rename(path, brokenPath); err != nil {
+		log.Printf("Cannot rename broken snapshot file %v to %v: %v", path, brokenPath, err)
+	}
 }

+ 7 - 2
snap/snapshotter_test.go

@@ -92,6 +92,11 @@ func TestFailback(t *testing.T) {
 	if !reflect.DeepEqual(g, testSnap) {
 		t.Errorf("snap = %#v, want %#v", g, testSnap)
 	}
+	if f, err := os.Open(path.Join(dir, large) + ".broken"); err != nil {
+		t.Fatal("broken snapshot does not exist")
+	} else {
+		f.Close()
+	}
 }
 
 func TestSnapNames(t *testing.T) {
@@ -102,7 +107,7 @@ func TestSnapNames(t *testing.T) {
 	}
 	defer os.RemoveAll(dir)
 	for i := 1; i <= 5; i++ {
-		if f, err := os.Create(path.Join(dir, fmt.Sprintf("%d", i))); err != nil {
+		if f, err := os.Create(path.Join(dir, fmt.Sprintf("%d.snap", i))); err != nil {
 			t.Fatal(err)
 		} else {
 			f.Close()
@@ -116,7 +121,7 @@ func TestSnapNames(t *testing.T) {
 	if len(names) != 5 {
 		t.Errorf("len = %d, want 10", len(names))
 	}
-	w := []string{"5", "4", "3", "2", "1"}
+	w := []string{"5.snap", "4.snap", "3.snap", "2.snap", "1.snap"}
 	if !reflect.DeepEqual(names, w) {
 		t.Errorf("names = %v, want %v", names, w)
 	}