Browse Source

raft: add Id to Entry

Blake Mizerany 11 years ago
parent
commit
225e618b8f
5 changed files with 41 additions and 28 deletions
  1. 21 0
      raft/entry.pb.go
  2. 1 0
      raft/entry.proto
  3. 4 4
      raft/example_test.go
  4. 2 2
      raft/node.go
  5. 13 22
      raft/state.pb.go

+ 21 - 0
raft/entry.pb.go

@@ -7,6 +7,7 @@
 
 	It is generated from these files:
 		entry.proto
+		state.proto
 
 	It has these top-level messages:
 		Entry
@@ -32,6 +33,7 @@ type Entry struct {
 	Term             int64  `protobuf:"varint,2,req,name=term" json:"term"`
 	Index            int64  `protobuf:"varint,3,req,name=index" json:"index"`
 	Data             []byte `protobuf:"bytes,4,opt,name=data" json:"data,omitempty"`
+	Id               int64  `protobuf:"varint,5,req,name=id" json:"id"`
 	XXX_unrecognized []byte `json:"-"`
 }
 
@@ -127,6 +129,21 @@ func (m *Entry) Unmarshal(data []byte) error {
 			}
 			m.Data = append(m.Data, data[index:postIndex]...)
 			index = postIndex
+		case 5:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Id |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
 		default:
 			var sizeOfWire int
 			for {
@@ -160,6 +177,7 @@ func (m *Entry) Size() (n int) {
 		l = len(m.Data)
 		n += 1 + l + sovEntry(uint64(l))
 	}
+	n += 1 + sovEntry(uint64(m.Id))
 	if m.XXX_unrecognized != nil {
 		n += len(m.XXX_unrecognized)
 	}
@@ -209,6 +227,9 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) {
 		i = encodeVarintEntry(data, i, uint64(len(m.Data)))
 		i += copy(data[i:], m.Data)
 	}
+	data[i] = 0x28
+	i++
+	i = encodeVarintEntry(data, i, uint64(m.Id))
 	if m.XXX_unrecognized != nil {
 		i += copy(data[i:], m.XXX_unrecognized)
 	}

+ 1 - 0
raft/entry.proto

@@ -12,4 +12,5 @@ message Entry {
 	required int64 term  = 2 [(gogoproto.nullable) = false];
 	required int64 index = 3 [(gogoproto.nullable) = false];
 	optional bytes data  = 4;
+	required int64 id = 5 [(gogoproto.nullable) = false];
 }

+ 4 - 4
raft/example_test.go

@@ -18,12 +18,12 @@ func Example_Node() {
 		// ReadState blocks until there is new state ready.
 		rd := <-n.Ready()
 		if !prev.Equal(rd.State) {
-			saveStateToDisk(st)
+			saveStateToDisk(rd.State)
 			prev = rd.State
 		}
 
-		saveToDisk(ents)
-		go applyToStore(cents)
-		sendMessages(msgs)
+		saveToDisk(rd.Entries)
+		go applyToStore(rd.CommittedEntries)
+		sendMessages(rd.Messages)
 	}
 }

+ 2 - 2
raft/node.go

@@ -110,8 +110,8 @@ func (n *Node) Tick() error {
 }
 
 // Propose proposes data be appended to the log.
-func (n *Node) Propose(ctx context.Context, data []byte) error {
-	return n.Step(ctx, []Message{{Type: msgProp, Entries: []Entry{{Data: data}}}})
+func (n *Node) Propose(ctx context.Context, id int64, data []byte) error {
+	return n.Step(ctx, []Message{{Type: msgProp, Entries: []Entry{{Id: id, Data: data}}}})
 }
 
 // Step advances the state machine using msgs. Proposals are priotized last so

+ 13 - 22
raft/state.pb.go

@@ -2,15 +2,6 @@
 // source: state.proto
 // DO NOT EDIT!
 
-/*
-	Package raft is a generated protocol buffer package.
-
-	It is generated from these files:
-		state.proto
-
-	It has these top-level messages:
-		State
-*/
 package raft
 
 import proto "code.google.com/p/gogoprotobuf/proto"
@@ -19,8 +10,8 @@ import math "math"
 
 // discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
 
-import io "io"
-import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto"
+import io1 "io"
+import code_google_com_p_gogoprotobuf_proto1 "code.google.com/p/gogoprotobuf/proto"
 
 // Reference proto, json, and math imports to suppress error if they are not otherwise used.
 var _ = proto.Marshal
@@ -48,7 +39,7 @@ func (m *State) Unmarshal(data []byte) error {
 		var wire uint64
 		for shift := uint(0); ; shift += 7 {
 			if index >= l {
-				return io.ErrUnexpectedEOF
+				return io1.ErrUnexpectedEOF
 			}
 			b := data[index]
 			index++
@@ -62,11 +53,11 @@ func (m *State) Unmarshal(data []byte) error {
 		switch fieldNum {
 		case 1:
 			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+				return code_google_com_p_gogoprotobuf_proto1.ErrWrongType
 			}
 			for shift := uint(0); ; shift += 7 {
 				if index >= l {
-					return io.ErrUnexpectedEOF
+					return io1.ErrUnexpectedEOF
 				}
 				b := data[index]
 				index++
@@ -77,11 +68,11 @@ func (m *State) Unmarshal(data []byte) error {
 			}
 		case 2:
 			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+				return code_google_com_p_gogoprotobuf_proto1.ErrWrongType
 			}
 			for shift := uint(0); ; shift += 7 {
 				if index >= l {
-					return io.ErrUnexpectedEOF
+					return io1.ErrUnexpectedEOF
 				}
 				b := data[index]
 				index++
@@ -92,11 +83,11 @@ func (m *State) Unmarshal(data []byte) error {
 			}
 		case 3:
 			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+				return code_google_com_p_gogoprotobuf_proto1.ErrWrongType
 			}
 			for shift := uint(0); ; shift += 7 {
 				if index >= l {
-					return io.ErrUnexpectedEOF
+					return io1.ErrUnexpectedEOF
 				}
 				b := data[index]
 				index++
@@ -107,11 +98,11 @@ func (m *State) Unmarshal(data []byte) error {
 			}
 		case 4:
 			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+				return code_google_com_p_gogoprotobuf_proto1.ErrWrongType
 			}
 			for shift := uint(0); ; shift += 7 {
 				if index >= l {
-					return io.ErrUnexpectedEOF
+					return io1.ErrUnexpectedEOF
 				}
 				b := data[index]
 				index++
@@ -130,12 +121,12 @@ func (m *State) Unmarshal(data []byte) error {
 				}
 			}
 			index -= sizeOfWire
-			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
+			skippy, err := code_google_com_p_gogoprotobuf_proto1.Skip(data[index:])
 			if err != nil {
 				return err
 			}
 			if (index + skippy) > l {
-				return io.ErrUnexpectedEOF
+				return io1.ErrUnexpectedEOF
 			}
 			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
 			index += skippy