Browse Source

raft: unblock progress by deproitizing proposals in Step

Blake Mizerany 11 years ago
parent
commit
481e229ad4
1 changed files with 31 additions and 15 deletions
  1. 31 15
      raft/node.go

+ 31 - 15
raft/node.go

@@ -1,7 +1,11 @@
 // Package raft implements raft.
 package raft
 
-import "code.google.com/p/go.net/context"
+import (
+	"sort"
+
+	"code.google.com/p/go.net/context"
+)
 
 type stateResp struct {
 	st          State
@@ -95,24 +99,30 @@ func (n *Node) Tick() error {
 
 // Propose proposes data be appended to the log.
 func (n *Node) Propose(ctx context.Context, data []byte) error {
-	return n.Step(ctx, Message{Type: msgProp, Entries: []Entry{{Data: data}}})
+	return n.Step(ctx, []Message{{Type: msgProp, Entries: []Entry{{Data: data}}}})
 }
 
-// Step advances the state machine using m.
-func (n *Node) Step(ctx context.Context, m Message) error {
-	ch := n.recvc
-	if m.Type == msgProp {
-		ch = n.propc
-	}
+// Step advances the state machine using msgs. Proposals are priotized last so
+// that any votes and vote requests will not be wedged behind proposals and
+// prevent this cluster from making progress.
+func (n *Node) Step(ctx context.Context, msgs []Message) error {
+	sort.Sort(sort.Reverse(messages(msgs)))
+	for _, m := range msgs {
+		ch := n.recvc
+		if m.Type == msgProp {
+			ch = n.propc
+		}
 
-	select {
-	case ch <- m:
-		return nil
-	case <-ctx.Done():
-		return ctx.Err()
-	case <-n.ctx.Done():
-		return n.ctx.Err()
+		select {
+		case ch <- m:
+			return nil
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-n.ctx.Done():
+			return n.ctx.Err()
+		}
 	}
+	return nil
 }
 
 // ReadState returns the current point-in-time state.
@@ -126,3 +136,9 @@ func (n *Node) ReadState(ctx context.Context) (st State, ents, cents []Entry, ms
 		return State{}, nil, nil, nil, n.ctx.Err()
 	}
 }
+
+type messages []Message
+
+func (msgs messages) Len() int           { return len(msgs) }
+func (msgs messages) Less(i, j int) bool { return msgs[i].Type == msgProp }
+func (msgs messages) Swap(i, j int)      { msgs[i], msgs[j] = msgs[i], msgs[j] }