Browse Source

raft: node.tick

Xiang Li 11 years ago
parent
commit
09d1575eeb
3 changed files with 116 additions and 10 deletions
  1. 52 10
      raft/node.go
  2. 55 0
      raft/node_test.go
  3. 9 0
      raft/raft.go

+ 52 - 10
raft/node.go

@@ -1,20 +1,35 @@
 package raft
 
-import "sync"
-
 type Interface interface {
 	Step(m Message)
 }
 
+type tick int
+
 type Node struct {
-	lk sync.Mutex
-	sm *stateMachine
+	// election timeout and heartbeat timeout in tick
+	election  tick
+	heartbeat tick
+
+	// elapsed ticks after the last reset
+	elapsed tick
+	sm      *stateMachine
+
+	next Interface
 }
 
-func New(k, addr int, next Interface) *Node {
+func New(k, addr int, heartbeat, election tick, next Interface) *Node {
+	if election < heartbeat*3 {
+		panic("election is least three times as heartbeat [election: %d, heartbeat: %d]")
+	}
+
 	n := &Node{
-		sm: newStateMachine(k, addr),
+		sm:        newStateMachine(k, addr),
+		next:      next,
+		heartbeat: heartbeat,
+		election:  election,
 	}
+
 	return n
 }
 
@@ -25,15 +40,42 @@ func (n *Node) Propose(data []byte) {
 }
 
 func (n *Node) Step(m Message) {
-	n.lk.Lock()
-	defer n.lk.Unlock()
 	n.sm.Step(m)
+	ms := n.sm.Msgs()
+	for _, m := range ms {
+		// reset elapsed in two cases:
+		// msgAppResp -> heard from the leader of the same term
+		// msgVoteResp with grant -> heard from the candidate the node voted for
+		switch m.Type {
+		case msgAppResp:
+			n.elapsed = 0
+		case msgVoteResp:
+			if m.Index >= 0 {
+				n.elapsed = 0
+			}
+		}
+		n.next.Step(m)
+	}
 }
 
 // Next advances the commit index and returns any new
 // commitable entries.
 func (n *Node) Next() []Entry {
-	n.lk.Lock()
-	defer n.lk.Unlock()
 	return n.sm.nextEnts()
 }
+
+// Tick triggers the node to do a tick.
+// If the current elapsed is greater or equal than the timeout,
+// node will send corresponding message to the statemachine.
+func (n *Node) Tick() {
+	timeout, msgType := n.election, msgHup
+	if n.sm.state == stateLeader {
+		timeout, msgType = n.heartbeat, msgBeat
+	}
+	if n.elapsed >= timeout {
+		n.Step(Message{Type: msgType})
+		n.elapsed = 0
+	} else {
+		n.elapsed++
+	}
+}

+ 55 - 0
raft/node_test.go

@@ -0,0 +1,55 @@
+package raft
+
+import "testing"
+
+const (
+	defaultHeartbeat = 1
+	defaultElection  = 5
+)
+
+func TestTickMsgHub(t *testing.T) {
+	n := New(3, 0, defaultHeartbeat, defaultElection, nil)
+
+	called := false
+	n.next = stepperFunc(func(m Message) {
+		if m.Type == msgVote {
+			called = true
+		}
+	})
+
+	for i := 0; i < defaultElection+1; i++ {
+		n.Tick()
+	}
+
+	if !called {
+		t.Errorf("called = %v, want true", called)
+	}
+}
+
+func TestTickMsgBeat(t *testing.T) {
+	k := 3
+	n := New(k, 0, defaultHeartbeat, defaultElection, nil)
+
+	called := 0
+	n.next = stepperFunc(func(m Message) {
+		if m.Type == msgApp {
+			called++
+		}
+		if m.Type == msgVote {
+			n.Step(Message{From: 1, Type: msgVoteResp, Index: 1, Term: 1})
+		}
+	})
+
+	n.Step(Message{Type: msgHup}) // become leader please
+
+	for i := 0; i < defaultHeartbeat+1; i++ {
+		n.Tick()
+	}
+
+	// becomeLeader -> k-1 append
+	// msgBeat -> k-1 append
+	w := (k - 1) * 2
+	if called != w {
+		t.Errorf("called = %v, want %v", called, w)
+	}
+}

+ 9 - 0
raft/raft.go

@@ -11,6 +11,7 @@ type messageType int
 
 const (
 	msgHup messageType = iota
+	msgBeat
 	msgProp
 	msgApp
 	msgAppResp
@@ -20,6 +21,7 @@ const (
 
 var mtmap = [...]string{
 	msgHup:      "msgHup",
+	msgBeat:     "msgBeat",
 	msgProp:     "msgProp",
 	msgApp:      "msgApp",
 	msgAppResp:  "msgAppResp",
@@ -247,6 +249,13 @@ func (sm *stateMachine) Step(m Message) {
 			sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log.term(lasti)})
 		}
 		return
+	case msgBeat:
+		if sm.state != stateLeader {
+			return
+		}
+		// todo(xiangli) broadcast append
+		// blocker github issue #13
+		sm.sendAppend()
 	case msgProp:
 		switch sm.lead {
 		case sm.addr: