Browse Source

wal: add Read

Yicheng Qin 11 years ago
parent
commit
363e952551
2 changed files with 274 additions and 5 deletions
  1. 142 5
      wal/wal.go
  2. 132 0
      wal/wal_test.go

+ 142 - 5
wal/wal.go

@@ -2,9 +2,11 @@ package wal
 
 import (
 	"bufio"
+	"bytes"
 	"encoding/binary"
 	"encoding/json"
 	"fmt"
+	"io"
 	"os"
 
 	"github.com/coreos/etcd/raft"
@@ -36,6 +38,15 @@ func New(path string) (*WAL, error) {
 	return &WAL{f, bw}, nil
 }
 
+func Open(path string) (*WAL, error) {
+	f, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	bw := bufio.NewWriter(f)
+	return &WAL{f, bw}, nil
+}
+
 func (w *WAL) Close() {
 	if w.f != nil {
 		w.flush()
@@ -46,13 +57,9 @@ func (w *WAL) Close() {
 func (w *WAL) writeInfo(id int64) error {
 	// | 8 bytes | 8 bytes |  8 bytes |
 	// | type    |   len   |   nodeid |
-	o, err := w.f.Seek(0, os.SEEK_CUR)
-	if err != nil {
+	if err := w.checkAtHead(); err != nil {
 		return err
 	}
-	if o != 0 || w.bw.Buffered() != 0 {
-		return fmt.Errorf("cannot write info at %d, expect 0", max(o, int64(w.bw.Buffered())))
-	}
 	if err := w.writeInt64(infoType); err != nil {
 		return err
 	}
@@ -102,6 +109,136 @@ func (w *WAL) flush() error {
 	return w.bw.Flush()
 }
 
+func (w *WAL) checkAtHead() error {
+	o, err := w.f.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+	if o != 0 || w.bw.Buffered() != 0 {
+		return fmt.Errorf("cannot write info at %d, expect 0", max(o, int64(w.bw.Buffered())))
+	}
+	return nil
+}
+
+type Node struct {
+	Id    int64
+	Ents  []raft.Entry
+	State raft.State
+}
+
+func (w *WAL) ReadNode() (*Node, error) {
+	if err := w.checkAtHead(); err != nil {
+		return nil, err
+	}
+	br := bufio.NewReader(w.f)
+	n := new(Node)
+
+	b, err := readBlock(br)
+	if err != nil {
+		return nil, err
+	}
+	switch b.t {
+	case infoType:
+		id, err := parseInfo(b.d)
+		if err != nil {
+			return nil, err
+		}
+		n.Id = id
+	default:
+		return nil, fmt.Errorf("type = %d, want %d", b.t, infoType)
+	}
+
+	ents := make([]raft.Entry, 0)
+	var state raft.State
+	for {
+		b, err := readBlock(br)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+		switch b.t {
+		case entryType:
+			e, err := parseEntry(b.d)
+			if err != nil {
+				return nil, err
+			}
+			ents = append(ents, e)
+		case stateType:
+			s, err := parseState(b.d)
+			if err != nil {
+				return nil, err
+			}
+			state = s
+		default:
+			return nil, fmt.Errorf("cannot handle type %d", b.t)
+		}
+	}
+	n.Ents = ents
+	n.State = state
+	return n, nil
+}
+
+func parseInfo(d []byte) (int64, error) {
+	if len(d) != 8 {
+		return 0, fmt.Errorf("len = %d, want 8", len(d))
+	}
+	buf := bytes.NewBuffer(d)
+	return readInt64(buf)
+}
+
+func parseEntry(d []byte) (raft.Entry, error) {
+	var e raft.Entry
+	err := json.Unmarshal(d, &e)
+	return e, err
+}
+
+func parseState(d []byte) (raft.State, error) {
+	var s raft.State
+	buf := bytes.NewBuffer(d)
+	err := binary.Read(buf, binary.LittleEndian, &s)
+	return s, err
+}
+
+type block struct {
+	t int64
+	l int64
+	d []byte
+}
+
+func readBlock(r io.Reader) (*block, error) {
+	typ, err := readInt64(r)
+	if err != nil {
+		return nil, err
+	}
+	l, err := readInt64(r)
+	if err != nil {
+		if err == io.EOF {
+			err = io.ErrUnexpectedEOF
+		}
+		return nil, err
+	}
+	data := make([]byte, l)
+	n, err := r.Read(data)
+	if err != nil {
+		if err == io.EOF {
+			err = io.ErrUnexpectedEOF
+		}
+		return nil, err
+	}
+	if n != int(l) {
+		return nil, fmt.Errorf("len(data) = %d, want %d", n, l)
+	}
+	return &block{typ, l, data}, nil
+}
+
+func readInt64(r io.Reader) (int64, error) {
+	var n int64
+	err := binary.Read(r, binary.LittleEndian, &n)
+	return n, err
+}
+
 func max(a, b int64) int64 {
 	if a > b {
 		return a

+ 132 - 0
wal/wal_test.go

@@ -1,6 +1,8 @@
 package wal
 
 import (
+	"bytes"
+	"io"
 	"io/ioutil"
 	"os"
 	"path"
@@ -132,3 +134,133 @@ func TestWriteState(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func TestParseInfo(t *testing.T) {
+	data := []byte("\xef\xbe\x00\x00\x00\x00\x00\x00")
+	id, err := parseInfo(data)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if id != 0xBEEF {
+		t.Errorf("id = %x, want 0xBEEF", id)
+	}
+}
+
+func TestParseEntry(t *testing.T) {
+	data := []byte("{\"Type\":1,\"Term\":1,\"Data\":\"AQ==\"}")
+	e, err := parseEntry(data)
+	if err != nil {
+		t.Fatal(err)
+	}
+	we := raft.Entry{1, 1, []byte{1}}
+	if !reflect.DeepEqual(e, we) {
+		t.Errorf("ent = %v, want %v", e, we)
+	}
+}
+
+func TestParseState(t *testing.T) {
+	data := []byte("\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00")
+	s, err := parseState(data)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ws := raft.State{1, 1, 1}
+	if !reflect.DeepEqual(s, ws) {
+		t.Errorf("state = %v, want %v", s, ws)
+	}
+}
+
+func TestReadBlock(t *testing.T) {
+	tests := []struct {
+		data []byte
+		wb   *block
+		we   error
+	}{
+		{
+			[]byte("\x01\x00\x00\x00\x00\x00\x00\x00\b\x00\x00\x00\x00\x00\x00\x00\xef\xbe\x00\x00\x00\x00\x00\x00"),
+			&block{1, 8, []byte("\xef\xbe\x00\x00\x00\x00\x00\x00")},
+			nil,
+		},
+		{
+			[]byte(""),
+			nil,
+			io.EOF,
+		},
+		{
+			[]byte("\x01\x00\x00\x00"),
+			nil,
+			io.ErrUnexpectedEOF,
+		},
+		{
+			[]byte("\x01\x00\x00\x00\x00\x00\x00\x00"),
+			nil,
+			io.ErrUnexpectedEOF,
+		},
+		{
+			[]byte("\x01\x00\x00\x00\x00\x00\x00\x00\b\x00\x00\x00\x00\x00\x00\x00"),
+			nil,
+			io.ErrUnexpectedEOF,
+		},
+	}
+
+	for i, tt := range tests {
+		buf := bytes.NewBuffer(tt.data)
+		b, e := readBlock(buf)
+		if !reflect.DeepEqual(b, tt.wb) {
+			t.Errorf("#%d: block = %v, want %v", i, b, tt.wb)
+		}
+		if !reflect.DeepEqual(e, tt.we) {
+			t.Errorf("#%d: err = %v, want %v", i, e, tt.we)
+		}
+	}
+}
+
+func TestReadNode(t *testing.T) {
+	p := path.Join(os.TempDir(), "waltest")
+	w, err := New(p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	id := int64(0xBEEF)
+	if err = w.writeInfo(id); err != nil {
+		t.Fatal(err)
+	}
+	ents := []raft.Entry{{1, 1, []byte{1}}, {2, 2, []byte{2}}}
+	for _, e := range ents {
+		if err = w.writeEntry(&e); err != nil {
+			t.Fatal(err)
+		}
+	}
+	sts := []raft.State{{1, 1, 1}, {2, 2, 2}}
+	for _, s := range sts {
+		if err = w.writeState(&s); err != nil {
+			t.Fatal(err)
+		}
+	}
+	w.Close()
+
+	w, err = Open(p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	n, err := w.ReadNode()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n.Id != id {
+		t.Errorf("id = %d, want %d", n.Id, id)
+	}
+	if !reflect.DeepEqual(n.Ents, ents) {
+		t.Errorf("ents = %+v, want %+v", n.Ents, ents)
+	}
+	// only the latest state is recorded
+	s := sts[len(sts)-1]
+	if !reflect.DeepEqual(n.State, s) {
+		t.Errorf("state = %+v, want %+v", n.State, s)
+	}
+
+	err = os.Remove(p)
+	if err != nil {
+		t.Fatal(err)
+	}
+}