|
|
@@ -1,5 +1,9 @@
|
|
|
package raft
|
|
|
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+)
|
|
|
+
|
|
|
type Interface interface {
|
|
|
Step(m Message)
|
|
|
Msgs() []Message
|
|
|
@@ -7,6 +11,11 @@ type Interface interface {
|
|
|
|
|
|
type tick int
|
|
|
|
|
|
+type ConfigCmd struct {
|
|
|
+ Type string
|
|
|
+ Id int
|
|
|
+}
|
|
|
+
|
|
|
type Node struct {
|
|
|
// election timeout and heartbeat timeout in tick
|
|
|
election tick
|
|
|
@@ -37,6 +46,20 @@ func (n *Node) Propose(data []byte) {
|
|
|
n.Step(m)
|
|
|
}
|
|
|
|
|
|
+func (n *Node) Add(id int) {
|
|
|
+ c := &ConfigCmd{
|
|
|
+ Type: "add",
|
|
|
+ Id: id,
|
|
|
+ }
|
|
|
+
|
|
|
+ data, err := json.Marshal(c)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ m := Message{Type: msgProp, Entries: []Entry{Entry{Type: config, Data: data}}}
|
|
|
+ n.Step(m)
|
|
|
+}
|
|
|
+
|
|
|
func (n *Node) Msgs() []Message {
|
|
|
return n.sm.Msgs()
|
|
|
}
|
|
|
@@ -59,10 +82,25 @@ func (n *Node) Step(m Message) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// Next advances the commit index and returns any new
|
|
|
-// commitable entries.
|
|
|
-func (n *Node) Next() []Entry {
|
|
|
- return n.sm.nextEnts()
|
|
|
+// Next applies all available committed commands.
|
|
|
+func (n *Node) Next() {
|
|
|
+ ents := n.sm.nextEnts()
|
|
|
+ for i := range ents {
|
|
|
+ switch ents[i].Type {
|
|
|
+ case normal:
|
|
|
+ // dispatch to the application state machine
|
|
|
+ case config:
|
|
|
+ c := new(ConfigCmd)
|
|
|
+ err := json.Unmarshal(ents[i].Data, c)
|
|
|
+ if err != nil {
|
|
|
+ // warning
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ n.updateConf(c)
|
|
|
+ default:
|
|
|
+ panic("unexpected entry type")
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Tick triggers the node to do a tick.
|
|
|
@@ -80,3 +118,12 @@ func (n *Node) Tick() {
|
|
|
n.elapsed++
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+func (n *Node) updateConf(c *ConfigCmd) {
|
|
|
+ switch c.Type {
|
|
|
+ case "add":
|
|
|
+ n.sm.Add(c.Id)
|
|
|
+ default:
|
|
|
+ // warn
|
|
|
+ }
|
|
|
+}
|