Browse Source

contrib/raftexample: Allow nodes to be added to a running cluster

A node with ID n can be added by POSTing the new node's URL to /n on the
HTTP server.
Adam Wolfe Gordon 10 years ago
parent
commit
7c0b6d9be9

+ 37 - 4
contrib/raftexample/httpapi.go

@@ -19,11 +19,14 @@ import (
 	"log"
 	"log"
 	"net/http"
 	"net/http"
 	"strconv"
 	"strconv"
+
+	"github.com/coreos/etcd/raft/raftpb"
 )
 )
 
 
 // Handler for a http based key-value store backed by raft
 // Handler for a http based key-value store backed by raft
 type httpKVAPI struct {
 type httpKVAPI struct {
-	store *kvstore
+	store       *kvstore
+	confChangeC chan<- raftpb.ConfChange
 }
 }
 
 
 func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -48,18 +51,48 @@ func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		} else {
 		} else {
 			http.Error(w, "Failed to GET", http.StatusNotFound)
 			http.Error(w, "Failed to GET", http.StatusNotFound)
 		}
 		}
+	case r.Method == "POST":
+		url, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			log.Printf("Failed to read on POST (%v)\n", err)
+			http.Error(w, "Failed on POST", http.StatusBadRequest)
+			return
+		}
+
+		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
+		if err != nil {
+			log.Printf("Failed to convert ID for conf change (%v)\n", err)
+			http.Error(w, "Failed on POST", http.StatusBadRequest)
+			return
+		}
+
+		cc := raftpb.ConfChange{
+			Type:    raftpb.ConfChangeAddNode,
+			NodeID:  nodeId,
+			Context: url,
+		}
+		h.confChangeC <- cc
+
+		// As above, optimistic that raft will apply the conf change
+		w.WriteHeader(http.StatusNoContent)
 	default:
 	default:
 		w.Header().Set("Allow", "PUT")
 		w.Header().Set("Allow", "PUT")
 		w.Header().Add("Allow", "GET")
 		w.Header().Add("Allow", "GET")
+		w.Header().Add("Allow", "POST")
 		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 	}
 	}
 }
 }
 
 
 // serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
 // serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
-func serveHttpKVAPI(port int, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) {
+func serveHttpKVAPI(port int, proposeC chan<- string, confChangeC chan<- raftpb.ConfChange,
+	commitC <-chan *string, errorC <-chan error) {
+
 	srv := http.Server{
 	srv := http.Server{
-		Addr:    ":" + strconv.Itoa(port),
-		Handler: &httpKVAPI{newKVStore(proposeC, commitC, errorC)},
+		Addr: ":" + strconv.Itoa(port),
+		Handler: &httpKVAPI{
+			store:       newKVStore(proposeC, commitC, errorC),
+			confChangeC: confChangeC,
+		},
 	}
 	}
 	if err := srv.ListenAndServe(); err != nil {
 	if err := srv.ListenAndServe(); err != nil {
 		log.Fatal(err)
 		log.Fatal(err)

+ 3 - 2
contrib/raftexample/main.go

@@ -25,6 +25,7 @@ func main() {
 	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
 	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
 	id := flag.Int("id", 1, "node ID")
 	id := flag.Int("id", 1, "node ID")
 	kvport := flag.Int("port", 9121, "key-value server port")
 	kvport := flag.Int("port", 9121, "key-value server port")
+	join := flag.Bool("join", false, "join an existing cluster")
 	flag.Parse()
 	flag.Parse()
 
 
 	proposeC := make(chan string)
 	proposeC := make(chan string)
@@ -33,8 +34,8 @@ func main() {
 	defer close(confChangeC)
 	defer close(confChangeC)
 
 
 	// raft provides a commit stream for the proposals from the http api
 	// raft provides a commit stream for the proposals from the http api
-	commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), proposeC, confChangeC)
+	commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), *join, proposeC, confChangeC)
 
 
 	// the key-value http handler will propose updates to raft
 	// the key-value http handler will propose updates to raft
-	serveHttpKVAPI(*kvport, proposeC, commitC, errorC)
+	serveHttpKVAPI(*kvport, proposeC, confChangeC, commitC, errorC)
 }
 }

+ 8 - 2
contrib/raftexample/raft.go

@@ -43,6 +43,7 @@ type raftNode struct {
 
 
 	id     int      // client ID for raft session
 	id     int      // client ID for raft session
 	peers  []string // raft peer URLs
 	peers  []string // raft peer URLs
+	join   bool     // node is joining an existing cluster
 	waldir string   // path to WAL directory
 	waldir string   // path to WAL directory
 
 
 	// raft backing for the commit/error channel
 	// raft backing for the commit/error channel
@@ -60,7 +61,7 @@ type raftNode struct {
 // provided the proposal channel. All log entries are replayed over the
 // provided the proposal channel. All log entries are replayed over the
 // commit channel, followed by a nil message (to indicate the channel is
 // commit channel, followed by a nil message (to indicate the channel is
 // current), then new log entries. To shutdown, close proposeC and read errorC.
 // current), then new log entries. To shutdown, close proposeC and read errorC.
-func newRaftNode(id int, peers []string, proposeC <-chan string,
+func newRaftNode(id int, peers []string, join bool, proposeC <-chan string,
 	confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error) {
 	confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error) {
 
 
 	rc := &raftNode{
 	rc := &raftNode{
@@ -70,6 +71,7 @@ func newRaftNode(id int, peers []string, proposeC <-chan string,
 		errorC:      make(chan error),
 		errorC:      make(chan error),
 		id:          id,
 		id:          id,
 		peers:       peers,
 		peers:       peers,
+		join:        join,
 		waldir:      fmt.Sprintf("raftexample-%d", id),
 		waldir:      fmt.Sprintf("raftexample-%d", id),
 		raftStorage: raft.NewMemoryStorage(),
 		raftStorage: raft.NewMemoryStorage(),
 		stopc:       make(chan struct{}),
 		stopc:       make(chan struct{}),
@@ -186,7 +188,11 @@ func (rc *raftNode) startRaft() {
 	if oldwal {
 	if oldwal {
 		rc.node = raft.RestartNode(c)
 		rc.node = raft.RestartNode(c)
 	} else {
 	} else {
-		rc.node = raft.StartNode(c, rpeers)
+		startPeers := rpeers
+		if rc.join {
+			startPeers = nil
+		}
+		rc.node = raft.StartNode(c, startPeers)
 	}
 	}
 
 
 	ss := &stats.ServerStats{}
 	ss := &stats.ServerStats{}

+ 1 - 1
contrib/raftexample/raftexample_test.go

@@ -49,7 +49,7 @@ func newCluster(n int) *cluster {
 		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
 		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
 		clus.proposeC[i] = make(chan string, 1)
 		clus.proposeC[i] = make(chan string, 1)
 		clus.confChangeC[i] = make(chan raftpb.ConfChange, 1)
 		clus.confChangeC[i] = make(chan raftpb.ConfChange, 1)
-		clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i], clus.confChangeC[i])
+		clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, false, clus.proposeC[i], clus.confChangeC[i])
 	}
 	}
 
 
 	return clus
 	return clus