|
|
@@ -36,12 +36,14 @@ import (
|
|
|
|
|
|
// A key-value stream backed by raft
|
|
|
type raftNode struct {
|
|
|
- proposeC <-chan string // proposed messages (k,v)
|
|
|
- commitC chan *string // entries committed to log (k,v)
|
|
|
- errorC chan error // errors from raft session
|
|
|
+ proposeC <-chan string // proposed messages (k,v)
|
|
|
+ confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
|
|
|
+ commitC chan *string // entries committed to log (k,v)
|
|
|
+ errorC chan error // errors from raft session
|
|
|
|
|
|
id int // client ID for raft session
|
|
|
peers []string // raft peer URLs
|
|
|
+ join bool // node is joining an existing cluster
|
|
|
waldir string // path to WAL directory
|
|
|
|
|
|
// raft backing for the commit/error channel
|
|
|
@@ -59,13 +61,17 @@ type raftNode struct {
|
|
|
// provided the proposal channel. All log entries are replayed over the
|
|
|
// commit channel, followed by a nil message (to indicate the channel is
|
|
|
// current), then new log entries. To shutdown, close proposeC and read errorC.
|
|
|
-func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) {
|
|
|
+func newRaftNode(id int, peers []string, join bool, proposeC <-chan string,
|
|
|
+ confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error) {
|
|
|
+
|
|
|
rc := &raftNode{
|
|
|
proposeC: proposeC,
|
|
|
+ confChangeC: confChangeC,
|
|
|
commitC: make(chan *string),
|
|
|
errorC: make(chan error),
|
|
|
id: id,
|
|
|
peers: peers,
|
|
|
+ join: join,
|
|
|
waldir: fmt.Sprintf("raftexample-%d", id),
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
stopc: make(chan struct{}),
|
|
|
@@ -81,15 +87,36 @@ func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string
|
|
|
// whether all entries could be published.
|
|
|
func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
|
|
|
for i := range ents {
|
|
|
- if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 {
|
|
|
- // ignore conf changes and empty messages
|
|
|
- continue
|
|
|
- }
|
|
|
- s := string(ents[i].Data)
|
|
|
- select {
|
|
|
- case rc.commitC <- &s:
|
|
|
- case <-rc.stopc:
|
|
|
- return false
|
|
|
+ switch ents[i].Type {
|
|
|
+ case raftpb.EntryNormal:
|
|
|
+ if len(ents[i].Data) == 0 {
|
|
|
+ // ignore conf changes and empty messages
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ s := string(ents[i].Data)
|
|
|
+ select {
|
|
|
+ case rc.commitC <- &s:
|
|
|
+ case <-rc.stopc:
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ case raftpb.EntryConfChange:
|
|
|
+ var cc raftpb.ConfChange
|
|
|
+ cc.Unmarshal(ents[i].Data)
|
|
|
+
|
|
|
+ rc.node.ApplyConfChange(cc)
|
|
|
+ switch cc.Type {
|
|
|
+ case raftpb.ConfChangeAddNode:
|
|
|
+ if len(cc.Context) > 0 {
|
|
|
+ rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
|
|
|
+ }
|
|
|
+ case raftpb.ConfChangeRemoveNode:
|
|
|
+ if cc.NodeID == uint64(rc.id) {
|
|
|
+ log.Println("I've been removed from the cluster! Shutting down.")
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ rc.transport.RemovePeer(types.ID(cc.NodeID))
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
return true
|
|
|
@@ -161,7 +188,11 @@ func (rc *raftNode) startRaft() {
|
|
|
if oldwal {
|
|
|
rc.node = raft.RestartNode(c)
|
|
|
} else {
|
|
|
- rc.node = raft.StartNode(c, rpeers)
|
|
|
+ startPeers := rpeers
|
|
|
+ if rc.join {
|
|
|
+ startPeers = nil
|
|
|
+ }
|
|
|
+ rc.node = raft.StartNode(c, startPeers)
|
|
|
}
|
|
|
|
|
|
ss := &stats.ServerStats{}
|
|
|
@@ -193,6 +224,8 @@ func (rc *raftNode) stop() {
|
|
|
close(rc.commitC)
|
|
|
close(rc.errorC)
|
|
|
rc.node.Stop()
|
|
|
+
|
|
|
+ os.Exit(0)
|
|
|
}
|
|
|
|
|
|
func (rc *raftNode) stopHTTP() {
|
|
|
@@ -209,9 +242,27 @@ func (rc *raftNode) serveChannels() {
|
|
|
|
|
|
// send proposals over raft
|
|
|
go func() {
|
|
|
- for prop := range rc.proposeC {
|
|
|
- // blocks until accepted by raft state machine
|
|
|
- rc.node.Propose(context.TODO(), []byte(prop))
|
|
|
+ var confChangeCount uint64 = 0
|
|
|
+
|
|
|
+ for rc.proposeC != nil && rc.confChangeC != nil {
|
|
|
+ select {
|
|
|
+ case prop, ok := <-rc.proposeC:
|
|
|
+ if !ok {
|
|
|
+ rc.proposeC = nil
|
|
|
+ } else {
|
|
|
+ // blocks until accepted by raft state machine
|
|
|
+ rc.node.Propose(context.TODO(), []byte(prop))
|
|
|
+ }
|
|
|
+
|
|
|
+ case cc, ok := <-rc.confChangeC:
|
|
|
+ if !ok {
|
|
|
+ rc.confChangeC = nil
|
|
|
+ } else {
|
|
|
+ confChangeCount += 1
|
|
|
+ cc.ID = confChangeCount
|
|
|
+ rc.node.ProposeConfChange(context.TODO(), cc)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
// client closed channel; shutdown raft if not already
|
|
|
close(rc.stopc)
|
|
|
@@ -228,7 +279,7 @@ func (rc *raftNode) serveChannels() {
|
|
|
rc.wal.Save(rd.HardState, rd.Entries)
|
|
|
rc.raftStorage.Append(rd.Entries)
|
|
|
rc.transport.Send(rd.Messages)
|
|
|
- if ok := rc.publishEntries(rd.Entries); !ok {
|
|
|
+ if ok := rc.publishEntries(rd.CommittedEntries); !ok {
|
|
|
rc.stop()
|
|
|
return
|
|
|
}
|