Browse Source

Merge pull request #4476 from heyitsanthony/fix-raftexample-restart

contrib/raftexample: fix restart path
Anthony Romano 10 years ago
parent
commit
2a842eb049
2 changed files with 34 additions and 15 deletions
  1. 9 0
      contrib/raftexample/httpapi.go
  2. 25 15
      contrib/raftexample/raft.go

+ 9 - 0
contrib/raftexample/httpapi.go

@@ -18,6 +18,7 @@ import (
 	"io/ioutil"
 	"io/ioutil"
 	"log"
 	"log"
 	"net/http"
 	"net/http"
+	"os"
 	"strconv"
 	"strconv"
 
 
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
@@ -104,6 +105,14 @@ func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func serveHttpKVAPI(port int, proposeC chan<- string, confChangeC chan<- raftpb.ConfChange,
 func serveHttpKVAPI(port int, proposeC chan<- string, confChangeC chan<- raftpb.ConfChange,
 	commitC <-chan *string, errorC <-chan error) {
 	commitC <-chan *string, errorC <-chan error) {
 
 
+	// exit when raft goes down
+	go func() {
+		if err, ok := <-errorC; ok {
+			log.Fatal(err)
+		}
+		os.Exit(0)
+	}()
+
 	srv := http.Server{
 	srv := http.Server{
 		Addr: ":" + strconv.Itoa(port),
 		Addr: ":" + strconv.Itoa(port),
 		Handler: &httpKVAPI{
 		Handler: &httpKVAPI{

+ 25 - 15
contrib/raftexample/raft.go

@@ -41,10 +41,11 @@ type raftNode struct {
 	commitC     chan *string             // entries committed to log (k,v)
 	commitC     chan *string             // entries committed to log (k,v)
 	errorC      chan error               // errors from raft session
 	errorC      chan error               // errors from raft session
 
 
-	id     int      // client ID for raft session
-	peers  []string // raft peer URLs
-	join   bool     // node is joining an existing cluster
-	waldir string   // path to WAL directory
+	id        int      // client ID for raft session
+	peers     []string // raft peer URLs
+	join      bool     // node is joining an existing cluster
+	waldir    string   // path to WAL directory
+	lastIndex uint64   // index of log at start
 
 
 	// raft backing for the commit/error channel
 	// raft backing for the commit/error channel
 	node        raft.Node
 	node        raft.Node
@@ -90,8 +91,8 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 		switch ents[i].Type {
 		switch ents[i].Type {
 		case raftpb.EntryNormal:
 		case raftpb.EntryNormal:
 			if len(ents[i].Data) == 0 {
 			if len(ents[i].Data) == 0 {
-				// ignore conf changes and empty messages
-				continue
+				// ignore empty messages
+				break
 			}
 			}
 			s := string(ents[i].Data)
 			s := string(ents[i].Data)
 			select {
 			select {
@@ -103,7 +104,6 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 		case raftpb.EntryConfChange:
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			var cc raftpb.ConfChange
 			cc.Unmarshal(ents[i].Data)
 			cc.Unmarshal(ents[i].Data)
-
 			rc.node.ApplyConfChange(cc)
 			rc.node.ApplyConfChange(cc)
 			switch cc.Type {
 			switch cc.Type {
 			case raftpb.ConfChangeAddNode:
 			case raftpb.ConfChangeAddNode:
@@ -118,6 +118,15 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 				rc.transport.RemovePeer(types.ID(cc.NodeID))
 				rc.transport.RemovePeer(types.ID(cc.NodeID))
 			}
 			}
 		}
 		}
+
+		// special nil commit to signal replay has finished
+		if ents[i].Index == rc.lastIndex {
+			select {
+			case rc.commitC <- nil:
+			case <-rc.stopc:
+				return false
+			}
+		}
 	}
 	}
 	return true
 	return true
 }
 }
@@ -144,19 +153,22 @@ func (rc *raftNode) openWAL() *wal.WAL {
 	return w
 	return w
 }
 }
 
 
-// replayWAL replays WAL entries into the raft instance and the commit
-// channel and returns an appendable WAL.
+// replayWAL replays WAL entries into the raft instance.
 func (rc *raftNode) replayWAL() *wal.WAL {
 func (rc *raftNode) replayWAL() *wal.WAL {
 	w := rc.openWAL()
 	w := rc.openWAL()
-	_, _, ents, err := w.ReadAll()
+	_, st, ents, err := w.ReadAll()
 	if err != nil {
 	if err != nil {
 		log.Fatalf("raftexample: failed to read WAL (%v)", err)
 		log.Fatalf("raftexample: failed to read WAL (%v)", err)
 	}
 	}
 	// append to storage so raft starts at the right place in log
 	// append to storage so raft starts at the right place in log
 	rc.raftStorage.Append(ents)
 	rc.raftStorage.Append(ents)
-	rc.publishEntries(ents)
-	// send nil value so client knows commit channel is current
-	rc.commitC <- nil
+	// send nil once lastIndex is published so client knows commit channel is current
+	if len(ents) > 0 {
+		rc.lastIndex = ents[len(ents)-1].Index
+	} else {
+		rc.commitC <- nil
+	}
+	rc.raftStorage.SetHardState(st)
 	return w
 	return w
 }
 }
 
 
@@ -224,8 +236,6 @@ func (rc *raftNode) stop() {
 	close(rc.commitC)
 	close(rc.commitC)
 	close(rc.errorC)
 	close(rc.errorC)
 	rc.node.Stop()
 	rc.node.Stop()
-
-	os.Exit(0)
 }
 }
 
 
 func (rc *raftNode) stopHTTP() {
 func (rc *raftNode) stopHTTP() {