Browse Source

Merge pull request #2066 from yichengq/283

add tests and do clean in wal package
Yicheng Qin 11 years ago
parent
commit
5d99024fea
8 changed files with 149 additions and 47 deletions
  1. 3 5
      etcdserver/force_cluster.go
  2. 1 1
      test
  3. 0 4
      wal/encoder.go
  4. 4 15
      wal/util.go
  5. 79 0
      wal/util_test.go
  6. 5 5
      wal/wal.go
  7. 1 1
      wal/wal_bench_test.go
  8. 56 16
      wal/wal_test.go

+ 3 - 5
etcdserver/force_cluster.go

@@ -51,11 +51,9 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
 	ents = append(ents, toAppEnts...)
 	ents = append(ents, toAppEnts...)
 
 
 	// force commit newly appended entries
 	// force commit newly appended entries
-	for _, e := range toAppEnts {
-		err := w.SaveEntry(&e)
-		if err != nil {
-			log.Fatalf("etcdserver: %v", err)
-		}
+	err := w.Save(raftpb.HardState{}, toAppEnts)
+	if err != nil {
+		log.Fatalf("etcdserver: %v", err)
 	}
 	}
 	if len(ents) != 0 {
 	if len(ents) != 0 {
 		st.Commit = ents[len(ents)-1].Index
 		st.Commit = ents[len(ents)-1].Index

+ 1 - 1
test

@@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
 source ./build
 source ./build
 
 
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
-TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb etcdserver/idutil integration migrate pkg/fileutil pkg/flags pkg/ioutils pkg/netutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
+TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/idutil integration migrate pkg/fileutil pkg/flags pkg/ioutils pkg/netutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
 FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
 FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
 
 
 # user has not provided PKG override
 # user has not provided PKG override

+ 0 - 4
wal/encoder.go

@@ -56,10 +56,6 @@ func (e *encoder) flush() error {
 	return e.bw.Flush()
 	return e.bw.Flush()
 }
 }
 
 
-func (e *encoder) buffered() int {
-	return e.bw.Buffered()
-}
-
 func writeInt64(w io.Writer, n int64) error {
 func writeInt64(w io.Writer, n int64) error {
 	return binary.Write(w, binary.LittleEndian, n)
 	return binary.Write(w, binary.LittleEndian, n)
 }
 }

+ 4 - 15
wal/util.go

@@ -37,11 +37,11 @@ const (
 )
 )
 
 
 func DetectVersion(dirpath string) (WalVersion, error) {
 func DetectVersion(dirpath string) (WalVersion, error) {
-	if _, err := os.Stat(dirpath); os.IsNotExist(err) {
-		return WALNotExist, nil
-	}
 	names, err := fileutil.ReadDir(dirpath)
 	names, err := fileutil.ReadDir(dirpath)
 	if err != nil {
 	if err != nil {
+		if os.IsNotExist(err) {
+			err = nil
+		}
 		// Error reading the directory
 		// Error reading the directory
 		return WALNotExist, err
 		return WALNotExist, err
 	}
 	}
@@ -118,21 +118,10 @@ func checkWalNames(names []string) []string {
 }
 }
 
 
 func parseWalName(str string) (seq, index uint64, err error) {
 func parseWalName(str string) (seq, index uint64, err error) {
-	var num int
-	num, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
-	if num != 2 && err == nil {
-		err = fmt.Errorf("bad wal name: %s", str)
-	}
+	_, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
 	return
 	return
 }
 }
 
 
 func walName(seq, index uint64) string {
 func walName(seq, index uint64) string {
 	return fmt.Sprintf("%016x-%016x.wal", seq, index)
 	return fmt.Sprintf("%016x-%016x.wal", seq, index)
 }
 }
-
-func max(a, b int64) int64 {
-	if a > b {
-		return a
-	}
-	return b
-}

+ 79 - 0
wal/util_test.go

@@ -0,0 +1,79 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package wal
+
+import (
+	"io/ioutil"
+	"os"
+	"path"
+	"strings"
+	"testing"
+)
+
+func TestDetectVersion(t *testing.T) {
+	tests := []struct {
+		names []string
+		wver  WalVersion
+	}{
+		{[]string{}, WALNotExist},
+		{[]string{"snap/", "wal/", "wal/1"}, WALv0_5},
+		{[]string{"snapshot/", "conf", "log"}, WALv0_4},
+		{[]string{"weird"}, WALUnknown},
+		{[]string{"snap/", "wal/"}, WALUnknown},
+	}
+	for i, tt := range tests {
+		p := mustMakeDir(t, tt.names...)
+		ver, err := DetectVersion(p)
+		if ver != tt.wver {
+			t.Errorf("#%d: version = %s, want %s", i, ver, tt.wver)
+		}
+		if err != nil {
+			t.Errorf("#%d: err = %s, want nil", i, err)
+		}
+		os.RemoveAll(p)
+	}
+
+	// detect on non-exist directory
+	v, err := DetectVersion(path.Join(os.TempDir(), "waltest", "not-exist"))
+	if v != WALNotExist {
+		t.Errorf("#non-exist: version = %s, want %s", v, WALNotExist)
+	}
+	if err != nil {
+		t.Errorf("#non-exist: err = %s, want %s", v, WALNotExist)
+	}
+}
+
+// mustMakeDir builds the directory that contains files with the given
+// names. If the name ends with '/', it is created as a directory.
+func mustMakeDir(t *testing.T, names ...string) string {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	for _, n := range names {
+		if strings.HasSuffix(n, "/") {
+			if err := os.MkdirAll(path.Join(p, n), 0700); err != nil {
+				t.Fatal(err)
+			}
+		} else {
+			if _, err := os.Create(path.Join(p, n)); err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+	return p
+}

+ 5 - 5
wal/wal.go

@@ -307,7 +307,7 @@ func (w *WAL) Cut() error {
 	if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
 	if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
 		return err
 		return err
 	}
 	}
-	if err := w.SaveState(&w.state); err != nil {
+	if err := w.saveState(&w.state); err != nil {
 		return err
 		return err
 	}
 	}
 	return w.sync()
 	return w.sync()
@@ -363,7 +363,7 @@ func (w *WAL) Close() error {
 	return nil
 	return nil
 }
 }
 
 
-func (w *WAL) SaveEntry(e *raftpb.Entry) error {
+func (w *WAL) saveEntry(e *raftpb.Entry) error {
 	b := pbutil.MustMarshal(e)
 	b := pbutil.MustMarshal(e)
 	rec := &walpb.Record{Type: entryType, Data: b}
 	rec := &walpb.Record{Type: entryType, Data: b}
 	if err := w.encoder.encode(rec); err != nil {
 	if err := w.encoder.encode(rec); err != nil {
@@ -373,7 +373,7 @@ func (w *WAL) SaveEntry(e *raftpb.Entry) error {
 	return nil
 	return nil
 }
 }
 
 
-func (w *WAL) SaveState(s *raftpb.HardState) error {
+func (w *WAL) saveState(s *raftpb.HardState) error {
 	if raft.IsEmptyHardState(*s) {
 	if raft.IsEmptyHardState(*s) {
 		return nil
 		return nil
 	}
 	}
@@ -385,11 +385,11 @@ func (w *WAL) SaveState(s *raftpb.HardState) error {
 
 
 func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 	// TODO(xiangli): no more reference operator
 	// TODO(xiangli): no more reference operator
-	if err := w.SaveState(&st); err != nil {
+	if err := w.saveState(&st); err != nil {
 		return err
 		return err
 	}
 	}
 	for i := range ents {
 	for i := range ents {
-		if err := w.SaveEntry(&ents[i]); err != nil {
+		if err := w.saveEntry(&ents[i]); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}

+ 1 - 1
wal/wal_bench_test.go

@@ -40,7 +40,7 @@ func benchmarkWriteEntry(b *testing.B, size int, batch int) {
 	b.ResetTimer()
 	b.ResetTimer()
 	n := 0
 	n := 0
 	for i := 0; i < b.N; i++ {
 	for i := 0; i < b.N; i++ {
-		err := w.SaveEntry(e)
+		err := w.saveEntry(e)
 		if err != nil {
 		if err != nil {
 			b.Fatal(err)
 			b.Fatal(err)
 		}
 		}

+ 56 - 16
wal/wal_test.go

@@ -153,12 +153,9 @@ func TestCut(t *testing.T) {
 	}
 	}
 	defer w.Close()
 	defer w.Close()
 
 
-	// TODO(unihorn): remove this when cut can operate on an empty file
-	if err := w.SaveEntry(&raftpb.Entry{}); err != nil {
-		t.Fatal(err)
-	}
 	state := raftpb.HardState{Term: 1}
 	state := raftpb.HardState{Term: 1}
-	if err := w.SaveState(&state); err != nil {
+	// TODO(unihorn): remove this when cut can operate on an empty file
+	if err := w.Save(state, []raftpb.Entry{{}}); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	if err := w.Cut(); err != nil {
 	if err := w.Cut(); err != nil {
@@ -169,8 +166,8 @@ func TestCut(t *testing.T) {
 		t.Errorf("name = %s, want %s", g, wname)
 		t.Errorf("name = %s, want %s", g, wname)
 	}
 	}
 
 
-	e := &raftpb.Entry{Index: 1, Term: 1, Data: []byte{1}}
-	if err := w.SaveEntry(e); err != nil {
+	es := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}}
+	if err := w.Save(raftpb.HardState{}, es); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	if err := w.Cut(); err != nil {
 	if err := w.Cut(); err != nil {
@@ -221,14 +218,12 @@ func TestRecover(t *testing.T) {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
 	ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
-	for _, e := range ents {
-		if err = w.SaveEntry(&e); err != nil {
-			t.Fatal(err)
-		}
+	if err = w.Save(raftpb.HardState{}, ents); err != nil {
+		t.Fatal(err)
 	}
 	}
 	sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}}
 	sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}}
 	for _, s := range sts {
 	for _, s := range sts {
-		if err = w.SaveState(&s); err != nil {
+		if err = w.Save(s, nil); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
 	}
 	}
@@ -338,8 +333,8 @@ func TestRecoverAfterCut(t *testing.T) {
 		if err = w.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
 		if err = w.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
-		e := raftpb.Entry{Index: uint64(i)}
-		if err = w.SaveEntry(&e); err != nil {
+		es := []raftpb.Entry{{Index: uint64(i)}}
+		if err = w.Save(raftpb.HardState{}, es); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
 		if err = w.Cut(); err != nil {
 		if err = w.Cut(); err != nil {
@@ -395,7 +390,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
 	if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
 	if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	if err := w.SaveEntry(&raftpb.Entry{Index: 0}); err != nil {
+	if err := w.Save(raftpb.HardState{}, []raftpb.Entry{{Index: 0}}); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	w.Close()
 	w.Close()
@@ -411,13 +406,58 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
 	w.Close()
 	w.Close()
 }
 }
 
 
+// TestOpenNotInUse tests that OpenNotInUse can load all files that are
+// not in use at that point.
+// The tests creates WAL directory, and cut out multiple WAL files. Then
+// it releases the lock of part of data, and excepts that OpenNotInUse
+// can read out all unlocked data.
+func TestOpenNotInUse(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(p)
+	// create WAL
+	w, err := Create(p, nil)
+	defer w.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+	// make 10 seperate files
+	for i := 0; i < 10; i++ {
+		es := []raftpb.Entry{{Index: uint64(i)}}
+		if err = w.Save(raftpb.HardState{}, es); err != nil {
+			t.Fatal(err)
+		}
+		if err = w.Cut(); err != nil {
+			t.Fatal(err)
+		}
+	}
+	// release the lock to 5
+	unlockIndex := uint64(5)
+	w.ReleaseLockTo(unlockIndex)
+
+	w2, err := OpenNotInUse(p, walpb.Snapshot{})
+	defer w2.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, _, ents, err := w2.ReadAll()
+	if err != nil {
+		t.Fatalf("err = %v, want nil", err)
+	}
+	if g := ents[len(ents)-1].Index; g != unlockIndex {
+		t.Errorf("last index read = %d, want %d", g, unlockIndex)
+	}
+}
+
 func TestSaveEmpty(t *testing.T) {
 func TestSaveEmpty(t *testing.T) {
 	var buf bytes.Buffer
 	var buf bytes.Buffer
 	var est raftpb.HardState
 	var est raftpb.HardState
 	w := WAL{
 	w := WAL{
 		encoder: newEncoder(&buf, 0),
 		encoder: newEncoder(&buf, 0),
 	}
 	}
-	if err := w.SaveState(&est); err != nil {
+	if err := w.saveState(&est); err != nil {
 		t.Errorf("err = %v, want nil", err)
 		t.Errorf("err = %v, want nil", err)
 	}
 	}
 	if len(buf.Bytes()) != 0 {
 	if len(buf.Bytes()) != 0 {