Browse Source

raft: make Info a protobuf type

Xiang Li 11 years ago
parent
commit
d6c3ebb1a0
6 changed files with 218 additions and 31 deletions
  1. 3 2
      etcd/participant.go
  2. 170 0
      raft/info.pb.go
  3. 12 0
      raft/info.proto
  4. 4 0
      raft/node.go
  5. 12 12
      wal/wal.go
  6. 17 17
      wal/wal_test.go

+ 3 - 2
etcd/participant.go

@@ -121,10 +121,11 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 		if w, err = wal.New(walPath); err != nil {
 			return nil, err
 		}
-		if err = w.SaveInfo(p.id); err != nil {
+		p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
+		info := p.node.Info()
+		if err = w.SaveInfo(&info); err != nil {
 			return nil, err
 		}
-		p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
 		log.Printf("id=%x participant.new path=%s\n", p.id, walPath)
 	} else {
 		n, err := w.LoadNode()

+ 170 - 0
raft/info.pb.go

@@ -0,0 +1,170 @@
+// Code generated by protoc-gen-gogo.
+// source: info.proto
+// DO NOT EDIT!
+
+/*
+Package raft is a generated protocol buffer package.
+
+It is generated from these files:
+	info.proto
+
+It has these top-level messages:
+	Info
+*/
+package raft
+
+import proto "code.google.com/p/gogoprotobuf/proto"
+import json "encoding/json"
+import math "math"
+
+// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
+
+import io "io"
+import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto"
+
+// Reference proto, json, and math imports to suppress error if they are not otherwise used.
+var _ = proto.Marshal
+var _ = &json.SyntaxError{}
+var _ = math.Inf
+
+type Info struct {
+	Id               int64  `protobuf:"varint,1,req,name=id" json:"id"`
+	XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Info) Reset()         { *m = Info{} }
+func (m *Info) String() string { return proto.CompactTextString(m) }
+func (*Info) ProtoMessage()    {}
+
+func init() {
+}
+func (m *Info) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Id |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
+func (m *Info) Size() (n int) {
+	var l int
+	_ = l
+	n += 1 + sovInfo(uint64(m.Id))
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+
+func sovInfo(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozInfo(x uint64) (n int) {
+	return sovInfo(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *Info) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *Info) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	data[i] = 0x8
+	i++
+	i = encodeVarintInfo(data, i, uint64(m.Id))
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
+func encodeFixed64Info(data []byte, offset int, v uint64) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	data[offset+4] = uint8(v >> 32)
+	data[offset+5] = uint8(v >> 40)
+	data[offset+6] = uint8(v >> 48)
+	data[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32Info(data []byte, offset int, v uint32) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintInfo(data []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		data[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	data[offset] = uint8(v)
+	return offset + 1
+}

+ 12 - 0
raft/info.proto

@@ -0,0 +1,12 @@
+package raft;
+
+import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+option (gogoproto.goproto_getters_all) = false;
+
+message Info {
+	required int64 id   = 1 [(gogoproto.nullable) = false];
+}

+ 4 - 0
raft/node.go

@@ -63,6 +63,10 @@ func (n *Node) Id() int64 { return n.sm.id }
 
 func (n *Node) ClusterId() int64 { return n.sm.clusterId }
 
+func (n *Node) Info() Info {
+	return Info{Id: n.Id()}
+}
+
 func (n *Node) Index() int64 { return n.sm.index.Get() }
 
 func (n *Node) Term() int64 { return n.sm.term.Get() }

+ 12 - 12
wal/wal.go

@@ -83,17 +83,16 @@ func (w *WAL) Close() {
 	}
 }
 
-func (w *WAL) SaveInfo(id int64) error {
-	log.Printf("path=%s wal.saveInfo id=%d", w.f.Name(), id)
+func (w *WAL) SaveInfo(i *raft.Info) error {
+	log.Printf("path=%s wal.saveInfo id=%d", w.f.Name(), i.Id)
 	if err := w.checkAtHead(); err != nil {
 		return err
 	}
-	w.buf.Reset()
-	err := binary.Write(w.buf, binary.LittleEndian, id)
+	b, err := i.Marshal()
 	if err != nil {
 		panic(err)
 	}
-	return writeBlock(w.bw, infoType, w.buf.Bytes())
+	return writeBlock(w.bw, infoType, b)
 }
 
 func (w *WAL) SaveEntry(e *raft.Entry) error {
@@ -147,7 +146,7 @@ func (w *WAL) LoadNode() (*Node, error) {
 	if b.t != infoType {
 		return nil, fmt.Errorf("the first block of wal is not infoType but %d", b.t)
 	}
-	id, err := loadInfo(b.d)
+	i, err := loadInfo(b.d)
 	if err != nil {
 		return nil, err
 	}
@@ -175,15 +174,16 @@ func (w *WAL) LoadNode() (*Node, error) {
 	if err != io.EOF {
 		return nil, err
 	}
-	return &Node{id, ents, state}, nil
+	return &Node{i.Id, ents, state}, nil
 }
 
-func loadInfo(d []byte) (int64, error) {
-	if len(d) != 8 {
-		return 0, fmt.Errorf("len = %d, want 8", len(d))
+func loadInfo(d []byte) (raft.Info, error) {
+	var i raft.Info
+	err := i.Unmarshal(d)
+	if err != nil {
+		panic(err)
 	}
-	buf := bytes.NewBuffer(d)
-	return readInt64(buf)
+	return i, err
 }
 
 func loadEntry(d []byte) (raft.Entry, error) {

+ 17 - 17
wal/wal_test.go

@@ -27,8 +27,8 @@ import (
 )
 
 var (
-	infoData  = []byte("\xef\xbe\x00\x00\x00\x00\x00\x00")
-	infoBlock = append([]byte("\x01\x00\x00\x00\x00\x00\x00\x00\b\x00\x00\x00\x00\x00\x00\x00"), infoData...)
+	infoData  = []byte("\b\xef\xfd\x02")
+	infoBlock = append([]byte("\x01\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00"), infoData...)
 
 	stateData  = []byte("\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00")
 	stateBlock = append([]byte("\x03\x00\x00\x00\x00\x00\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00"), stateData...)
@@ -95,24 +95,24 @@ func TestSaveInfo(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	id := int64(0xBEEF)
-	err = w.SaveInfo(id)
+	i := &raft.Info{Id: int64(0xBEEF)}
+	err = w.SaveInfo(i)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	// make sure we can only write info at the head of the wal file
 	// still in buffer
-	err = w.SaveInfo(id)
-	if err == nil || err.Error() != "cannot write info at 24, expect 0" {
-		t.Errorf("err = %v, want cannot write info at 8, expect 0", err)
+	err = w.SaveInfo(i)
+	if err == nil || err.Error() != "cannot write info at 20, expect 0" {
+		t.Errorf("err = %v, want cannot write info at 20, expect 0", err)
 	}
 
 	// sync to disk
 	w.Sync()
-	err = w.SaveInfo(id)
-	if err == nil || err.Error() != "cannot write info at 24, expect 0" {
-		t.Errorf("err = %v, want cannot write info at 8, expect 0", err)
+	err = w.SaveInfo(i)
+	if err == nil || err.Error() != "cannot write info at 20, expect 0" {
+		t.Errorf("err = %v, want cannot write info at 20, expect 0", err)
 	}
 	w.Close()
 
@@ -158,12 +158,12 @@ func TestSaveState(t *testing.T) {
 }
 
 func TestLoadInfo(t *testing.T) {
-	id, err := loadInfo(infoData)
+	i, err := loadInfo(infoData)
 	if err != nil {
 		t.Fatal(err)
 	}
-	if id != 0xBEEF {
-		t.Errorf("id = %x, want 0xBEEF", id)
+	if i.Id != 0xBEEF {
+		t.Errorf("id = %x, want 0xBEEF", i.Id)
 	}
 }
 
@@ -195,8 +195,8 @@ func TestLoadNode(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	id := int64(0xBEEF)
-	if err = w.SaveInfo(id); err != nil {
+	i := &raft.Info{Id: int64(0xBEEF)}
+	if err = w.SaveInfo(i); err != nil {
 		t.Fatal(err)
 	}
 	ents := []raft.Entry{{Type: 1, Index: 1, Term: 1, Data: []byte{1}}, {Type: 2, Index: 2, Term: 2, Data: []byte{2}}}
@@ -221,8 +221,8 @@ func TestLoadNode(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	if n.Id != id {
-		t.Errorf("id = %d, want %d", n.Id, id)
+	if n.Id != i.Id {
+		t.Errorf("id = %d, want %d", n.Id, i.Id)
 	}
 	if !reflect.DeepEqual(n.Ents, ents) {
 		t.Errorf("ents = %+v, want %+v", n.Ents, ents)