Browse Source

main/raft: write addNode ConfChange entries in log when start raft

Yicheng Qin 11 years ago
parent
commit
314d425718
5 changed files with 56 additions and 17 deletions
  1. 16 1
      etcdserver/server.go
  2. 6 6
      etcdserver/server_test.go
  3. 1 1
      raft/example_test.go
  4. 13 1
      raft/node.go
  5. 20 8
      raft/node_test.go

+ 16 - 1
etcdserver/server.go

@@ -8,6 +8,7 @@ import (
 	"net/http"
 	"os"
 	"path"
+	"sort"
 	"sync/atomic"
 	"time"
 
@@ -129,7 +130,14 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 		if w, err = wal.Create(waldir); err != nil {
 			log.Fatal(err)
 		}
-		n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1)
+		ids := cfg.Cluster.IDs()
+		sort.Sort(int64Slice(ids))
+		ccs := make([]raftpb.ConfChange, len(ids))
+		for i, id := range ids {
+			// TODO: add context for PeerURLs
+			ccs[i] = raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: id}
+		}
+		n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1, ccs)
 	} else {
 		var index int64
 		snapshot, err := ss.Load()
@@ -552,3 +560,10 @@ func getBool(v *bool) (vv bool, set bool) {
 	}
 	return *v, true
 }
+
+// int64Slice implements sort interface
+type int64Slice []int64
+
+func (p int64Slice) Len() int           { return len(p) }
+func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

+ 6 - 6
etcdserver/server_test.go

@@ -391,7 +391,7 @@ func testServer(t *testing.T, ns int64) {
 
 	for i := int64(0); i < ns; i++ {
 		id := i + 1
-		n := raft.StartNode(id, members, 10, 1)
+		n := raft.StartNode(id, members, 10, 1, nil)
 		tk := time.NewTicker(10 * time.Millisecond)
 		defer tk.Stop()
 		srv := &EtcdServer{
@@ -458,7 +458,7 @@ func TestDoProposal(t *testing.T) {
 
 	for i, tt := range tests {
 		ctx, _ := context.WithCancel(context.Background())
-		n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
+		n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil)
 		st := &storeRecorder{}
 		tk := make(chan time.Time)
 		// this makes <-tk always successful, which accelerates internal clock
@@ -491,7 +491,7 @@ func TestDoProposal(t *testing.T) {
 func TestDoProposalCancelled(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	// node cannot make any progress because there are two nodes
-	n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
+	n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil)
 	st := &storeRecorder{}
 	wait := &waitRecorder{}
 	srv := &EtcdServer{
@@ -527,7 +527,7 @@ func TestDoProposalStopped(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	// node cannot make any progress because there are two nodes
-	n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
+	n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil)
 	st := &storeRecorder{}
 	tk := make(chan time.Time)
 	// this makes <-tk always successful, which accelarates internal clock
@@ -668,7 +668,7 @@ func TestSyncTrigger(t *testing.T) {
 // snapshot should snapshot the store and cut the persistent
 // TODO: node.Compact is called... we need to make the node an interface
 func TestSnapshot(t *testing.T) {
-	n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
+	n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil)
 	defer n.Stop()
 	st := &storeRecorder{}
 	p := &storageRecorder{}
@@ -699,7 +699,7 @@ func TestSnapshot(t *testing.T) {
 // Applied > SnapCount should trigger a SaveSnap event
 func TestTriggerSnap(t *testing.T) {
 	ctx := context.Background()
-	n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
+	n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil)
 	n.Campaign(ctx)
 	st := &storeRecorder{}
 	p := &storageRecorder{}

+ 1 - 1
raft/example_test.go

@@ -10,7 +10,7 @@ func saveStateToDisk(st pb.HardState) {}
 func saveToDisk(ents []pb.Entry)      {}
 
 func Example_Node() {
-	n := StartNode(0, nil, 0, 0)
+	n := StartNode(0, nil, 0, 0, nil)
 
 	// stuff to n happens in other goroutines
 

+ 13 - 1
raft/node.go

@@ -101,9 +101,21 @@ type Node interface {
 
 // StartNode returns a new Node given a unique raft id, a list of raft peers, and
 // the election and heartbeat timeouts in units of ticks.
-func StartNode(id int64, peers []int64, election, heartbeat int) Node {
+// It also wraps ConfChanges in entry and puts them at the head of the log.
+func StartNode(id int64, peers []int64, election, heartbeat int, ccs []pb.ConfChange) Node {
 	n := newNode()
 	r := newRaft(id, peers, election, heartbeat)
+	ents := make([]pb.Entry, len(ccs))
+	for i, cc := range ccs {
+		data, err := cc.Marshal()
+		if err != nil {
+			panic("unexpected marshal error")
+		}
+		ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: int64(i + 1), Data: data}
+	}
+	if !r.raftLog.maybeAppend(0, 0, int64(len(ccs)), ents...) {
+		panic("unexpected append failure")
+	}
 	go n.run(r)
 	return &n
 }

+ 20 - 8
raft/node_test.go

@@ -149,21 +149,33 @@ func TestNode(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
+	ccdata, err := cc.Marshal()
+	if err != nil {
+		t.Fatalf("unexpected marshal error: %v", err)
+	}
 	wants := []Ready{
 		{
-			SoftState:        &SoftState{Lead: 1, RaftState: StateLeader},
-			HardState:        raftpb.HardState{Term: 1, Commit: 1},
-			Entries:          []raftpb.Entry{{}, {Term: 1, Index: 1}},
-			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}},
+			SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
+			HardState: raftpb.HardState{Term: 1, Commit: 2},
+			Entries: []raftpb.Entry{
+				{},
+				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
+				{Term: 1, Index: 2},
+			},
+			CommittedEntries: []raftpb.Entry{
+				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
+				{Term: 1, Index: 2},
+			},
 		},
 		{
-			HardState:        raftpb.HardState{Term: 1, Commit: 2},
-			Entries:          []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}},
-			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}},
+			HardState:        raftpb.HardState{Term: 1, Commit: 3},
+			Entries:          []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}},
+			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}},
 		},
 	}
 
-	n := StartNode(1, []int64{1}, 0, 0)
+	n := StartNode(1, []int64{1}, 0, 0, []raftpb.ConfChange{cc})
 	n.Campaign(ctx)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
 		t.Errorf("#%d: g = %+v,\n             w   %+v", 1, g, wants[0])