Browse Source

contrib/raftexample: Handle conf change entries

So far we don't propose conf changes, but we'll be ready to handle them
when we do.
Adam Wolfe Gordon 10 years ago
parent
commit
eb7fef559d
1 changed files with 32 additions and 9 deletions
  1. 32 9
      contrib/raftexample/raft.go

+ 32 - 9
contrib/raftexample/raft.go

@@ -81,15 +81,36 @@ func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string
 // whether all entries could be published.
 func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 	for i := range ents {
-		if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 {
-			// ignore conf changes and empty messages
-			continue
-		}
-		s := string(ents[i].Data)
-		select {
-		case rc.commitC <- &s:
-		case <-rc.stopc:
-			return false
+		switch ents[i].Type {
+		case raftpb.EntryNormal:
+			if len(ents[i].Data) == 0 {
+				// ignore conf changes and empty messages
+				continue
+			}
+			s := string(ents[i].Data)
+			select {
+			case rc.commitC <- &s:
+			case <-rc.stopc:
+				return false
+			}
+
+		case raftpb.EntryConfChange:
+			var cc raftpb.ConfChange
+			cc.Unmarshal(ents[i].Data)
+
+			rc.node.ApplyConfChange(cc)
+			switch cc.Type {
+			case raftpb.ConfChangeAddNode:
+				if len(cc.Context) > 0 {
+					rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
+				}
+			case raftpb.ConfChangeRemoveNode:
+				if cc.NodeID == uint64(rc.id) {
+					log.Println("I've been removed from the cluster! Shutting down.")
+					return false
+				}
+				rc.transport.RemovePeer(types.ID(cc.NodeID))
+			}
 		}
 	}
 	return true
@@ -193,6 +214,8 @@ func (rc *raftNode) stop() {
 	close(rc.commitC)
 	close(rc.errorC)
 	rc.node.Stop()
+
+	os.Exit(0)
 }
 
 func (rc *raftNode) stopHTTP() {