|
@@ -2,10 +2,14 @@
|
|
|
package raft
|
|
package raft
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
|
|
+ "errors"
|
|
|
|
|
+
|
|
|
"code.google.com/p/go.net/context"
|
|
"code.google.com/p/go.net/context"
|
|
|
pb "github.com/coreos/etcd/raft/raftpb"
|
|
pb "github.com/coreos/etcd/raft/raftpb"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+var ErrStopped = errors.New("raft: stopped")
|
|
|
|
|
+
|
|
|
type Ready struct {
|
|
type Ready struct {
|
|
|
// The current state of a Node
|
|
// The current state of a Node
|
|
|
pb.State
|
|
pb.State
|
|
@@ -39,22 +43,27 @@ type Node struct {
|
|
|
readyc chan Ready
|
|
readyc chan Ready
|
|
|
tickc chan struct{}
|
|
tickc chan struct{}
|
|
|
alwaysreadyc chan Ready
|
|
alwaysreadyc chan Ready
|
|
|
|
|
+ done chan struct{}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func Start(ctx context.Context, id int64, peers []int64) Node {
|
|
|
|
|
|
|
+func Start(id int64, peers []int64) Node {
|
|
|
n := Node{
|
|
n := Node{
|
|
|
- ctx: ctx,
|
|
|
|
|
propc: make(chan pb.Message),
|
|
propc: make(chan pb.Message),
|
|
|
recvc: make(chan pb.Message),
|
|
recvc: make(chan pb.Message),
|
|
|
readyc: make(chan Ready),
|
|
readyc: make(chan Ready),
|
|
|
tickc: make(chan struct{}),
|
|
tickc: make(chan struct{}),
|
|
|
alwaysreadyc: make(chan Ready),
|
|
alwaysreadyc: make(chan Ready),
|
|
|
|
|
+ done: make(chan struct{}),
|
|
|
}
|
|
}
|
|
|
r := newRaft(id, peers)
|
|
r := newRaft(id, peers)
|
|
|
go n.run(r)
|
|
go n.run(r)
|
|
|
return n
|
|
return n
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (n *Node) Stop() {
|
|
|
|
|
+ close(n.done)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (n *Node) run(r *raft) {
|
|
func (n *Node) run(r *raft) {
|
|
|
propc := n.propc
|
|
propc := n.propc
|
|
|
readyc := n.readyc
|
|
readyc := n.readyc
|
|
@@ -98,7 +107,7 @@ func (n *Node) run(r *raft) {
|
|
|
r.msgs = nil
|
|
r.msgs = nil
|
|
|
case n.alwaysreadyc <- rd:
|
|
case n.alwaysreadyc <- rd:
|
|
|
// this is for testing only
|
|
// this is for testing only
|
|
|
- case <-n.ctx.Done():
|
|
|
|
|
|
|
+ case <-n.done:
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -108,7 +117,7 @@ func (n *Node) Tick() error {
|
|
|
select {
|
|
select {
|
|
|
case n.tickc <- struct{}{}:
|
|
case n.tickc <- struct{}{}:
|
|
|
return nil
|
|
return nil
|
|
|
- case <-n.ctx.Done():
|
|
|
|
|
|
|
+ case <-n.done:
|
|
|
return n.ctx.Err()
|
|
return n.ctx.Err()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -135,8 +144,8 @@ func (n *Node) Step(ctx context.Context, m pb.Message) error {
|
|
|
return nil
|
|
return nil
|
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
|
return ctx.Err()
|
|
return ctx.Err()
|
|
|
- case <-n.ctx.Done():
|
|
|
|
|
- return n.ctx.Err()
|
|
|
|
|
|
|
+ case <-n.done:
|
|
|
|
|
+ return ErrStopped
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|