Browse Source

Merge pull request #1245 from unihorn/155

main/raft: write addNode ConfChange entries in log when start raft
Yicheng Qin 11 years ago
parent
commit
3ca3c9ad4c
7 changed files with 51 additions and 23 deletions
  1. 1 0
      etcdserver/cluster.go
  2. 2 10
      etcdserver/cluster_test.go
  3. 1 0
      etcdserver/server.go
  4. 6 6
      etcdserver/server_test.go
  5. 8 0
      pkg/types/slice.go
  6. 14 0
      raft/node.go
  7. 19 7
      raft/node_test.go

+ 1 - 0
etcdserver/cluster.go

@@ -100,6 +100,7 @@ func (c Cluster) IDs() []int64 {
 	for _, m := range c {
 	for _, m := range c {
 		ids = append(ids, m.ID)
 		ids = append(ids, m.ID)
 	}
 	}
+	sort.Sort(types.Int64Slice(ids))
 	return ids
 	return ids
 }
 }
 
 

+ 2 - 10
etcdserver/cluster_test.go

@@ -2,7 +2,6 @@ package etcdserver
 
 
 import (
 import (
 	"reflect"
 	"reflect"
-	"sort"
 	"testing"
 	"testing"
 )
 )
 
 
@@ -201,12 +200,6 @@ func TestClusterSetBad(t *testing.T) {
 	}
 	}
 }
 }
 
 
-type int64slice []int64
-
-func (a int64slice) Len() int           { return len(a) }
-func (a int64slice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a int64slice) Less(i, j int) bool { return a[i] < a[j] }
-
 func TestClusterIDs(t *testing.T) {
 func TestClusterIDs(t *testing.T) {
 	cs := Cluster{}
 	cs := Cluster{}
 	cs.AddSlice([]Member{
 	cs.AddSlice([]Member{
@@ -214,9 +207,8 @@ func TestClusterIDs(t *testing.T) {
 		{ID: 4},
 		{ID: 4},
 		{ID: 100},
 		{ID: 100},
 	})
 	})
-	w := int64slice([]int64{1, 4, 100})
-	g := int64slice(cs.IDs())
-	sort.Sort(g)
+	w := []int64{1, 4, 100}
+	g := cs.IDs()
 	if !reflect.DeepEqual(w, g) {
 	if !reflect.DeepEqual(w, g) {
 		t.Errorf("IDs=%+v, want %+v", g, w)
 		t.Errorf("IDs=%+v, want %+v", g, w)
 	}
 	}

+ 1 - 0
etcdserver/server.go

@@ -132,6 +132,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 		if w, err = wal.Create(waldir); err != nil {
 		if w, err = wal.Create(waldir); err != nil {
 			log.Fatal(err)
 			log.Fatal(err)
 		}
 		}
+		// TODO: add context for PeerURLs
 		n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1)
 		n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1)
 	} else {
 	} else {
 		var index int64
 		var index int64

+ 6 - 6
etcdserver/server_test.go

@@ -712,7 +712,7 @@ func TestTriggerSnap(t *testing.T) {
 	}
 	}
 
 
 	s.start()
 	s.start()
-	for i := 0; int64(i) < s.snapCount; i++ {
+	for i := 0; int64(i) < s.snapCount-1; i++ {
 		s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
 		s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
 	}
 	}
 	time.Sleep(time.Millisecond)
 	time.Sleep(time.Millisecond)
@@ -720,12 +720,12 @@ func TestTriggerSnap(t *testing.T) {
 
 
 	gaction := p.Action()
 	gaction := p.Action()
 	// each operation is recorded as a Save
 	// each operation is recorded as a Save
-	// Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap
-	if len(gaction) != 3+int(s.snapCount) {
-		t.Fatalf("len(action) = %d, want %d", len(gaction), 3+int(s.snapCount))
+	// BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap
+	if len(gaction) != 2+int(s.snapCount) {
+		t.Fatalf("len(action) = %d, want %d", len(gaction), 2+int(s.snapCount))
 	}
 	}
-	if !reflect.DeepEqual(gaction[12], action{name: "SaveSnap"}) {
-		t.Errorf("action = %s, want SaveSnap", gaction[12])
+	if !reflect.DeepEqual(gaction[11], action{name: "SaveSnap"}) {
+		t.Errorf("action = %s, want SaveSnap", gaction[11])
 	}
 	}
 }
 }
 
 

+ 8 - 0
pkg/types/slice.go

@@ -0,0 +1,8 @@
+package types
+
+// Int64Slice implements sort interface
+type Int64Slice []int64
+
+func (p Int64Slice) Len() int           { return len(p) }
+func (p Int64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p Int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

+ 14 - 0
raft/node.go

@@ -101,9 +101,23 @@ type Node interface {
 
 
 // StartNode returns a new Node given a unique raft id, a list of raft peers, and
 // StartNode returns a new Node given a unique raft id, a list of raft peers, and
 // the election and heartbeat timeouts in units of ticks.
 // the election and heartbeat timeouts in units of ticks.
+// It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log.
 func StartNode(id int64, peers []int64, election, heartbeat int) Node {
 func StartNode(id int64, peers []int64, election, heartbeat int) Node {
 	n := newNode()
 	n := newNode()
 	r := newRaft(id, peers, election, heartbeat)
 	r := newRaft(id, peers, election, heartbeat)
+
+	ents := make([]pb.Entry, len(peers))
+	for i, peer := range peers {
+		cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer}
+		data, err := cc.Marshal()
+		if err != nil {
+			panic("unexpected marshal error")
+		}
+		ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: int64(i + 1), Data: data}
+	}
+	r.raftLog.append(0, ents...)
+	r.raftLog.committed = int64(len(ents))
+
 	go n.run(r)
 	go n.run(r)
 	return &n
 	return &n
 }
 }

+ 19 - 7
raft/node_test.go

@@ -149,17 +149,29 @@ func TestNode(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	defer cancel()
 
 
+	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
+	ccdata, err := cc.Marshal()
+	if err != nil {
+		t.Fatalf("unexpected marshal error: %v", err)
+	}
 	wants := []Ready{
 	wants := []Ready{
 		{
 		{
-			SoftState:        &SoftState{Lead: 1, RaftState: StateLeader},
-			HardState:        raftpb.HardState{Term: 1, Commit: 1},
-			Entries:          []raftpb.Entry{{}, {Term: 1, Index: 1}},
-			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}},
+			SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
+			HardState: raftpb.HardState{Term: 1, Commit: 2},
+			Entries: []raftpb.Entry{
+				{},
+				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
+				{Term: 1, Index: 2},
+			},
+			CommittedEntries: []raftpb.Entry{
+				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
+				{Term: 1, Index: 2},
+			},
 		},
 		},
 		{
 		{
-			HardState:        raftpb.HardState{Term: 1, Commit: 2},
-			Entries:          []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}},
-			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}},
+			HardState:        raftpb.HardState{Term: 1, Commit: 3},
+			Entries:          []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}},
+			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}},
 		},
 		},
 	}
 	}