Browse Source

raft: use EntryType in protobuf

Yicheng Qin 11 years ago
parent
commit
b82d70871f
8 changed files with 64 additions and 29 deletions
  1. 2 2
      etcdserver/server.go
  2. 1 1
      etcdserver/server_test.go
  3. 0 5
      raft/log.go
  4. 1 1
      raft/node.go
  5. 2 2
      raft/raft.go
  6. 8 8
      raft/raft_test.go
  7. 40 6
      raft/raftpb/raft.pb.go
  8. 10 4
      raft/raftpb/raft.proto

+ 2 - 2
etcdserver/server.go

@@ -127,13 +127,13 @@ func (s *EtcdServer) run() {
 			// race them.
 			// race them.
 			for _, e := range rd.CommittedEntries {
 			for _, e := range rd.CommittedEntries {
 				switch e.Type {
 				switch e.Type {
-				case raft.EntryNormal:
+				case raftpb.EntryNormal:
 					var r pb.Request
 					var r pb.Request
 					if err := r.Unmarshal(e.Data); err != nil {
 					if err := r.Unmarshal(e.Data); err != nil {
 						panic("TODO: this is bad, what do we do about it?")
 						panic("TODO: this is bad, what do we do about it?")
 					}
 					}
 					s.w.Trigger(r.Id, s.applyRequest(r))
 					s.w.Trigger(r.Id, s.applyRequest(r))
-				case raft.EntryConfig:
+				case raftpb.EntryConfig:
 					var c pb.Config
 					var c pb.Config
 					if err := c.Unmarshal(e.Data); err != nil {
 					if err := c.Unmarshal(e.Data); err != nil {
 						panic("TODO: this is bad, what do we do about it?")
 						panic("TODO: this is bad, what do we do about it?")

+ 1 - 1
etcdserver/server_test.go

@@ -895,7 +895,7 @@ func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error
 	return n.nodeRecorder.Propose(ctx, data)
 	return n.nodeRecorder.Propose(ctx, data)
 }
 }
 func (n *nodeCommitterRecorder) Configure(ctx context.Context, data []byte) error {
 func (n *nodeCommitterRecorder) Configure(ctx context.Context, data []byte) error {
-	n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raft.EntryConfig, Data: data}}}
+	n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfig, Data: data}}}
 	return n.nodeRecorder.Configure(ctx, data)
 	return n.nodeRecorder.Configure(ctx, data)
 }
 }
 func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready {
 func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready {

+ 0 - 5
raft/log.go

@@ -10,11 +10,6 @@ const (
 	defaultCompactThreshold = 10000
 	defaultCompactThreshold = 10000
 )
 )
 
 
-const (
-	EntryNormal int64 = iota
-	EntryConfig
-)
-
 type raftLog struct {
 type raftLog struct {
 	ents      []pb.Entry
 	ents      []pb.Entry
 	unstable  int64
 	unstable  int64

+ 1 - 1
raft/node.go

@@ -247,7 +247,7 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
 }
 }
 
 
 func (n *node) Configure(ctx context.Context, data []byte) error {
 func (n *node) Configure(ctx context.Context, data []byte) error {
-	return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig, Data: data}}})
+	return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig, Data: data}}})
 }
 }
 
 
 // Step advances the state machine using msgs. The ctx.Err() will be returned,
 // Step advances the state machine using msgs. The ctx.Err() will be returned,

+ 2 - 2
raft/raft.go

@@ -313,7 +313,7 @@ func (r *raft) becomeLeader() {
 	r.lead = r.id
 	r.lead = r.id
 	r.state = StateLeader
 	r.state = StateLeader
 	for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
 	for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
-		if e.Type != EntryConfig {
+		if e.Type != pb.EntryConfig {
 			continue
 			continue
 		}
 		}
 		if r.pendingConf {
 		if r.pendingConf {
@@ -407,7 +407,7 @@ func stepLeader(r *raft, m pb.Message) {
 			panic("unexpected length(entries) of a msgProp")
 			panic("unexpected length(entries) of a msgProp")
 		}
 		}
 		e := m.Entries[0]
 		e := m.Entries[0]
-		if e.Type == EntryConfig {
+		if e.Type == pb.EntryConfig {
 			if r.pendingConf {
 			if r.pendingConf {
 				return
 				return
 			}
 			}

+ 8 - 8
raft/raft_test.go

@@ -956,7 +956,7 @@ func TestStepConfig(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 	index := r.raftLog.lastIndex()
 	index := r.raftLog.lastIndex()
-	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}})
+	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}})
 	if g := r.raftLog.lastIndex(); g != index+1 {
 	if g := r.raftLog.lastIndex(); g != index+1 {
 		t.Errorf("index = %d, want %d", g, index+1)
 		t.Errorf("index = %d, want %d", g, index+1)
 	}
 	}
@@ -973,10 +973,10 @@ func TestStepIgnoreConfig(t *testing.T) {
 	r := newRaft(1, []int64{1, 2}, 0, 0)
 	r := newRaft(1, []int64{1, 2}, 0, 0)
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
-	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}})
+	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}})
 	index := r.raftLog.lastIndex()
 	index := r.raftLog.lastIndex()
 	pendingConf := r.pendingConf
 	pendingConf := r.pendingConf
-	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}})
+	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}})
 	if g := r.raftLog.lastIndex(); g != index {
 	if g := r.raftLog.lastIndex(); g != index {
 		t.Errorf("index = %d, want %d", g, index)
 		t.Errorf("index = %d, want %d", g, index)
 	}
 	}
@@ -989,11 +989,11 @@ func TestStepIgnoreConfig(t *testing.T) {
 // based on uncommitted entries.
 // based on uncommitted entries.
 func TestRecoverPendingConfig(t *testing.T) {
 func TestRecoverPendingConfig(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
-		entType  int64
+		entType  pb.EntryType
 		wpending bool
 		wpending bool
 	}{
 	}{
-		{EntryNormal, false},
-		{EntryConfig, true},
+		{pb.EntryNormal, false},
+		{pb.EntryConfig, true},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		r := newRaft(1, []int64{1, 2}, 0, 0)
 		r := newRaft(1, []int64{1, 2}, 0, 0)
@@ -1016,8 +1016,8 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
 			}
 			}
 		}()
 		}()
 		r := newRaft(1, []int64{1, 2}, 0, 0)
 		r := newRaft(1, []int64{1, 2}, 0, 0)
-		r.appendEntry(pb.Entry{Type: EntryConfig})
-		r.appendEntry(pb.Entry{Type: EntryConfig})
+		r.appendEntry(pb.Entry{Type: pb.EntryConfig})
+		r.appendEntry(pb.Entry{Type: pb.EntryConfig})
 		r.becomeCandidate()
 		r.becomeCandidate()
 		r.becomeLeader()
 		r.becomeLeader()
 	}()
 	}()

+ 40 - 6
raft/raftpb/raft.pb.go

@@ -31,6 +31,39 @@ var _ = proto.Marshal
 var _ = &json.SyntaxError{}
 var _ = &json.SyntaxError{}
 var _ = math.Inf
 var _ = math.Inf
 
 
+type EntryType int32
+
+const (
+	EntryNormal EntryType = 0
+	EntryConfig EntryType = 1
+)
+
+var EntryType_name = map[int32]string{
+	0: "EntryNormal",
+	1: "EntryConfig",
+}
+var EntryType_value = map[string]int32{
+	"EntryNormal": 0,
+	"EntryConfig": 1,
+}
+
+func (x EntryType) Enum() *EntryType {
+	p := new(EntryType)
+	*p = x
+	return p
+}
+func (x EntryType) String() string {
+	return proto.EnumName(EntryType_name, int32(x))
+}
+func (x *EntryType) UnmarshalJSON(data []byte) error {
+	value, err := proto.UnmarshalJSONEnum(EntryType_value, data, "EntryType")
+	if err != nil {
+		return err
+	}
+	*x = EntryType(value)
+	return nil
+}
+
 type Info struct {
 type Info struct {
 	Id               int64  `protobuf:"varint,1,req,name=id" json:"id"`
 	Id               int64  `protobuf:"varint,1,req,name=id" json:"id"`
 	XXX_unrecognized []byte `json:"-"`
 	XXX_unrecognized []byte `json:"-"`
@@ -41,11 +74,11 @@ func (m *Info) String() string { return proto.CompactTextString(m) }
 func (*Info) ProtoMessage()    {}
 func (*Info) ProtoMessage()    {}
 
 
 type Entry struct {
 type Entry struct {
-	Type             int64  `protobuf:"varint,1,req,name=type" json:"type"`
-	Term             int64  `protobuf:"varint,2,req,name=term" json:"term"`
-	Index            int64  `protobuf:"varint,3,req,name=index" json:"index"`
-	Data             []byte `protobuf:"bytes,4,opt,name=data" json:"data"`
-	XXX_unrecognized []byte `json:"-"`
+	Type             EntryType `protobuf:"varint,1,req,enum=raftpb.EntryType" json:"Type"`
+	Term             int64     `protobuf:"varint,2,req" json:"Term"`
+	Index            int64     `protobuf:"varint,3,req" json:"Index"`
+	Data             []byte    `protobuf:"bytes,4,opt" json:"Data"`
+	XXX_unrecognized []byte    `json:"-"`
 }
 }
 
 
 func (m *Entry) Reset()         { *m = Entry{} }
 func (m *Entry) Reset()         { *m = Entry{} }
@@ -93,6 +126,7 @@ func (m *HardState) String() string { return proto.CompactTextString(m) }
 func (*HardState) ProtoMessage()    {}
 func (*HardState) ProtoMessage()    {}
 
 
 func init() {
 func init() {
+	proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
 }
 }
 func (m *Info) Unmarshal(data []byte) error {
 func (m *Info) Unmarshal(data []byte) error {
 	l := len(data)
 	l := len(data)
@@ -180,7 +214,7 @@ func (m *Entry) Unmarshal(data []byte) error {
 				}
 				}
 				b := data[index]
 				b := data[index]
 				index++
 				index++
-				m.Type |= (int64(b) & 0x7F) << shift
+				m.Type |= (EntryType(b) & 0x7F) << shift
 				if b < 0x80 {
 				if b < 0x80 {
 					break
 					break
 				}
 				}

+ 10 - 4
raft/raftpb/raft.proto

@@ -6,16 +6,22 @@ option (gogoproto.marshaler_all) = true;
 option (gogoproto.sizer_all) = true;
 option (gogoproto.sizer_all) = true;
 option (gogoproto.unmarshaler_all) = true;
 option (gogoproto.unmarshaler_all) = true;
 option (gogoproto.goproto_getters_all) = false;
 option (gogoproto.goproto_getters_all) = false;
+option (gogoproto.goproto_enum_prefix_all) = false;
 
 
 message Info {
 message Info {
 	required int64 id   = 1 [(gogoproto.nullable) = false];
 	required int64 id   = 1 [(gogoproto.nullable) = false];
 }
 }
 
 
+enum EntryType {
+	EntryNormal = 0;
+	EntryConfig = 1;
+}
+
 message Entry {
 message Entry {
-	required int64 type  = 1 [(gogoproto.nullable) = false];
-	required int64 term  = 2 [(gogoproto.nullable) = false];
-	required int64 index = 3 [(gogoproto.nullable) = false];
-	optional bytes data  = 4 [(gogoproto.nullable) = false];
+	required EntryType Type  = 1 [(gogoproto.nullable) = false];
+	required int64     Term  = 2 [(gogoproto.nullable) = false];
+	required int64     Index = 3 [(gogoproto.nullable) = false];
+	optional bytes     Data  = 4 [(gogoproto.nullable) = false];
 }
 }
 
 
 message Snapshot {
 message Snapshot {