Browse Source

etcdserver/wal: record info at the head of WAL file

Yicheng Qin 11 years ago
parent
commit
447caf1afc

+ 99 - 0
etcdserver/etcdserverpb/etcdserver.pb.go

@@ -10,6 +10,7 @@
 
 	It has these top-level messages:
 		Request
+		Info
 */
 package etcdserverpb
 
@@ -51,6 +52,15 @@ func (m *Request) Reset()         { *m = Request{} }
 func (m *Request) String() string { return proto.CompactTextString(m) }
 func (*Request) ProtoMessage()    {}
 
+type Info struct {
+	ID               uint64 `protobuf:"varint,1,req" json:"ID"`
+	XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Info) Reset()         { *m = Info{} }
+func (m *Info) String() string { return proto.CompactTextString(m) }
+func (*Info) ProtoMessage()    {}
+
 func init() {
 }
 func (m *Request) Unmarshal(data []byte) error {
@@ -378,6 +388,63 @@ func (m *Request) Unmarshal(data []byte) error {
 	}
 	return nil
 }
+func (m *Info) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.ID |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
 func (m *Request) Size() (n int) {
 	var l int
 	_ = l
@@ -408,6 +475,15 @@ func (m *Request) Size() (n int) {
 	}
 	return n
 }
+func (m *Info) Size() (n int) {
+	var l int
+	_ = l
+	n += 1 + sovEtcdserver(uint64(m.ID))
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
 
 func sovEtcdserver(x uint64) (n int) {
 	for {
@@ -533,6 +609,29 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) {
 	}
 	return i, nil
 }
+func (m *Info) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *Info) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	data[i] = 0x8
+	i++
+	i = encodeVarintEtcdserver(data, i, uint64(m.ID))
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
 func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int {
 	data[offset] = uint8(v)
 	data[offset+1] = uint8(v >> 8)

+ 4 - 0
etcdserver/etcdserverpb/etcdserver.proto

@@ -25,3 +25,7 @@ message Request {
 	required int64  Time       = 15 [(gogoproto.nullable) = false];
 	required bool   Stream     = 16 [(gogoproto.nullable) = false];
 }
+
+message Info {
+	required uint64 ID   = 1 [(gogoproto.nullable) = false];
+}

+ 13 - 4
etcdserver/server.go

@@ -116,7 +116,12 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 		} else if (cfg.ClusterState) != ClusterStateValueNew {
 			log.Fatalf("etcd: initial cluster state unset and no wal or discovery URL found")
 		}
-		if w, err = wal.Create(waldir); err != nil {
+		i := pb.Info{ID: m.ID}
+		b, err := i.Marshal()
+		if err != nil {
+			log.Fatal(err)
+		}
+		if w, err = wal.Create(waldir, b); err != nil {
 			log.Fatal(err)
 		}
 		// TODO: add context for PeerURLs
@@ -140,13 +145,17 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 		if w, err = wal.OpenAtIndex(waldir, index); err != nil {
 			log.Fatal(err)
 		}
-		wid, st, ents, err := w.ReadAll()
+		md, st, ents, err := w.ReadAll()
 		if err != nil {
 			log.Fatal(err)
 		}
+		var info pb.Info
+		if err := info.Unmarshal(md); err != nil {
+			log.Fatal(err)
+		}
 		// TODO(xiangli): save/recovery nodeID?
-		if wid != 0 {
-			log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
+		if info.ID != m.ID {
+			log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, m.ID)
 		}
 		n = raft.RestartNode(m.ID, cfg.Cluster.IDs(), 10, 1, snapshot, st, ents)
 	}

+ 0 - 99
raft/raftpb/raft.pb.go

@@ -9,7 +9,6 @@
 		raft.proto
 
 	It has these top-level messages:
-		Info
 		Entry
 		Snapshot
 		Message
@@ -98,15 +97,6 @@ func (x *ConfChangeType) UnmarshalJSON(data []byte) error {
 	return nil
 }
 
-type Info struct {
-	ID               uint64 `protobuf:"varint,1,req" json:"ID"`
-	XXX_unrecognized []byte `json:"-"`
-}
-
-func (m *Info) Reset()         { *m = Info{} }
-func (m *Info) String() string { return proto.CompactTextString(m) }
-func (*Info) ProtoMessage()    {}
-
 type Entry struct {
 	Type             EntryType `protobuf:"varint,1,req,enum=raftpb.EntryType" json:"Type"`
 	Term             uint64    `protobuf:"varint,2,req" json:"Term"`
@@ -177,63 +167,6 @@ func init() {
 	proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
 	proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value)
 }
-func (m *Info) Unmarshal(data []byte) error {
-	l := len(data)
-	index := 0
-	for index < l {
-		var wire uint64
-		for shift := uint(0); ; shift += 7 {
-			if index >= l {
-				return io.ErrUnexpectedEOF
-			}
-			b := data[index]
-			index++
-			wire |= (uint64(b) & 0x7F) << shift
-			if b < 0x80 {
-				break
-			}
-		}
-		fieldNum := int32(wire >> 3)
-		wireType := int(wire & 0x7)
-		switch fieldNum {
-		case 1:
-			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
-			}
-			for shift := uint(0); ; shift += 7 {
-				if index >= l {
-					return io.ErrUnexpectedEOF
-				}
-				b := data[index]
-				index++
-				m.ID |= (uint64(b) & 0x7F) << shift
-				if b < 0x80 {
-					break
-				}
-			}
-		default:
-			var sizeOfWire int
-			for {
-				sizeOfWire++
-				wire >>= 7
-				if wire == 0 {
-					break
-				}
-			}
-			index -= sizeOfWire
-			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
-			if err != nil {
-				return err
-			}
-			if (index + skippy) > l {
-				return io.ErrUnexpectedEOF
-			}
-			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
-			index += skippy
-		}
-	}
-	return nil
-}
 func (m *Entry) Unmarshal(data []byte) error {
 	l := len(data)
 	index := 0
@@ -878,15 +811,6 @@ func (m *ConfChange) Unmarshal(data []byte) error {
 	}
 	return nil
 }
-func (m *Info) Size() (n int) {
-	var l int
-	_ = l
-	n += 1 + sovRaft(uint64(m.ID))
-	if m.XXX_unrecognized != nil {
-		n += len(m.XXX_unrecognized)
-	}
-	return n
-}
 func (m *Entry) Size() (n int) {
 	var l int
 	_ = l
@@ -984,29 +908,6 @@ func sovRaft(x uint64) (n int) {
 func sozRaft(x uint64) (n int) {
 	return sovRaft(uint64((x << 1) ^ uint64((int64(x) >> 63))))
 }
-func (m *Info) Marshal() (data []byte, err error) {
-	size := m.Size()
-	data = make([]byte, size)
-	n, err := m.MarshalTo(data)
-	if err != nil {
-		return nil, err
-	}
-	return data[:n], nil
-}
-
-func (m *Info) MarshalTo(data []byte) (n int, err error) {
-	var i int
-	_ = i
-	var l int
-	_ = l
-	data[i] = 0x8
-	i++
-	i = encodeVarintRaft(data, i, uint64(m.ID))
-	if m.XXX_unrecognized != nil {
-		i += copy(data[i:], m.XXX_unrecognized)
-	}
-	return i, nil
-}
 func (m *Entry) Marshal() (data []byte, err error) {
 	size := m.Size()
 	data = make([]byte, size)

+ 0 - 4
raft/raftpb/raft.proto

@@ -8,10 +8,6 @@ option (gogoproto.unmarshaler_all) = true;
 option (gogoproto.goproto_getters_all) = false;
 option (gogoproto.goproto_enum_prefix_all) = false;
 
-message Info {
-	required uint64 ID   = 1 [(gogoproto.nullable) = false];
-}
-
 enum EntryType {
 	EntryNormal     = 0;
 	EntryConfChange = 1;

+ 2 - 10
wal/decoder.go

@@ -58,19 +58,11 @@ func (d *decoder) close() error {
 	return d.c.Close()
 }
 
-func mustUnmarshalInfo(d []byte) raftpb.Info {
-	var i raftpb.Info
-	if err := i.Unmarshal(d); err != nil {
-		// crc matched, but we cannot unmarshal the struct?!
-		// we must be the next winner of the $1B lottery.
-		panic(err)
-	}
-	return i
-}
-
 func mustUnmarshalEntry(d []byte) raftpb.Entry {
 	var e raftpb.Entry
 	if err := e.Unmarshal(d); err != nil {
+		// crc matched, but we cannot unmarshal the struct?!
+		// we must be the next winner of the $1B lottery.
 		panic(err)
 	}
 	return e

+ 5 - 0
wal/record_test.go

@@ -27,6 +27,11 @@ import (
 	"github.com/coreos/etcd/wal/walpb"
 )
 
+var (
+	infoData   = []byte("\b\xef\xfd\x02")
+	infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
+)
+
 func TestReadRecord(t *testing.T) {
 	badInfoRecord := make([]byte, len(infoRecord))
 	copy(badInfoRecord, infoRecord)

+ 30 - 29
wal/wal.go

@@ -23,6 +23,7 @@ import (
 	"io"
 	"os"
 	"path"
+	"reflect"
 	"sort"
 
 	"github.com/coreos/etcd/raft"
@@ -31,7 +32,7 @@ import (
 )
 
 const (
-	infoType int64 = iota + 1
+	metadataType int64 = iota + 1
 	entryType
 	stateType
 	crcType
@@ -41,11 +42,11 @@ const (
 )
 
 var (
-	ErrIDMismatch    = errors.New("wal: unmatch id")
-	ErrFileNotFound  = errors.New("wal: file not found")
-	ErrIndexNotFound = errors.New("wal: index not found in file")
-	ErrCRCMismatch   = errors.New("wal: crc mismatch")
-	crcTable         = crc32.MakeTable(crc32.Castagnoli)
+	ErrMetadataConflict = errors.New("wal: conflicting metadata found")
+	ErrFileNotFound     = errors.New("wal: file not found")
+	ErrIndexNotFound    = errors.New("wal: index not found in file")
+	ErrCRCMismatch      = errors.New("wal: crc mismatch")
+	crcTable            = crc32.MakeTable(crc32.Castagnoli)
 )
 
 // WAL is a logical representation of the stable storage.
@@ -55,6 +56,7 @@ var (
 // The WAL will be ready for appending after reading out all the previous records.
 type WAL struct {
 	dir string // the living directory of the underlay files
+	md  []byte // metadata recorded at the head of each WAL
 
 	ri      uint64   // index of entry to start reading
 	decoder *decoder // decoder to decode records
@@ -65,8 +67,9 @@ type WAL struct {
 	encoder *encoder // encoder to encode records
 }
 
-// Create creates a WAL ready for appending records.
-func Create(dirpath string) (*WAL, error) {
+// Create creates a WAL ready for appending records. The given metadata is
+// recorded at the head of each WAL file, and can be retrieved with ReadAll.
+func Create(dirpath string, metadata []byte) (*WAL, error) {
 	if Exist(dirpath) {
 		return nil, os.ErrExist
 	}
@@ -82,6 +85,7 @@ func Create(dirpath string) (*WAL, error) {
 	}
 	w := &WAL{
 		dir:     dirpath,
+		md:      metadata,
 		seq:     0,
 		f:       f,
 		encoder: newEncoder(f, 0),
@@ -89,6 +93,9 @@ func Create(dirpath string) (*WAL, error) {
 	if err := w.saveCrc(0); err != nil {
 		return nil, err
 	}
+	if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
+		return nil, err
+	}
 	return w, nil
 }
 
@@ -154,7 +161,7 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
 // ReadAll reads out all records of the current WAL.
 // If it cannot read out the expected entry, it will return ErrIndexNotFound.
 // After ReadAll, the WAL will be ready for appending new records.
-func (w *WAL) ReadAll() (id uint64, state raftpb.HardState, ents []raftpb.Entry, err error) {
+func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
 	rec := &walpb.Record{}
 	decoder := w.decoder
 
@@ -168,44 +175,44 @@ func (w *WAL) ReadAll() (id uint64, state raftpb.HardState, ents []raftpb.Entry,
 			w.enti = e.Index
 		case stateType:
 			state = mustUnmarshalState(rec.Data)
-		case infoType:
-			i := mustUnmarshalInfo(rec.Data)
-			if id != 0 && id != i.ID {
+		case metadataType:
+			if metadata != nil && !reflect.DeepEqual(metadata, rec.Data) {
 				state.Reset()
-				return 0, state, nil, ErrIDMismatch
+				return nil, state, nil, ErrMetadataConflict
 			}
-			id = i.ID
+			metadata = rec.Data
 		case crcType:
 			crc := decoder.crc.Sum32()
 			// current crc of decoder must match the crc of the record.
 			// no need to match 0 crc, since the decoder is a new one in this case.
 			if crc != 0 && rec.Validate(crc) != nil {
 				state.Reset()
-				return 0, state, nil, ErrCRCMismatch
+				return nil, state, nil, ErrCRCMismatch
 			}
 			decoder.updateCRC(rec.Crc)
 		default:
 			state.Reset()
-			return 0, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
+			return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
 		}
 	}
 	if err != io.EOF {
 		state.Reset()
-		return 0, state, nil, err
+		return nil, state, nil, err
 	}
 	if w.enti < w.ri {
 		state.Reset()
-		return 0, state, nil, ErrIndexNotFound
+		return nil, state, nil, ErrIndexNotFound
 	}
 
 	// close decoder, disable reading
 	w.decoder.close()
 	w.ri = 0
 
+	w.md = metadata
 	// create encoder (chain crc with the decoder), enable appending
 	w.encoder = newEncoder(w.f, w.decoder.lastCRC())
 	w.decoder = nil
-	return id, state, ents, nil
+	return metadata, state, ents, nil
 }
 
 // Cut closes current file written and creates a new one ready to append.
@@ -224,7 +231,10 @@ func (w *WAL) Cut() error {
 	w.seq++
 	prevCrc := w.encoder.crc.Sum32()
 	w.encoder = newEncoder(w.f, prevCrc)
-	return w.saveCrc(prevCrc)
+	if err := w.saveCrc(prevCrc); err != nil {
+		return err
+	}
+	return w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.md})
 }
 
 func (w *WAL) Sync() error {
@@ -243,15 +253,6 @@ func (w *WAL) Close() {
 	}
 }
 
-func (w *WAL) SaveInfo(i *raftpb.Info) error {
-	b, err := i.Marshal()
-	if err != nil {
-		panic(err)
-	}
-	rec := &walpb.Record{Type: infoType, Data: b}
-	return w.encoder.encode(rec)
-}
-
 func (w *WAL) SaveEntry(e *raftpb.Entry) error {
 	b, err := e.Marshal()
 	if err != nil {

+ 12 - 28
wal/wal_test.go

@@ -27,11 +27,6 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
-var (
-	infoData   = []byte("\b\xef\xfd\x02")
-	infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
-)
-
 func TestNew(t *testing.T) {
 	p, err := ioutil.TempDir(os.TempDir(), "waltest")
 	if err != nil {
@@ -39,7 +34,7 @@ func TestNew(t *testing.T) {
 	}
 	defer os.RemoveAll(p)
 
-	w, err := Create(p)
+	w, err := Create(p, nil)
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 	}
@@ -57,7 +52,7 @@ func TestNewForInitedDir(t *testing.T) {
 	defer os.RemoveAll(p)
 
 	os.Create(path.Join(p, walName(0, 0)))
-	if _, err = Create(p); err == nil || err != os.ErrExist {
+	if _, err = Create(p, nil); err == nil || err != os.ErrExist {
 		t.Errorf("err = %v, want %v", err, os.ErrExist)
 	}
 }
@@ -123,7 +118,7 @@ func TestCut(t *testing.T) {
 	}
 	defer os.RemoveAll(p)
 
-	w, err := Create(p)
+	w, err := Create(p, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -161,14 +156,10 @@ func TestRecover(t *testing.T) {
 	}
 	defer os.RemoveAll(p)
 
-	w, err := Create(p)
+	w, err := Create(p, []byte("metadata"))
 	if err != nil {
 		t.Fatal(err)
 	}
-	i := &raftpb.Info{ID: uint64(0xBAD0)}
-	if err = w.SaveInfo(i); err != nil {
-		t.Fatal(err)
-	}
 	ents := []raftpb.Entry{{Index: 0, Term: 0}, {Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
 	for _, e := range ents {
 		if err = w.SaveEntry(&e); err != nil {
@@ -186,13 +177,13 @@ func TestRecover(t *testing.T) {
 	if w, err = OpenAtIndex(p, 0); err != nil {
 		t.Fatal(err)
 	}
-	id, state, entries, err := w.ReadAll()
+	metadata, state, entries, err := w.ReadAll()
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	if id != i.ID {
-		t.Errorf("id = %d, want %d", id, i.ID)
+	if !reflect.DeepEqual(metadata, []byte("metadata")) {
+		t.Errorf("metadata = %s, want %s", metadata, "metadata")
 	}
 	if !reflect.DeepEqual(entries, ents) {
 		t.Errorf("ents = %+v, want %+v", entries, ents)
@@ -278,14 +269,10 @@ func TestRecoverAfterCut(t *testing.T) {
 	}
 	defer os.RemoveAll(p)
 
-	w, err := Create(p)
+	w, err := Create(p, []byte("metadata"))
 	if err != nil {
 		t.Fatal(err)
 	}
-	info := &raftpb.Info{ID: uint64(0xBAD1)}
-	if err = w.SaveInfo(info); err != nil {
-		t.Fatal(err)
-	}
 	// TODO(unihorn): remove this when cut can operate on an empty file
 	if err = w.SaveEntry(&raftpb.Entry{}); err != nil {
 		t.Fatal(err)
@@ -301,9 +288,6 @@ func TestRecoverAfterCut(t *testing.T) {
 		if err = w.Cut(); err != nil {
 			t.Fatal(err)
 		}
-		if err = w.SaveInfo(info); err != nil {
-			t.Fatal(err)
-		}
 	}
 	w.Close()
 
@@ -323,13 +307,13 @@ func TestRecoverAfterCut(t *testing.T) {
 			}
 			continue
 		}
-		id, _, entries, err := w.ReadAll()
+		metadata, _, entries, err := w.ReadAll()
 		if err != nil {
 			t.Errorf("#%d: err = %v, want nil", i, err)
 			continue
 		}
-		if id != info.ID {
-			t.Errorf("#%d: id = %d, want %d", i, id, info.ID)
+		if !reflect.DeepEqual(metadata, []byte("metadata")) {
+			t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata")
 		}
 		for j, e := range entries {
 			if e.Index != uint64(j+i) {
@@ -346,7 +330,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
 	}
 	defer os.RemoveAll(p)
 
-	w, err := Create(p)
+	w, err := Create(p, nil)
 	if err != nil {
 		t.Fatal(err)
 	}