
etcdserver: support newly-joined member bootstrap

Yicheng Qin committed 11 years ago (parent commit 08593bcdf6)
5 changed files with 164 additions and 6 deletions
  1. etcdserver/cluster.go (+37, -0)
  2. etcdserver/cluster_state.go (+3, -1)
  3. etcdserver/cluster_test.go (+70, -0)
  4. etcdserver/member.go (+8, -0)
  5. etcdserver/server.go (+46, -5)

etcdserver/cluster.go (+37, -0)

@@ -24,6 +24,7 @@ import (
 	"log"
 	"net/url"
 	"path"
+	"reflect"
 	"sort"
 	"strings"
 
@@ -118,6 +119,15 @@ func NewClusterFromStore(name string, st store.Store) *Cluster {
 	return c
 }
 
+func NewClusterFromMembers(name string, id uint64, membs []*Member) *Cluster {
+	c := newCluster(name)
+	c.id = id
+	for _, m := range membs {
+		c.members[m.ID] = m
+	}
+	return c
+}
+
 func newCluster(name string) *Cluster {
 	return &Cluster{
 		name:    name,
@@ -214,6 +224,33 @@ func (c Cluster) String() string {
 	return strings.Join(sl, ",")
 }
 
+// ValidateAndAssignIDs validates the given members by matching their PeerURLs
+// with the existing members in the cluster. If the validation succeeds, it
+// assigns the IDs from the given members to the existing members in the
+// cluster. If the validation fails, an error will be returned.
+func (c *Cluster) ValidateAndAssignIDs(membs []*Member) error {
+	if len(c.members) != len(membs) {
+		return fmt.Errorf("cannot update %v from %v because the member count is unequal", c.members, membs)
+	}
+	omembs := make([]*Member, 0)
+	for _, m := range c.members {
+		omembs = append(omembs, m)
+	}
+	sort.Sort(SortableMemberSliceByPeerURLs(omembs))
+	sort.Sort(SortableMemberSliceByPeerURLs(membs))
+	for i := range omembs {
+		if !reflect.DeepEqual(omembs[i].PeerURLs, membs[i].PeerURLs) {
+			return fmt.Errorf("unmatched member while checking PeerURLs")
+		}
+		omembs[i].ID = membs[i].ID
+	}
+	c.members = make(map[uint64]*Member)
+	for _, m := range omembs {
+		c.members[m.ID] = m
+	}
+	return nil
+}
+
 func (c *Cluster) genID() {
 	mIDs := c.MemberIDs()
 	b := make([]byte, 8*len(mIDs))
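
For context, the two new helpers are meant to be used together on a freshly started member that has no WAL yet: the locally configured cluster (peer URLs only, placeholder IDs) is reconciled against the member list fetched from an existing peer, and adopts that peer's member IDs and cluster ID. The sketch below is not part of this commit; it assumes it is compiled inside the etcdserver package, the Member literals assume the embedded RaftAttributes field used elsewhere in this package, and all IDs and URLs are invented.

package etcdserver

import "log"

// exampleAdoptRemoteIDs is a hypothetical helper showing how a locally
// configured cluster takes over the IDs reported by an existing peer.
// NOTE: Member construction here assumes the embedded RaftAttributes field.
func exampleAdoptRemoteIDs() {
	// Local view: the joining node only knows the peers' URLs.
	local := newCluster("etcd-cluster")
	local.members[1] = &Member{ID: 1, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://10.0.0.1:7001"}}}
	local.members[2] = &Member{ID: 2, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://10.0.0.2:7001"}}}

	// Remote view: what GET /members on an existing peer reported (made-up values).
	remote := NewClusterFromMembers("", 0xcafe, []*Member{
		{ID: 0xbeef01, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://10.0.0.1:7001"}}},
		{ID: 0xbeef02, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://10.0.0.2:7001"}}},
	})

	// The PeerURLs match pairwise, so validation succeeds, the local members
	// are re-keyed under the remote IDs, and the cluster ID follows.
	if err := local.ValidateAndAssignIDs(remote.Members()); err != nil {
		log.Fatalf("validate: %v", err)
	}
	local.SetID(remote.id)
	log.Printf("member ids after assignment: %v", local.MemberIDs())
}

This mirrors the new ClusterStateValueExisting branch in NewServer further down in this diff, which fetches the remote cluster with getClusterFromPeers before calling startNode.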

etcdserver/cluster_state.go (+3, -1)

@@ -21,12 +21,14 @@ import (
 )
 
 const (
-	ClusterStateValueNew = "new"
+	ClusterStateValueNew      = "new"
+	ClusterStateValueExisting = "existing"
 )
 
 var (
 	ClusterStateValues = []string{
 		ClusterStateValueNew,
+		ClusterStateValueExisting,
 	}
 )
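
The only behavioral effect of the new constant is that "existing" becomes an accepted cluster-state value alongside "new". As a rough sketch of how a caller might check a user-supplied string against that list (the helper below is hypothetical, not part of the package):

// isValidClusterState reports whether s is one of the accepted cluster-state
// values: "new" or, after this change, "existing". (Hypothetical helper.)
func isValidClusterState(s string) bool {
	for _, v := range ClusterStateValues {
		if v == s {
			return true
		}
	}
	return false
}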
 

etcdserver/cluster_test.go (+70, -0)

@@ -282,6 +282,76 @@ func TestClusterClientURLs(t *testing.T) {
 	}
 }
 
+func TestClusterValidateAndAssignIDsBad(t *testing.T) {
+	tests := []struct {
+		clmembs []Member
+		membs   []*Member
+	}{
+		{
+			// unmatched length
+			[]Member{
+				newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil),
+			},
+			[]*Member{},
+		},
+		{
+			// unmatched peer urls
+			[]Member{
+				newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil),
+			},
+			[]*Member{
+				newTestMemberp(1, []string{"http://127.0.0.1:4001"}, "", nil),
+			},
+		},
+		{
+			// unmatched peer urls
+			[]Member{
+				newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil),
+				newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil),
+			},
+			[]*Member{
+				newTestMemberp(1, []string{"http://127.0.0.1:2379"}, "", nil),
+				newTestMemberp(2, []string{"http://127.0.0.2:4001"}, "", nil),
+			},
+		},
+	}
+	for i, tt := range tests {
+		cl := newTestCluster(tt.clmembs)
+		if err := cl.ValidateAndAssignIDs(tt.membs); err == nil {
+			t.Errorf("#%d: unexpected update success", i)
+		}
+	}
+}
+
+func TestClusterValidateAndAssignIDs(t *testing.T) {
+	tests := []struct {
+		clmembs []Member
+		membs   []*Member
+		wids    []uint64
+	}{
+		{
+			[]Member{
+				newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil),
+				newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil),
+			},
+			[]*Member{
+				newTestMemberp(3, []string{"http://127.0.0.1:2379"}, "", nil),
+				newTestMemberp(4, []string{"http://127.0.0.2:2379"}, "", nil),
+			},
+			[]uint64{3, 4},
+		},
+	}
+	for i, tt := range tests {
+		cl := newTestCluster(tt.clmembs)
+		if err := cl.ValidateAndAssignIDs(tt.membs); err != nil {
+		t.Errorf("#%d: unexpected update error: %v", i, err)
+		}
+		if !reflect.DeepEqual(cl.MemberIDs(), tt.wids) {
+			t.Errorf("#%d: ids = %v, want %v", i, cl.MemberIDs(), tt.wids)
+		}
+	}
+}
+
 func TestClusterGenID(t *testing.T) {
 	cs := newTestCluster([]Member{
 		newTestMember(1, nil, "", nil),

etcdserver/member.go (+8, -0)

@@ -96,3 +96,11 @@ func parseMemberID(key string) uint64 {
 func removedMemberStoreKey(id uint64) string {
 	return path.Join(storeRemovedMembersPrefix, idAsHex(id))
 }
+
+type SortableMemberSliceByPeerURLs []*Member
+
+func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
+func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
+	return p[i].PeerURLs[0] < p[j].PeerURLs[0]
+}
+func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
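
SortableMemberSliceByPeerURLs orders members by their first PeerURL only, which is enough to give both sides of ValidateAndAssignIDs the same ordering before the pairwise comparison. A small sketch of its use follows; as above, the Member literals assume the embedded RaftAttributes field and the URLs are invented.

import "sort"

// exampleSortByPeerURLs is a hypothetical snippet, assumed to live in the
// etcdserver package, demonstrating the new sort helper.
func exampleSortByPeerURLs() []*Member {
	membs := []*Member{
		{ID: 2, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://10.0.0.2:7001"}}},
		{ID: 1, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://10.0.0.1:7001"}}},
	}
	// After sorting, the member with PeerURL http://10.0.0.1:7001 comes first.
	sort.Sort(SortableMemberSliceByPeerURLs(membs))
	return membs
}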

etcdserver/server.go (+46, -5)

@@ -19,8 +19,10 @@ package etcdserver
 import (
 	"encoding/json"
 	"errors"
+	"io/ioutil"
 	"log"
 	"math/rand"
+	"net/http"
 	"os"
 	"path"
 	"strconv"
@@ -176,7 +178,17 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 	var w *wal.WAL
 	var n raft.Node
 	var id uint64
-	if !wal.Exist(cfg.WALDir()) {
+	haveWAL := wal.Exist(cfg.WALDir())
+	switch {
+	case !haveWAL && cfg.ClusterState == ClusterStateValueExisting:
+		cl := getClusterFromPeers(cfg.Cluster.PeerURLs())
+		if err := cfg.Cluster.ValidateAndAssignIDs(cl.Members()); err != nil {
+			log.Fatalf("etcdserver: %v", err)
+		}
+		cfg.Cluster.SetID(cl.id)
+		cfg.Cluster.SetStore(st)
+		id, n, w = startNode(cfg, nil)
+	case !haveWAL && cfg.ClusterState == ClusterStateValueNew:
 		if err := cfg.VerifyBootstrapConfig(); err != nil {
 			log.Fatalf("etcdserver: %v", err)
 		}
@@ -195,8 +207,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 			}
 		}
 		cfg.Cluster.SetStore(st)
-		id, n, w = startNode(cfg)
-	} else {
+		id, n, w = startNode(cfg, cfg.Cluster.MemberIDs())
+	case haveWAL:
 		if cfg.ShouldDiscover() {
 			log.Printf("etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir())
 		}
@@ -212,6 +224,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 		}
 		cfg.Cluster = NewClusterFromStore(cfg.Cluster.name, st)
 		id, n, w = restartNode(cfg, index, snapshot)
+	default:
+		log.Fatalf("etcdserver: unsupported bootstrap config")
 	}
 
 	sstats := &stats.ServerStats{
@@ -642,7 +656,35 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
 	s.storage.Cut()
 }
 
-func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) {
+func getClusterFromPeers(urls []string) *Cluster {
+	for _, u := range urls {
+		resp, err := http.Get(u + "/members")
+		if err != nil {
+			log.Printf("etcdserver: get /members on %s: %v", u, err)
+			continue
+		}
+		b, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			log.Printf("etcdserver: read body error: %v", err)
+			continue
+		}
+		var membs []*Member
+		if err := json.Unmarshal(b, &membs); err != nil {
+			log.Printf("etcdserver: unmarshal body error: %v", err)
+			continue
+		}
+		id, err := strconv.ParseUint(resp.Header.Get("X-Etcd-Cluster-ID"), 16, 64)
+		if err != nil {
+			log.Printf("etcdserver: parse uint error: %v", err)
+			continue
+		}
+		return NewClusterFromMembers("", id, membs)
+	}
+	log.Fatalf("etcdserver: could not retrieve cluster information from the given urls")
+	return nil
+}
+
+func startNode(cfg *ServerConfig, ids []uint64) (id uint64, n raft.Node, w *wal.WAL) {
 	var err error
 	// TODO: remove the discoveryURL when it becomes part of the source for
 	// generating nodeID.
@@ -651,7 +693,6 @@ func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) {
 	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
 		log.Fatal(err)
 	}
-	ids := cfg.Cluster.MemberIDs()
 	peers := make([]raft.Peer, len(ids))
 	for i, id := range ids {
 		ctx, err := json.Marshal((*cfg.Cluster).Member(id))
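
getClusterFromPeers only works if the existing members serve a matching endpoint: GET /members must return the member list as a JSON array and carry the cluster ID, hex-encoded, in an X-Etcd-Cluster-ID response header. That handler is not part of this diff; the sketch below is only meant to show the response shape the client code above depends on, and it assumes it is compiled inside the etcdserver package so it can read the unexported cluster ID field.

package etcdserver

import (
	"encoding/json"
	"net/http"
	"strconv"
)

// serveMembersSketch is a hypothetical handler producing the response that
// getClusterFromPeers expects; the real member-listing handler lives elsewhere.
func serveMembersSketch(cl *Cluster) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		b, err := json.Marshal(cl.Members())
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		// Hex encoding matches the strconv.ParseUint(..., 16, 64) call on the client side.
		w.Header().Set("X-Etcd-Cluster-ID", strconv.FormatUint(cl.id, 16))
		w.Header().Set("Content-Type", "application/json")
		w.Write(b)
	})
}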