Browse Source

Merge pull request #1647 from xiangli-cmu/force_cluster

etcdserver: force new cluster
Xiang Li 11 years ago
parent
commit
bf47fe7cac
5 changed files with 277 additions and 30 deletions
  1. 18 16
      etcdmain/etcd.go
  2. 10 9
      etcdserver/config.go
  3. 116 0
      etcdserver/force_cluster.go
  4. 114 0
      etcdserver/force_cluster_test.go
  5. 19 5
      etcdserver/server.go

+ 18 - 16
etcdmain/etcd.go

@@ -53,13 +53,14 @@ const (
 )
 
 var (
-	fs           = flag.NewFlagSet("etcd", flag.ContinueOnError)
-	name         = fs.String("name", "default", "Unique human-readable name for this node")
-	dir          = fs.String("data-dir", "", "Path to the data directory")
-	durl         = fs.String("discovery", "", "Discovery service used to bootstrap the cluster")
-	dproxy       = fs.String("discovery-proxy", "", "HTTP proxy to use for traffic to discovery service")
-	snapCount    = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
-	printVersion = fs.Bool("version", false, "Print the version and exit")
+	fs              = flag.NewFlagSet("etcd", flag.ContinueOnError)
+	name            = fs.String("name", "default", "Unique human-readable name for this node")
+	dir             = fs.String("data-dir", "", "Path to the data directory")
+	durl            = fs.String("discovery", "", "Discovery service used to bootstrap the cluster")
+	dproxy          = fs.String("discovery-proxy", "", "HTTP proxy to use for traffic to discovery service")
+	snapCount       = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
+	printVersion    = fs.Bool("version", false, "Print the version and exit")
+	forceNewCluster = fs.Bool("force-new-cluster", false, "Force to create a new one member cluster")
 
 	initialCluster      = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping")
 	initialClusterToken = fs.String("initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap")
@@ -262,15 +263,16 @@ func startEtcd() error {
 	}
 
 	cfg := &etcdserver.ServerConfig{
-		Name:           *name,
-		ClientURLs:     acurls,
-		DataDir:        *dir,
-		SnapCount:      *snapCount,
-		Cluster:        cls,
-		DiscoveryURL:   *durl,
-		DiscoveryProxy: *dproxy,
-		NewCluster:     clusterStateFlag.String() == clusterStateFlagNew,
-		Transport:      pt,
+		Name:            *name,
+		ClientURLs:      acurls,
+		DataDir:         *dir,
+		SnapCount:       *snapCount,
+		Cluster:         cls,
+		DiscoveryURL:    *durl,
+		DiscoveryProxy:  *dproxy,
+		NewCluster:      clusterStateFlag.String() == clusterStateFlagNew,
+		Transport:       pt,
+		ForceNewCluster: *forceNewCluster,
 	}
 	var s *etcdserver.EtcdServer
 	s, err = etcdserver.NewServer(cfg)

+ 10 - 9
etcdserver/config.go

@@ -27,15 +27,16 @@ import (
 
 // ServerConfig holds the configuration of etcd as taken from the command line or discovery.
 type ServerConfig struct {
-	Name           string
-	DiscoveryURL   string
-	DiscoveryProxy string
-	ClientURLs     types.URLs
-	DataDir        string
-	SnapCount      uint64
-	Cluster        *Cluster
-	NewCluster     bool
-	Transport      *http.Transport
+	Name            string
+	DiscoveryURL    string
+	DiscoveryProxy  string
+	ClientURLs      types.URLs
+	DataDir         string
+	SnapCount       uint64
+	Cluster         *Cluster
+	NewCluster      bool
+	Transport       *http.Transport
+	ForceNewCluster bool
 }
 
 // VerifyBootstrapConfig sanity-checks the initial config and returns an error

+ 116 - 0
etcdserver/force_cluster.go

@@ -0,0 +1,116 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package etcdserver
+
+import (
+	"log"
+
+	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/wal"
+)
+
+func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) {
+	var err error
+	if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
+		log.Fatalf("etcdserver: open wal error: %v", err)
+	}
+	id, cid, st, ents, err := readWAL(w, index)
+	if err != nil {
+		log.Fatalf("etcdserver: read wal error: %v", err)
+	}
+	cfg.Cluster.SetID(cid)
+
+	// discard the previously uncommitted entries
+	if len(ents) != 0 {
+		ents = ents[:st.Commit+1]
+	}
+
+	// force append the configuration change entries
+	toAppEnts := createConfigChangeEnts(getIDset(snapshot, ents), uint64(id), st.Term, st.Commit)
+	ents = append(ents, toAppEnts...)
+
+	// force commit newly appended entries
+	for _, e := range toAppEnts {
+		err := w.SaveEntry(&e)
+		if err != nil {
+			log.Fatalf("etcdserver: %v", err)
+		}
+	}
+	if len(ents) != 0 {
+		st.Commit = ents[len(ents)-1].Index
+	}
+
+	log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
+	n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
+	return
+}
+
+// getIDset returns a set of IDs included in the given snapshot and the entries.
+// The given snapshot contians a list of IDs.
+// The given entries might contain two kinds of ID related entry.
+// If the entry type is Add, the contained ID will be added into the set.
+// If the entry type is Remove, the contained ID will be removed from the set.
+func getIDset(snap *raftpb.Snapshot, ents []raftpb.Entry) map[uint64]bool {
+	ids := make(map[uint64]bool)
+	if snap != nil {
+		for _, id := range snap.Nodes {
+			ids[id] = true
+		}
+	}
+	for _, e := range ents {
+		if e.Type != raftpb.EntryConfChange {
+			continue
+		}
+		var cc raftpb.ConfChange
+		pbutil.MustUnmarshal(&cc, e.Data)
+		switch cc.Type {
+		case raftpb.ConfChangeAddNode:
+			ids[cc.NodeID] = true
+		case raftpb.ConfChangeRemoveNode:
+			delete(ids, cc.NodeID)
+		default:
+			log.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
+		}
+	}
+	return ids
+}
+
+func createConfigChangeEnts(ids map[uint64]bool, self uint64, term, index uint64) []raftpb.Entry {
+	ents := make([]raftpb.Entry, 0)
+	next := index + 1
+	for id := range ids {
+		if id == self {
+			continue
+		}
+		cc := &raftpb.ConfChange{
+			Type:   raftpb.ConfChangeRemoveNode,
+			NodeID: id,
+		}
+		e := raftpb.Entry{
+			Type:  raftpb.EntryConfChange,
+			Data:  pbutil.MustMarshal(cc),
+			Term:  term,
+			Index: next,
+		}
+		ents = append(ents, e)
+		next++
+	}
+	return ents
+}

+ 114 - 0
etcdserver/force_cluster_test.go

@@ -0,0 +1,114 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package etcdserver
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+func TestGetIDset(t *testing.T) {
+	addcc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
+	addEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(addcc)}
+	removecc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
+	removeEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc)}
+	normalEntry := raftpb.Entry{Type: raftpb.EntryNormal}
+
+	tests := []struct {
+		snap *raftpb.Snapshot
+		ents []raftpb.Entry
+
+		widSet map[uint64]bool
+	}{
+		{nil, []raftpb.Entry{}, map[uint64]bool{}},
+		{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{}, map[uint64]bool{1: true}},
+		{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry}, map[uint64]bool{1: true, 2: true}},
+		{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry, removeEntry}, map[uint64]bool{1: true}},
+		{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry, normalEntry}, map[uint64]bool{1: true, 2: true}},
+		{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry, removeEntry, normalEntry}, map[uint64]bool{1: true}},
+	}
+
+	for i, tt := range tests {
+		idSet := getIDset(tt.snap, tt.ents)
+		if !reflect.DeepEqual(idSet, tt.widSet) {
+			t.Errorf("#%d: idset = %v, want %v", i, idSet, tt.widSet)
+		}
+	}
+}
+
+func TestCreateConfigChangeEnts(t *testing.T) {
+	removecc2 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
+	removecc3 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 3}
+	tests := []struct {
+		ids         map[uint64]bool
+		self        uint64
+		term, index uint64
+
+		wents []raftpb.Entry
+	}{
+		{
+			map[uint64]bool{1: true},
+			1,
+			1, 1,
+
+			[]raftpb.Entry{},
+		},
+		{
+			map[uint64]bool{1: true, 2: true},
+			1,
+			1, 1,
+
+			[]raftpb.Entry{{Term: 1, Index: 2, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}},
+		},
+		{
+			map[uint64]bool{1: true, 2: true},
+			1,
+			2, 2,
+
+			[]raftpb.Entry{{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}},
+		},
+		{
+			map[uint64]bool{1: true, 2: true, 3: true},
+			1,
+			2, 2,
+
+			[]raftpb.Entry{
+				{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)},
+				{Term: 2, Index: 4, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
+			},
+		},
+		{
+			map[uint64]bool{2: true, 3: true},
+			2,
+			2, 2,
+
+			[]raftpb.Entry{
+				{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
+			},
+		},
+	}
+
+	for i, tt := range tests {
+		gents := createConfigChangeEnts(tt.ids, tt.self, tt.term, tt.index)
+		if !reflect.DeepEqual(gents, tt.wents) {
+			t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents)
+		}
+	}
+}

+ 19 - 5
etcdserver/server.go

@@ -235,7 +235,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 			index = snapshot.Index
 		}
 		cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st)
-		id, n, w = restartNode(cfg, index, snapshot)
+		if !cfg.ForceNewCluster {
+			id, n, w = restartNode(cfg, index, snapshot)
+		} else {
+			id, n, w = restartAsStandaloneNode(cfg, index, snapshot)
+		}
 	default:
 		return nil, fmt.Errorf("unsupported bootstrap config")
 	}
@@ -745,17 +749,27 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id
 	if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
 		log.Fatalf("etcdserver: open wal error: %v", err)
 	}
-	wmetadata, st, ents, err := w.ReadAll()
+	id, clusterID, st, ents, err := readWAL(w, index)
 	if err != nil {
 		log.Fatalf("etcdserver: read wal error: %v", err)
 	}
+	cfg.Cluster.SetID(clusterID)
+	log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
+	n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
+	return
+}
+
+func readWAL(w *wal.WAL, index uint64) (id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry, err error) {
+	var wmetadata []byte
+	wmetadata, st, ents, err = w.ReadAll()
+	if err != nil {
+		return
+	}
 
 	var metadata pb.Metadata
 	pbutil.MustUnmarshal(&metadata, wmetadata)
 	id = types.ID(metadata.NodeID)
-	cfg.Cluster.SetID(types.ID(metadata.ClusterID))
-	log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
-	n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
+	cid = types.ID(metadata.ClusterID)
 	return
 }