瀏覽代碼

raft: add confAdd and confRemove entry type

Xiang Li 11 年之前
父節點
當前提交
1a75beb57c
共有 5 個文件被更改,包括 32 次插入33 次删除
  1. 1 1
      raft/cluster_test.go
  2. 3 1
      raft/log.go
  3. 22 25
      raft/node.go
  4. 2 2
      raft/raft.go
  5. 4 4
      raft/raft_test.go

+ 1 - 1
raft/cluster_test.go

@@ -94,7 +94,7 @@ func buildCluster(size int) (nt *network, nodes []*Node) {
 
 	nodes[0].StartCluster()
 	for i := 1; i < size; i++ {
-		nt.send(nodes[0].newConfMessage(&ConfigCmd{Type: "add", Addr: i}))
+		nt.send(nodes[0].newConfMessage(configAdd, &Config{NodeId: i}))
 		nodes[i].Start()
 		for j := 0; j < i; j++ {
 			nodes[j].Next()

+ 3 - 1
raft/log.go

@@ -2,7 +2,9 @@ package raft
 
 const (
 	normal int = iota
-	config
+
+	configAdd
+	configRemove
 )
 
 type Entry struct {

+ 22 - 25
raft/node.go

@@ -2,6 +2,7 @@ package raft
 
 import (
 	"encoding/json"
+	golog "log"
 )
 
 type Interface interface {
@@ -11,9 +12,10 @@ type Interface interface {
 
 type tick int
 
-type ConfigCmd struct {
-	Type string
-	Addr int
+type Config struct {
+	NodeId    int
+	ClusterId int
+	Address   string
 }
 
 type Node struct {
@@ -54,7 +56,7 @@ func (n *Node) StartCluster() {
 	}
 	n.sm = newStateMachine(n.addr, []int{n.addr})
 	n.Step(Message{Type: msgHup})
-	n.Step(n.newConfMessage(&ConfigCmd{Type: "add", Addr: n.addr}))
+	n.Step(n.newConfMessage(configAdd, &Config{NodeId: n.addr}))
 	n.Next()
 }
 
@@ -66,11 +68,11 @@ func (n *Node) Start() {
 }
 
 func (n *Node) Add(addr int) {
-	n.Step(n.newConfMessage(&ConfigCmd{Type: "add", Addr: addr}))
+	n.Step(n.newConfMessage(configAdd, &Config{NodeId: addr}))
 }
 
 func (n *Node) Remove(addr int) {
-	n.Step(n.newConfMessage(&ConfigCmd{Type: "remove", Addr: addr}))
+	n.Step(n.newConfMessage(configRemove, &Config{NodeId: addr}))
 }
 
 func (n *Node) Msgs() []Message {
@@ -104,14 +106,20 @@ func (n *Node) Next() []Entry {
 		case normal:
 			// dispatch to the application state machine
 			nents = append(nents, ents[i])
-		case config:
-			c := new(ConfigCmd)
-			err := json.Unmarshal(ents[i].Data, c)
-			if err != nil {
-				// warning
+		case configAdd:
+			c := new(Config)
+			if err := json.Unmarshal(ents[i].Data, c); err != nil {
+				golog.Println(err)
 				continue
 			}
-			n.updateConf(c)
+			n.sm.Add(c.NodeId)
+		case configRemove:
+			c := new(Config)
+			if err := json.Unmarshal(ents[i].Data, c); err != nil {
+				golog.Println(err)
+				continue
+			}
+			n.sm.Remove(c.NodeId)
 		default:
 			panic("unexpected entry type")
 		}
@@ -135,21 +143,10 @@ func (n *Node) Tick() {
 	}
 }
 
-func (n *Node) newConfMessage(c *ConfigCmd) Message {
+func (n *Node) newConfMessage(t int, c *Config) Message {
 	data, err := json.Marshal(c)
 	if err != nil {
 		panic(err)
 	}
-	return Message{Type: msgProp, To: n.addr, Entries: []Entry{Entry{Type: config, Data: data}}}
-}
-
-func (n *Node) updateConf(c *ConfigCmd) {
-	switch c.Type {
-	case "add":
-		n.sm.Add(c.Addr)
-	case "remove":
-		n.sm.Remove(c.Addr)
-	default:
-		// warn
-	}
+	return Message{Type: msgProp, To: n.addr, Entries: []Entry{Entry{Type: t, Data: data}}}
 }

+ 2 - 2
raft/raft.go

@@ -227,7 +227,7 @@ func (sm *stateMachine) becomeLeader() {
 	sm.state = stateLeader
 
 	for _, e := range sm.log.ents[sm.log.committed:] {
-		if e.Type == config {
+		if e.Type == configAdd || e.Type == configRemove {
 			sm.pendingConf = true
 		}
 	}
@@ -270,7 +270,7 @@ func (sm *stateMachine) Step(m Message) {
 		switch sm.lead {
 		case sm.addr:
 			e := m.Entries[0]
-			if e.Type == config {
+			if e.Type == configAdd || e.Type == configRemove {
 				if sm.pendingConf {
 					// todo: deny
 					return

+ 4 - 4
raft/raft_test.go

@@ -497,19 +497,19 @@ func TestConf(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeLeader()
 
-	sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: config}}})
+	sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: configAdd}}})
 	if sm.log.lastIndex() != 1 {
 		t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1)
 	}
 	if !sm.pendingConf {
 		t.Errorf("pendingConf = %v, want %v", sm.pendingConf, true)
 	}
-	if sm.log.ents[1].Type != config {
-		t.Errorf("type = %d, want %d", sm.log.ents[1].Type, config)
+	if sm.log.ents[1].Type != configAdd {
+		t.Errorf("type = %d, want %d", sm.log.ents[1].Type, configAdd)
 	}
 
 	// deny the second configuration change request if there is a pending one
-	sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: config}}})
+	sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: configAdd}}})
 	if sm.log.lastIndex() != 1 {
 		t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1)
 	}