|
@@ -11,6 +11,7 @@ import (
|
|
|
|
|
|
|
|
type node struct {
|
|
type node struct {
|
|
|
raft.Node
|
|
raft.Node
|
|
|
|
|
+ id uint64
|
|
|
paused bool
|
|
paused bool
|
|
|
nt network
|
|
nt network
|
|
|
stopc chan struct{}
|
|
stopc chan struct{}
|
|
@@ -25,12 +26,18 @@ func startNode(id uint64, peers []raft.Peer, nt network) *node {
|
|
|
rn := raft.StartNode(id, peers, 10, 1, st)
|
|
rn := raft.StartNode(id, peers, 10, 1, st)
|
|
|
n := &node{
|
|
n := &node{
|
|
|
Node: rn,
|
|
Node: rn,
|
|
|
|
|
+ id: id,
|
|
|
storage: st,
|
|
storage: st,
|
|
|
nt: nt,
|
|
nt: nt,
|
|
|
- stopc: make(chan struct{}),
|
|
|
|
|
}
|
|
}
|
|
|
|
|
+ n.start()
|
|
|
|
|
+ return n
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
|
|
+func (n *node) start() {
|
|
|
|
|
+ n.stopc = make(chan struct{})
|
|
|
ticker := time.Tick(5 * time.Millisecond)
|
|
ticker := time.Tick(5 * time.Millisecond)
|
|
|
|
|
+
|
|
|
go func() {
|
|
go func() {
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
@@ -39,32 +46,46 @@ func startNode(id uint64, peers []raft.Peer, nt network) *node {
|
|
|
case rd := <-n.Ready():
|
|
case rd := <-n.Ready():
|
|
|
if !raft.IsEmptyHardState(rd.HardState) {
|
|
if !raft.IsEmptyHardState(rd.HardState) {
|
|
|
n.state = rd.HardState
|
|
n.state = rd.HardState
|
|
|
|
|
+ n.storage.SetHardState(n.state)
|
|
|
}
|
|
}
|
|
|
n.storage.Append(rd.Entries)
|
|
n.storage.Append(rd.Entries)
|
|
|
go func() {
|
|
go func() {
|
|
|
for _, m := range rd.Messages {
|
|
for _, m := range rd.Messages {
|
|
|
- nt.send(m)
|
|
|
|
|
|
|
+ n.nt.send(m)
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
|
n.Advance()
|
|
n.Advance()
|
|
|
case m := <-n.nt.recv():
|
|
case m := <-n.nt.recv():
|
|
|
n.Step(context.TODO(), m)
|
|
n.Step(context.TODO(), m)
|
|
|
case <-n.stopc:
|
|
case <-n.stopc:
|
|
|
- log.Printf("raft.%d: stop", id)
|
|
|
|
|
|
|
+ n.Stop()
|
|
|
|
|
+ log.Printf("raft.%d: stop", n.id)
|
|
|
|
|
+ n.Node = nil
|
|
|
|
|
+ close(n.stopc)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
|
- return n
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (n *node) stop() { close(n.stopc) }
|
|
|
|
|
-
|
|
|
|
|
-// restart restarts the node with the given delay.
|
|
|
|
|
-// All in memory state of node is reset to initialized state.
|
|
|
|
|
|
|
+// stop stops the node. stop a stopped node might panic.
|
|
|
|
|
+// All in memory state of node is discarded.
|
|
|
// All stable MUST be unchanged.
|
|
// All stable MUST be unchanged.
|
|
|
-func (n *node) restart(delay time.Duration) {
|
|
|
|
|
- panic("unimplemented")
|
|
|
|
|
|
|
+func (n *node) stop() {
|
|
|
|
|
+ n.nt.disconnect(n.id)
|
|
|
|
|
+ n.stopc <- struct{}{}
|
|
|
|
|
+ // wait for the shutdown
|
|
|
|
|
+ <-n.stopc
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// restart restarts the node. restart a started node
|
|
|
|
|
+// blocks and might affect the future stop operation.
|
|
|
|
|
+func (n *node) restart() {
|
|
|
|
|
+ // wait for the shutdown
|
|
|
|
|
+ <-n.stopc
|
|
|
|
|
+ n.Node = raft.RestartNode(n.id, 10, 1, n.storage, 0)
|
|
|
|
|
+ n.start()
|
|
|
|
|
+ n.nt.connect(n.id)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// pause pauses the node.
|
|
// pause pauses the node.
|