Browse Source

wal: support multiple files

Yicheng Qin 11 years ago
parent
commit
442cae6844
3 changed files with 452 additions and 77 deletions
  1. 9 10
      etcd/participant.go
  2. 200 29
      wal/wal.go
  3. 243 38
      wal/wal_test.go

+ 9 - 10
etcd/participant.go

@@ -22,7 +22,6 @@ import (
 	"log"
 	"math/rand"
 	"net/http"
-	"os"
 	"path"
 	"time"
 
@@ -112,17 +111,14 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura
 	p.rh = newRaftHandler(peerHub, p.Store.Version(), p.serverStats)
 	p.peerHub.setServerStats(p.serverStats)
 
-	walPath := path.Join(p.cfg.DataDir, "wal")
-	w, err := wal.Open(walPath)
-	if err != nil {
-		if !os.IsNotExist(err) {
-			return nil, err
-		}
-
+	walPath := p.cfg.DataDir
+	var w *wal.WAL
+	var err error
+	if !wal.Exist(walPath) {
 		p.id = genId()
 		p.pubAddr = c.Addr
 		p.raftPubAddr = c.Peer.Addr
-		if w, err = wal.New(walPath); err != nil {
+		if w, err = wal.Create(walPath); err != nil {
 			return nil, err
 		}
 		p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
@@ -132,7 +128,7 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura
 		}
 		log.Printf("id=%x participant.new path=%s\n", p.id, walPath)
 	} else {
-		n, err := w.LoadNode()
+		n, err := wal.Read(walPath, 0)
 		if err != nil {
 			return nil, err
 		}
@@ -140,6 +136,9 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura
 		p.node.Node = raft.Recover(n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection)
 		p.apply(p.node.Next())
 		log.Printf("id=%x participant.load path=%s state=\"%+v\" len(ents)=%d", p.id, walPath, n.State, len(n.Ents))
+		if w, err = wal.Open(walPath); err != nil {
+			return nil, err
+		}
 	}
 	p.w = w
 

+ 200 - 29
wal/wal.go

@@ -24,14 +24,21 @@ import (
 	"io"
 	"log"
 	"os"
+	"path"
+	"sort"
 
 	"github.com/coreos/etcd/raft"
 )
 
+const (
+	infoType int64 = iota + 1
+	entryType
+	stateType
+)
+
 var (
-	infoType  = int64(1)
-	entryType = int64(2)
-	stateType = int64(3)
+	ErrIdMismatch = fmt.Errorf("unmatch id")
+	ErrNotFound   = fmt.Errorf("wal file is not found")
 )
 
 type WAL struct {
@@ -44,29 +51,69 @@ func newWAL(f *os.File) *WAL {
 	return &WAL{f, bufio.NewWriter(f), new(bytes.Buffer)}
 }
 
-func New(path string) (*WAL, error) {
-	log.Printf("path=%s wal.new", path)
-	f, err := os.Open(path)
-	if err == nil {
-		f.Close()
+func Exist(dirpath string) bool {
+	names, err := readDir(dirpath)
+	if err != nil {
+		return false
+	}
+	return len(names) != 0
+}
+
+func Create(dirpath string) (*WAL, error) {
+	log.Printf("path=%s wal.create", dirpath)
+	if Exist(dirpath) {
 		return nil, os.ErrExist
 	}
-	f, err = os.Create(path)
+	p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0))
+	f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	if err != nil {
 		return nil, err
 	}
 	return newWAL(f), nil
 }
 
-func Open(path string) (*WAL, error) {
-	log.Printf("path=%s wal.open", path)
-	f, err := os.OpenFile(path, os.O_RDWR, 0)
+func Open(dirpath string) (*WAL, error) {
+	log.Printf("path=%s wal.append", dirpath)
+	names, err := readDir(dirpath)
+	if err != nil {
+		return nil, err
+	}
+	names = checkWalNames(names)
+	if len(names) == 0 {
+		return nil, ErrNotFound
+	}
+
+	name := names[len(names)-1]
+	p := path.Join(dirpath, name)
+	f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND, 0)
 	if err != nil {
 		return nil, err
 	}
 	return newWAL(f), nil
 }
 
+// index should be the index of last log entry currently.
+// Cut closes current file written and creates a new one to append.
+func (w *WAL) Cut(index int64) error {
+	log.Printf("path=%s wal.cut index=%d", w.f.Name(), index)
+	fpath := w.f.Name()
+	seq, _, err := parseWalName(path.Base(fpath))
+	if err != nil {
+		panic("parse correct name error")
+	}
+	fpath = path.Join(path.Dir(fpath), fmt.Sprintf("%016x-%016x.wal", seq+1, index))
+	f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
+	if err != nil {
+		return err
+	}
+
+	w.Sync()
+	w.f.Close()
+	w.f = f
+	w.bw = bufio.NewWriter(f)
+	return nil
+}
+
 func (w *WAL) Sync() error {
 	if err := w.bw.Flush(); err != nil {
 		return err
@@ -129,52 +176,141 @@ type Node struct {
 	Id    int64
 	Ents  []raft.Entry
 	State raft.State
+
+	// index of the first entry
+	index int64
 }
 
-func (w *WAL) LoadNode() (*Node, error) {
-	log.Printf("path=%s wal.loadNode", w.f.Name())
-	if err := w.checkAtHead(); err != nil {
-		return nil, err
+func newNode(index int64) *Node {
+	return &Node{Ents: make([]raft.Entry, 0), index: index + 1}
+}
+
+func (n *Node) load(path string) error {
+	f, err := os.Open(path)
+	if err != nil {
+		return err
 	}
-	br := bufio.NewReader(w.f)
+	defer f.Close()
+	br := bufio.NewReader(f)
 	rec := &Record{}
 
-	err := readRecord(br, rec)
+	err = readRecord(br, rec)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	if rec.Type != infoType {
-		return nil, fmt.Errorf("the first block of wal is not infoType but %d", rec.Type)
+		return fmt.Errorf("the first block of wal is not infoType but %d", rec.Type)
 	}
 	i, err := loadInfo(rec.Data)
 	if err != nil {
-		return nil, err
+		return err
 	}
+	if n.Id != 0 && n.Id != i.Id {
+		return ErrIdMismatch
+	}
+	n.Id = i.Id
 
-	ents := make([]raft.Entry, 0)
-	var state raft.State
 	for err = readRecord(br, rec); err == nil; err = readRecord(br, rec) {
 		switch rec.Type {
 		case entryType:
 			e, err := loadEntry(rec.Data)
 			if err != nil {
-				return nil, err
+				return err
+			}
+			if e.Index >= n.index {
+				n.Ents = append(n.Ents[:e.Index-n.index], e)
 			}
-			ents = append(ents[:e.Index-1], e)
 		case stateType:
 			s, err := loadState(rec.Data)
 			if err != nil {
-				return nil, err
+				return err
 			}
-			state = s
+			n.State = s
 		default:
-			return nil, fmt.Errorf("unexpected block type %d", rec.Type)
+			return fmt.Errorf("unexpected block type %d", rec.Type)
 		}
 	}
 	if err != io.EOF {
+		return err
+	}
+	return nil
+}
+
+func (n *Node) startFrom(index int64) error {
+	diff := int(index - n.index)
+	if diff > len(n.Ents) {
+		return ErrNotFound
+	}
+	n.Ents = n.Ents[diff:]
+	return nil
+}
+
+// Read loads all entries after index (index is not included).
+func Read(dirpath string, index int64) (*Node, error) {
+	log.Printf("path=%s wal.load index=%d", dirpath, index)
+	names, err := readDir(dirpath)
+	if err != nil {
 		return nil, err
 	}
-	return &Node{i.Id, ents, state}, nil
+	names = checkWalNames(names)
+	if len(names) == 0 {
+		return nil, ErrNotFound
+	}
+
+	sort.Sort(sort.StringSlice(names))
+	nameIndex, ok := searchIndex(names, index)
+	if !ok || !isValidSeq(names[nameIndex:]) {
+		return nil, ErrNotFound
+	}
+
+	_, initIndex, err := parseWalName(names[nameIndex])
+	if err != nil {
+		panic("parse correct name error")
+	}
+	n := newNode(initIndex)
+	for _, name := range names[nameIndex:] {
+		if err := n.load(path.Join(dirpath, name)); err != nil {
+			return nil, err
+		}
+	}
+	if err := n.startFrom(index + 1); err != nil {
+		return nil, ErrNotFound
+	}
+	return n, nil
+}
+
+// The input names should be sorted.
+// serachIndex returns the array index of the last name that has
+// a smaller raft index section than the given raft index.
+func searchIndex(names []string, index int64) (int, bool) {
+	for i := len(names) - 1; i >= 0; i-- {
+		name := names[i]
+		_, curIndex, err := parseWalName(name)
+		if err != nil {
+			panic("parse correct name error")
+		}
+		if index >= curIndex {
+			return i, true
+		}
+	}
+	return -1, false
+}
+
+// names should have been sorted based on sequence number.
+// isValidSeq checks whether seq increases continuously.
+func isValidSeq(names []string) bool {
+	var lastSeq int64
+	for _, name := range names {
+		curSeq, _, err := parseWalName(name)
+		if err != nil {
+			panic("parse correct name error")
+		}
+		if lastSeq != 0 && lastSeq != curSeq-1 {
+			return false
+		}
+		lastSeq = curSeq
+	}
+	return true
 }
 
 func loadInfo(d []byte) (raft.Info, error) {
@@ -201,6 +337,41 @@ func loadState(d []byte) (raft.State, error) {
 	return s, err
 }
 
+// readDir returns the filenames in wal directory.
+func readDir(dirpath string) ([]string, error) {
+	dir, err := os.Open(dirpath)
+	if err != nil {
+		return nil, err
+	}
+	defer dir.Close()
+	names, err := dir.Readdirnames(-1)
+	if err != nil {
+		return nil, err
+	}
+	return names, nil
+}
+
+func checkWalNames(names []string) []string {
+	wnames := make([]string, 0)
+	for _, name := range names {
+		if _, _, err := parseWalName(name); err != nil {
+			log.Printf("parse %s: %v", name, err)
+			continue
+		}
+		wnames = append(wnames, name)
+	}
+	return wnames
+}
+
+func parseWalName(str string) (seq, index int64, err error) {
+	var num int
+	num, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
+	if num != 2 && err == nil {
+		err = fmt.Errorf("bad wal name: %s", str)
+	}
+	return
+}
+
 func writeInt64(w io.Writer, n int64) error {
 	return binary.Write(w, binary.LittleEndian, n)
 }

+ 243 - 38
wal/wal_test.go

@@ -17,6 +17,7 @@ limitations under the License.
 package wal
 
 import (
+	"fmt"
 	"io/ioutil"
 	"os"
 	"path"
@@ -35,36 +36,123 @@ var (
 
 	entryData   = []byte("\b\x01\x10\x01\x18\x01\x22\x01\x01")
 	entryRecord = append([]byte("\x0f\x00\x00\x00\x00\x00\x00\x00\b\x02\x10\x00\x1a\t"), entryData...)
+
+	firstWalName = "0000000000000000-0000000000000000.wal"
 )
 
 func TestNew(t *testing.T) {
-	f, err := ioutil.TempFile(os.TempDir(), "waltest")
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(p)
+
+	w, err := Create(p)
+	if err != nil {
+		t.Fatalf("err = %v, want nil", err)
+	}
+	if g := path.Base(w.f.Name()); g != firstWalName {
+		t.Errorf("name = %+v, want %+v", g, firstWalName)
+	}
+	w.Close()
+}
+
+func TestNewForInitedDir(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
 	if err != nil {
 		t.Fatal(err)
 	}
-	p := f.Name()
-	_, err = New(p)
-	if err == nil || err != os.ErrExist {
+	defer os.RemoveAll(p)
+
+	os.Create(path.Join(p, firstWalName))
+	if _, err = Create(p); err == nil || err != os.ErrExist {
 		t.Errorf("err = %v, want %v", err, os.ErrExist)
 	}
-	err = os.Remove(p)
+}
+
+func TestAppend(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
 	if err != nil {
 		t.Fatal(err)
 	}
-	w, err := New(p)
+	defer os.RemoveAll(p)
+
+	os.Create(path.Join(p, firstWalName))
+	w, err := Open(p)
+	if err != nil {
+		t.Fatalf("err = %v, want nil", err)
+	}
+	if g := path.Base(w.f.Name()); g != firstWalName {
+		t.Errorf("name = %+v, want %+v", g, firstWalName)
+	}
+	w.Close()
+
+	wname := fmt.Sprintf("%016x-%016x.wal", 2, 10)
+	os.Create(path.Join(p, wname))
+	w, err = Open(p)
 	if err != nil {
-		t.Errorf("err = %v, want nil", err)
+		t.Fatalf("err = %v, want nil", err)
+	}
+	if g := path.Base(w.f.Name()); g != wname {
+		t.Errorf("name = %+v, want %+v", g, wname)
 	}
 	w.Close()
-	err = os.Remove(p)
+}
+
+func TestAppendForUninitedDir(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer os.RemoveAll(p)
+
+	if _, err = Open(p); err != ErrNotFound {
+		t.Errorf("err = %v, want %v", err, ErrNotFound)
+	}
+}
+
+func TestCut(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(p)
+
+	w, err := Create(p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer w.Close()
+
+	if err := w.Cut(0); err != nil {
+		t.Fatal(err)
+	}
+	wname := fmt.Sprintf("%016x-%016x.wal", 1, 0)
+	if g := path.Base(w.f.Name()); g != wname {
+		t.Errorf("name = %s, want %s", g, wname)
+	}
+
+	e := &raft.Entry{Type: 1, Index: 1, Term: 1, Data: []byte{1}}
+	if err := w.SaveEntry(e); err != nil {
+		t.Fatal(err)
+	}
+	if err := w.Cut(1); err != nil {
+		t.Fatal(err)
+	}
+	wname = fmt.Sprintf("%016x-%016x.wal", 2, 1)
+	if g := path.Base(w.f.Name()); g != wname {
+		t.Errorf("name = %s, want %s", g, wname)
+	}
 }
 
 func TestSaveEntry(t *testing.T) {
-	p := path.Join(os.TempDir(), "waltest")
-	w, err := New(p)
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(p)
+
+	w, err := Create(p)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -75,23 +163,23 @@ func TestSaveEntry(t *testing.T) {
 	}
 	w.Close()
 
-	b, err := ioutil.ReadFile(p)
+	b, err := ioutil.ReadFile(path.Join(p, firstWalName))
 	if err != nil {
 		t.Fatal(err)
 	}
 	if !reflect.DeepEqual(b, entryRecord) {
 		t.Errorf("ent = %q, want %q", b, entryRecord)
 	}
+}
 
-	err = os.Remove(p)
+func TestSaveInfo(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
 	if err != nil {
 		t.Fatal(err)
 	}
-}
+	defer os.RemoveAll(p)
 
-func TestSaveInfo(t *testing.T) {
-	p := path.Join(os.TempDir(), "waltest")
-	w, err := New(p)
+	w, err := Create(p)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -116,23 +204,23 @@ func TestSaveInfo(t *testing.T) {
 	}
 	w.Close()
 
-	b, err := ioutil.ReadFile(p)
+	b, err := ioutil.ReadFile(path.Join(p, firstWalName))
 	if err != nil {
 		t.Fatal(err)
 	}
 	if !reflect.DeepEqual(b, infoRecord) {
 		t.Errorf("ent = %q, want %q", b, infoRecord)
 	}
+}
 
-	err = os.Remove(p)
+func TestSaveState(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
 	if err != nil {
 		t.Fatal(err)
 	}
-}
+	defer os.RemoveAll(p)
 
-func TestSaveState(t *testing.T) {
-	p := path.Join(os.TempDir(), "waltest")
-	w, err := New(p)
+	w, err := Create(p)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -143,18 +231,13 @@ func TestSaveState(t *testing.T) {
 	}
 	w.Close()
 
-	b, err := ioutil.ReadFile(p)
+	b, err := ioutil.ReadFile(path.Join(p, firstWalName))
 	if err != nil {
 		t.Fatal(err)
 	}
 	if !reflect.DeepEqual(b, stateRecord) {
 		t.Errorf("ent = %q, want %q", b, stateRecord)
 	}
-
-	err = os.Remove(p)
-	if err != nil {
-		t.Fatal(err)
-	}
 }
 
 func TestLoadInfo(t *testing.T) {
@@ -189,9 +272,14 @@ func TestLoadState(t *testing.T) {
 	}
 }
 
-func TestLoadNode(t *testing.T) {
-	p := path.Join(os.TempDir(), "waltest")
-	w, err := New(p)
+func TestNodeLoad(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(p)
+
+	w, err := Create(p)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -213,12 +301,8 @@ func TestLoadNode(t *testing.T) {
 	}
 	w.Close()
 
-	w, err = Open(p)
-	if err != nil {
-		t.Fatal(err)
-	}
-	n, err := w.LoadNode()
-	if err != nil {
+	n := newNode(0)
+	if err := n.load(path.Join(p, firstWalName)); err != nil {
 		t.Fatal(err)
 	}
 	if n.Id != i.Id {
@@ -232,9 +316,130 @@ func TestLoadNode(t *testing.T) {
 	if !reflect.DeepEqual(n.State, s) {
 		t.Errorf("state = %+v, want %+v", n.State, s)
 	}
+}
+
+func TestSearchIndex(t *testing.T) {
+	tests := []struct {
+		names []string
+		index int64
+		widx  int
+		wok   bool
+	}{
+		{
+			[]string{
+				"0000000000000000-0000000000000000.wal",
+				"0000000000000001-0000000000001000.wal",
+				"0000000000000002-0000000000002000.wal",
+			},
+			0x1000, 1, true,
+		},
+		{
+			[]string{
+				"0000000000000001-0000000000004000.wal",
+				"0000000000000002-0000000000003000.wal",
+				"0000000000000003-0000000000005000.wal",
+			},
+			0x4000, 1, true,
+		},
+		{
+			[]string{
+				"0000000000000001-0000000000002000.wal",
+				"0000000000000002-0000000000003000.wal",
+				"0000000000000003-0000000000005000.wal",
+			},
+			0x1000, -1, false,
+		},
+	}
+	for i, tt := range tests {
+		idx, ok := searchIndex(tt.names, tt.index)
+		if idx != tt.widx {
+			t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx)
+		}
+		if ok != tt.wok {
+			t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
+		}
+	}
+}
+
+func TestScanWalName(t *testing.T) {
+	tests := []struct {
+		str          string
+		wseq, windex int64
+		wok          bool
+	}{
+		{"0000000000000000-0000000000000000.wal", 0, 0, true},
+		{"0000000000000000.wal", 0, 0, false},
+		{"0000000000000000-0000000000000000.snap", 0, 0, false},
+	}
+	for i, tt := range tests {
+		s, index, err := parseWalName(tt.str)
+		if g := err == nil; g != tt.wok {
+			t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok)
+		}
+		if s != tt.wseq {
+			t.Errorf("#%d: seq = %d, want %d", i, s, tt.wseq)
+		}
+		if index != tt.windex {
+			t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
+		}
+	}
+}
 
-	err = os.Remove(p)
+func TestRead(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer os.RemoveAll(p)
+
+	w, err := Create(p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	info := &raft.Info{Id: int64(0xBEEF)}
+	if err = w.SaveInfo(info); err != nil {
+		t.Fatal(err)
+	}
+	if err = w.Cut(0); err != nil {
+		t.Fatal(err)
+	}
+	for i := 1; i < 10; i++ {
+		e := raft.Entry{Index: int64(i)}
+		if err = w.SaveEntry(&e); err != nil {
+			t.Fatal(err)
+		}
+		if err = w.Cut(e.Index); err != nil {
+			t.Fatal(err)
+		}
+		if err = w.SaveInfo(info); err != nil {
+			t.Fatal(err)
+		}
+	}
+	w.Close()
+
+	if err := os.Remove(path.Join(p, "0000000000000004-0000000000000003.wal")); err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 15; i++ {
+		n, err := Read(p, int64(i))
+		if i <= 3 || i >= 10 {
+			if err != ErrNotFound {
+				t.Errorf("#%d: err = %v, want %v", i, err, ErrNotFound)
+			}
+			continue
+		}
+		if err != nil {
+			t.Errorf("#%d: err = %v, want nil", i, err)
+			continue
+		}
+		if n.Id != info.Id {
+			t.Errorf("#%d: id = %d, want %d", n.Id, info.Id)
+		}
+		for j, e := range n.Ents {
+			if e.Index != int64(j+i+1) {
+				t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1)
+			}
+		}
+	}
 }