|
|
@@ -19,7 +19,7 @@ func (sr stateResp) containsUpdates(prev stateResp) bool {
|
|
|
|
|
|
type Node struct {
|
|
|
ctx context.Context
|
|
|
- propc chan []byte
|
|
|
+ propc chan Message
|
|
|
recvc chan Message
|
|
|
statec chan stateResp
|
|
|
tickc chan struct{}
|
|
|
@@ -28,7 +28,7 @@ type Node struct {
|
|
|
func Start(ctx context.Context, id int64, peers []int64) *Node {
|
|
|
n := &Node{
|
|
|
ctx: ctx,
|
|
|
- propc: make(chan []byte),
|
|
|
+ propc: make(chan Message),
|
|
|
recvc: make(chan Message),
|
|
|
statec: make(chan stateResp),
|
|
|
tickc: make(chan struct{}),
|
|
|
@@ -67,8 +67,9 @@ func (n *Node) run(r *raft) {
|
|
|
}
|
|
|
|
|
|
select {
|
|
|
- case p := <-propc:
|
|
|
- r.propose(p)
|
|
|
+ case m := <-propc:
|
|
|
+ m.From = r.id
|
|
|
+ r.Step(m)
|
|
|
case m := <-n.recvc:
|
|
|
r.Step(m) // raft never returns an error
|
|
|
case <-n.tickc:
|
|
|
@@ -94,21 +95,21 @@ func (n *Node) Tick() error {
|
|
|
|
|
|
// Propose proposes data be appended to the log.
|
|
|
func (n *Node) Propose(ctx context.Context, data []byte) error {
|
|
|
- select {
|
|
|
- case n.propc <- data:
|
|
|
- return nil
|
|
|
- case <-ctx.Done():
|
|
|
- return ctx.Err()
|
|
|
- case <-n.ctx.Done():
|
|
|
- return n.ctx.Err()
|
|
|
- }
|
|
|
+ return n.Step(ctx, Message{Type: msgProp, Entries: []Entry{{Data: data}}})
|
|
|
}
|
|
|
|
|
|
// Step advances the state machine using m.
|
|
|
-func (n *Node) Step(m Message) error {
|
|
|
+func (n *Node) Step(ctx context.Context, m Message) error {
|
|
|
+ ch := n.recvc
|
|
|
+ if m.Type == msgProp {
|
|
|
+ ch = n.propc
|
|
|
+ }
|
|
|
+
|
|
|
select {
|
|
|
- case n.recvc <- m:
|
|
|
+ case ch <- m:
|
|
|
return nil
|
|
|
+ case <-ctx.Done():
|
|
|
+ return ctx.Err()
|
|
|
case <-n.ctx.Done():
|
|
|
return n.ctx.Err()
|
|
|
}
|