Browse Source

contrib/raftexample: Add a channel for proposing config changes

Add a channel over which we can propose cluster config changes to
raft. In an upcoming commit we'll add an HTTP endpoint that sends config
changes over this channel.
Adam Wolfe Gordon 10 years ago
parent
commit
7d862960cc
3 changed files with 49 additions and 17 deletions
  1. 5 1
      contrib/raftexample/main.go
  2. 29 7
      contrib/raftexample/raft.go
  3. 15 9
      contrib/raftexample/raftexample_test.go

+ 5 - 1
contrib/raftexample/main.go

@@ -17,6 +17,8 @@ package main
 import (
 	"flag"
 	"strings"
+
+	"github.com/coreos/etcd/raft/raftpb"
 )
 
 func main() {
@@ -27,9 +29,11 @@ func main() {
 
 	proposeC := make(chan string)
 	defer close(proposeC)
+	confChangeC := make(chan raftpb.ConfChange)
+	defer close(confChangeC)
 
 	// raft provides a commit stream for the proposals from the http api
-	commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), proposeC)
+	commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), proposeC, confChangeC)
 
 	// the key-value http handler will propose updates to raft
 	serveHttpKVAPI(*kvport, proposeC, commitC, errorC)

+ 29 - 7
contrib/raftexample/raft.go

@@ -36,9 +36,10 @@ import (
 
 // A key-value stream backed by raft
 type raftNode struct {
-	proposeC <-chan string // proposed messages (k,v)
-	commitC  chan *string  // entries committed to log (k,v)
-	errorC   chan error    // errors from raft session
+	proposeC    <-chan string            // proposed messages (k,v)
+	confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
+	commitC     chan *string             // entries committed to log (k,v)
+	errorC      chan error               // errors from raft session
 
 	id     int      // client ID for raft session
 	peers  []string // raft peer URLs
@@ -59,9 +60,12 @@ type raftNode struct {
 // provided the proposal channel. All log entries are replayed over the
 // commit channel, followed by a nil message (to indicate the channel is
 // current), then new log entries. To shutdown, close proposeC and read errorC.
-func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) {
+func newRaftNode(id int, peers []string, proposeC <-chan string,
+	confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error) {
+
 	rc := &raftNode{
 		proposeC:    proposeC,
+		confChangeC: confChangeC,
 		commitC:     make(chan *string),
 		errorC:      make(chan error),
 		id:          id,
@@ -232,9 +236,27 @@ func (rc *raftNode) serveChannels() {
 
 	// send proposals over raft
 	go func() {
-		for prop := range rc.proposeC {
-			// blocks until accepted by raft state machine
-			rc.node.Propose(context.TODO(), []byte(prop))
+		var confChangeCount uint64 = 0
+
+		for rc.proposeC != nil && rc.confChangeC != nil {
+			select {
+			case prop, ok := <-rc.proposeC:
+				if !ok {
+					rc.proposeC = nil
+				} else {
+					// blocks until accepted by raft state machine
+					rc.node.Propose(context.TODO(), []byte(prop))
+				}
+
+			case cc, ok := <-rc.confChangeC:
+				if !ok {
+					rc.confChangeC = nil
+				} else {
+					confChangeCount += 1
+					cc.ID = confChangeCount
+					rc.node.ProposeConfChange(context.TODO(), cc)
+				}
+			}
 		}
 		// client closed channel; shutdown raft if not already
 		close(rc.stopc)

+ 15 - 9
contrib/raftexample/raftexample_test.go

@@ -18,13 +18,16 @@ import (
 	"fmt"
 	"os"
 	"testing"
+
+	"github.com/coreos/etcd/raft/raftpb"
 )
 
 type cluster struct {
-	peers    []string
-	commitC  []<-chan *string
-	errorC   []<-chan error
-	proposeC []chan string
+	peers       []string
+	commitC     []<-chan *string
+	errorC      []<-chan error
+	proposeC    []chan string
+	confChangeC []chan raftpb.ConfChange
 }
 
 // newCluster creates a cluster of n nodes
@@ -35,15 +38,18 @@ func newCluster(n int) *cluster {
 	}
 
 	clus := &cluster{
-		peers:    peers,
-		commitC:  make([]<-chan *string, len(peers)),
-		errorC:   make([]<-chan error, len(peers)),
-		proposeC: make([]chan string, len(peers))}
+		peers:       peers,
+		commitC:     make([]<-chan *string, len(peers)),
+		errorC:      make([]<-chan error, len(peers)),
+		proposeC:    make([]chan string, len(peers)),
+		confChangeC: make([]chan raftpb.ConfChange, len(peers)),
+	}
 
 	for i := range clus.peers {
 		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
 		clus.proposeC[i] = make(chan string, 1)
-		clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i])
+		clus.confChangeC[i] = make(chan raftpb.ConfChange, 1)
+		clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i], clus.confChangeC[i])
 	}
 
 	return clus