Browse Source

raft: attempt first version of Interface

Blake Mizerany 11 years ago
parent
commit
50e0db4038
3 changed files with 58 additions and 33 deletions
  1. 31 0
      raft/node.go
  2. 4 8
      raft/raft.go
  3. 23 25
      raft/raft_test.go

+ 31 - 0
raft/node.go

@@ -0,0 +1,31 @@
+package raft
+
+import "sync"
+
+type Interface interface {
+	Step(m Message)
+}
+
+type Node struct {
+	lk sync.Mutex
+	sm *stateMachine
+}
+
+func New(k, addr int, next Interface) Interface {
+	n := &Node{
+		sm: newStateMachine(k, addr, next),
+	}
+	return n
+}
+
+// Propose asynchronously proposes data be applied to the underlying state machine.
+func (n *Node) Propose(data []byte) {
+	m := Message{Type: msgHup, Data: data}
+	n.Step(m)
+}
+
+func (n *Node) Step(m Message) {
+	n.lk.Lock()
+	defer n.lk.Unlock()
+	n.sm.Step(m)
+}

+ 4 - 8
raft/raft.go

@@ -69,10 +69,6 @@ type Message struct {
 	Data     []byte
 }
 
-type stepper interface {
-	step(m Message)
-}
-
 type index struct {
 	match, next int
 }
@@ -112,13 +108,13 @@ type stateMachine struct {
 
 	votes map[int]bool
 
-	next stepper
+	next Interface
 
 	// the leader addr
 	lead int
 }
 
-func newStateMachine(k, addr int, next stepper) *stateMachine {
+func newStateMachine(k, addr int, next Interface) *stateMachine {
 	log := make([]Entry, 1, 1024)
 	sm := &stateMachine{k: k, addr: addr, next: next, log: log}
 	sm.reset()
@@ -160,7 +156,7 @@ func (sm *stateMachine) isLogOk(i, term int) bool {
 func (sm *stateMachine) send(m Message) {
 	m.From = sm.addr
 	m.Term = sm.term
-	sm.next.step(m)
+	sm.next.Step(m)
 }
 
 // sendAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
@@ -236,7 +232,7 @@ func (sm *stateMachine) becomeFollower(term, lead int) {
 	sm.state = stateFollower
 }
 
-func (sm *stateMachine) step(m Message) {
+func (sm *stateMachine) Step(m Message) {
 	switch m.Type {
 	case msgHup:
 		sm.term++

+ 23 - 25
raft/raft_test.go

@@ -45,7 +45,7 @@ func TestLeaderElection(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		tt.step(Message{To: 0, Type: msgHup})
+		tt.Step(Message{To: 0, Type: msgHup})
 		sm := tt.network.ss[0].(*stateMachine)
 		if sm.state != tt.state {
 			t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
@@ -65,7 +65,7 @@ func TestDualingCandidates(t *testing.T) {
 	heal := false
 	next := stepperFunc(func(m Message) {
 		if heal {
-			tt.step(m)
+			tt.Step(m)
 		}
 	})
 	a.next = next
@@ -74,12 +74,12 @@ func TestDualingCandidates(t *testing.T) {
 	tt.tee = stepperFunc(func(m Message) {
 		t.Logf("m = %+v", m)
 	})
-	tt.step(Message{To: 0, Type: msgHup})
-	tt.step(Message{To: 2, Type: msgHup})
+	tt.Step(Message{To: 0, Type: msgHup})
+	tt.Step(Message{To: 2, Type: msgHup})
 
 	t.Log("healing")
 	heal = true
-	tt.step(Message{To: 2, Type: msgHup})
+	tt.Step(Message{To: 2, Type: msgHup})
 
 	tests := []struct {
 		sm    *stateMachine
@@ -115,15 +115,15 @@ func TestCandidateConcede(t *testing.T) {
 
 	a.next = nopStepper
 
-	tt.step(Message{To: 0, Type: msgHup})
-	tt.step(Message{To: 2, Type: msgHup})
+	tt.Step(Message{To: 0, Type: msgHup})
+	tt.Step(Message{To: 2, Type: msgHup})
 
 	// heal the partition
 	a.next = tt
 
 	data := []byte("force follower")
 	// send a proposal to 2 to flush out a msgApp to 0
-	tt.step(Message{To: 2, Type: msgProp, Data: data})
+	tt.Step(Message{To: 2, Type: msgProp, Data: data})
 
 	if g := a.state; g != stateFollower {
 		t.Errorf("state = %s, want %s", g, stateFollower)
@@ -142,11 +142,11 @@ func TestCandidateConcede(t *testing.T) {
 func TestOldMessages(t *testing.T) {
 	tt := newNetwork(nil, nil, nil)
 	// make 0 leader @ term 3
-	tt.step(Message{To: 0, Type: msgHup})
-	tt.step(Message{To: 0, Type: msgHup})
-	tt.step(Message{To: 0, Type: msgHup})
+	tt.Step(Message{To: 0, Type: msgHup})
+	tt.Step(Message{To: 0, Type: msgHup})
+	tt.Step(Message{To: 0, Type: msgHup})
 	// pretend we're an old leader trying to make progress
-	tt.step(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
+	tt.Step(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
 	if g := diffLogs(defaultLog, tt.logs()); g != nil {
 		for _, diff := range g {
 			t.Errorf("bag log:\n%s", diff)
@@ -184,7 +184,7 @@ func TestProposal(t *testing.T) {
 					}
 				}
 			}()
-			tt.step(m)
+			tt.Step(m)
 		})
 
 		data := []byte("somedata")
@@ -224,10 +224,10 @@ func TestProposalByProxy(t *testing.T) {
 		})
 
 		// promote 0 the leader
-		tt.step(Message{To: 0, Type: msgHup})
+		tt.Step(Message{To: 0, Type: msgHup})
 
 		// propose via follower
-		tt.step(Message{To: 1, Type: msgProp, Data: []byte("somedata")})
+		tt.Step(Message{To: 1, Type: msgProp, Data: []byte("somedata")})
 
 		wantLog := []Entry{{}, {Term: 1, Data: data}}
 		if g := diffLogs(wantLog, tt.logs()); g != nil {
@@ -277,7 +277,7 @@ func TestVote(t *testing.T) {
 				t.Errorf("#%d, m.Index = %d, want %d", i, m.Index, tt.w)
 			}
 		})
-		sm.step(Message{Type: msgVote, Index: tt.i, LogTerm: tt.term})
+		sm.Step(Message{Type: msgVote, Index: tt.i, LogTerm: tt.term})
 		if !called {
 			t.Fatal("#%d: not called", i)
 		}
@@ -302,14 +302,14 @@ func TestLogDiff(t *testing.T) {
 }
 
 type network struct {
-	tee stepper
-	ss  []stepper
+	tee Interface
+	ss  []Interface
 }
 
 // newNetwork initializes a network from nodes. A nil node will be replaced
 // with a new *stateMachine. A *stateMachine will get its k, addr, and next
 // fields set.
-func newNetwork(nodes ...stepper) *network {
+func newNetwork(nodes ...Interface) *network {
 	nt := &network{ss: nodes}
 	for i, n := range nodes {
 		switch v := n.(type) {
@@ -328,11 +328,11 @@ func newNetwork(nodes ...stepper) *network {
 	return nt
 }
 
-func (nt network) step(m Message) {
+func (nt network) Step(m Message) {
 	if nt.tee != nil {
-		nt.tee.step(m)
+		nt.tee.Step(m)
 	}
-	nt.ss[m.To].step(m)
+	nt.ss[m.To].Step(m)
 }
 
 // logs returns all logs in nt prepended with want. If a node is not a
@@ -424,8 +424,6 @@ func diffLogs(base []Entry, logs [][]Entry) []diff {
 
 type stepperFunc func(Message)
 
-func (f stepperFunc) step(m Message) { f(m) }
+func (f stepperFunc) Step(m Message) { f(m) }
 
 var nopStepper = stepperFunc(func(Message) {})
-
-type nextStepperFunc func(Message, stepper)