
Merge pull request #2008 from xiang90/server_clean

etcdserver: cleanup server.go
Xiang Li 11 years ago
parent
commit
e056e96ad5
2 changed files with 79 additions and 77 deletions
  1. etcdserver/server.go: +58 -75
  2. etcdserver/storage.go: +21 -2

+ 58 - 75
etcdserver/server.go

@@ -198,10 +198,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		if err := cfg.VerifyBootstrapConfig(); err != nil {
 			return nil, err
 		}
-		if err := checkClientURLsEmptyFromPeers(cfg.Cluster, cfg.Name); err != nil {
-			return nil, err
-		}
 		m := cfg.Cluster.MemberByName(cfg.Name)
+		if isBootstrapped(cfg.Cluster, cfg.Name) {
+			return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
+		}
 		if cfg.ShouldDiscover() {
 			s, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.Cluster.String())
 			if err != nil {
@@ -216,7 +216,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
 	case haveWAL:
 		if walVersion != wal.WALv0_5 {
-			if err := UpgradeWAL(cfg, walVersion); err != nil {
+			if err := upgradeWAL(cfg, walVersion); err != nil {
 				return nil, err
 			}
 		}
@@ -837,24 +837,68 @@ func (s *EtcdServer) ResumeSending() {
 	hub.resume()
 }
 
-// checkClientURLsEmptyFromPeers does its best to get the cluster from peers,
-// and if this succeeds, checks that the member of the given id exists in the
-// cluster, and its ClientURLs is empty.
-func checkClientURLsEmptyFromPeers(cl *Cluster, name string) error {
-	us := getOtherPeerURLs(cl, name)
+func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
+	var err error
+	member := cfg.Cluster.MemberByName(cfg.Name)
+	metadata := pbutil.MustMarshal(
+		&pb.Metadata{
+			NodeID:    uint64(member.ID),
+			ClusterID: uint64(cfg.Cluster.ID()),
+		},
+	)
+	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
+		log.Fatalf("etcdserver create snapshot directory error: %v", err)
+	}
+	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
+		log.Fatalf("etcdserver: create wal error: %v", err)
+	}
+	peers := make([]raft.Peer, len(ids))
+	for i, id := range ids {
+		ctx, err := json.Marshal((*cfg.Cluster).Member(id))
+		if err != nil {
+			log.Panicf("marshal member should never fail: %v", err)
+		}
+		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
+	}
+	id = member.ID
+	log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
+	s = raft.NewMemoryStorage()
+	n = raft.StartNode(uint64(id), peers, 10, 1, s)
+	return
+}
+
+func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
+	w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
+	cfg.Cluster.SetID(cid)
+
+	log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
+	s := raft.NewMemoryStorage()
+	if snapshot != nil {
+		s.ApplySnapshot(*snapshot)
+	}
+	s.SetHardState(st)
+	s.Append(ents)
+	n := raft.RestartNode(uint64(id), 10, 1, s)
+	return id, n, s, w
+}
+
+// isBootstrapped tries to check if the given member has been bootstrapped
+// in the given cluster.
+func isBootstrapped(cl *Cluster, member string) bool {
+	us := getOtherPeerURLs(cl, member)
 	rcl, err := getClusterFromPeers(us, false)
 	if err != nil {
-		return nil
+		return false
 	}
-	id := cl.MemberByName(name).ID
+	id := cl.MemberByName(member).ID
 	m := rcl.Member(id)
 	if m == nil {
-		return nil
+		return false
 	}
 	if len(m.ClientURLs) > 0 {
-		return fmt.Errorf("etcdserver: member with id %s has started and registered its client urls", id)
+		return true
 	}
-	return nil
+	return false
 }
 
 // GetClusterFromPeers takes a set of URLs representing etcd peers, and
@@ -908,36 +952,6 @@ func getClusterFromPeers(urls []string, logerr bool) (*Cluster, error) {
 	return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
 }
 
-func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
-	var err error
-	member := cfg.Cluster.MemberByName(cfg.Name)
-	metadata := pbutil.MustMarshal(
-		&pb.Metadata{
-			NodeID:    uint64(member.ID),
-			ClusterID: uint64(cfg.Cluster.ID()),
-		},
-	)
-	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
-		log.Fatalf("etcdserver create snapshot directory error: %v", err)
-	}
-	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
-		log.Fatalf("etcdserver: create wal error: %v", err)
-	}
-	peers := make([]raft.Peer, len(ids))
-	for i, id := range ids {
-		ctx, err := json.Marshal((*cfg.Cluster).Member(id))
-		if err != nil {
-			log.Panicf("marshal member should never fail: %v", err)
-		}
-		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
-	}
-	id = member.ID
-	log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
-	s = raft.NewMemoryStorage()
-	n = raft.StartNode(uint64(id), peers, 10, 1, s)
-	return
-}
-
 func getOtherMembers(cl ClusterInfo, self string) []*Member {
 	var ms []*Member
 	for _, m := range cl.Members() {
@@ -962,37 +976,6 @@ func getOtherPeerURLs(cl ClusterInfo, self string) []string {
 	return us
 }
 
-func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
-	w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
-	cfg.Cluster.SetID(cid)
-
-	log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
-	s := raft.NewMemoryStorage()
-	if snapshot != nil {
-		s.ApplySnapshot(*snapshot)
-	}
-	s.SetHardState(st)
-	s.Append(ents)
-	n := raft.RestartNode(uint64(id), 10, 1, s)
-	return id, n, s, w
-}
-
-func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
-	var err error
-	if w, err = wal.Open(waldir, index); err != nil {
-		log.Fatalf("etcdserver: open wal error: %v", err)
-	}
-	var wmetadata []byte
-	if wmetadata, st, ents, err = w.ReadAll(); err != nil {
-		log.Fatalf("etcdserver: read wal error: %v", err)
-	}
-	var metadata pb.Metadata
-	pbutil.MustUnmarshal(&metadata, wmetadata)
-	id = types.ID(metadata.NodeID)
-	cid = types.ID(metadata.ClusterID)
-	return
-}
-
 // TODO: move the function to /id pkg maybe?
 // GenID generates a random id that is not equal to 0.
 func GenID() (n uint64) {

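Not part of the commit, but a minimal sketch of the caller-side effect of the first change above: replacing `checkClientURLsEmptyFromPeers` (which returned a pre-formatted error) with the `isBootstrapped` predicate (which returns a bool) lets `NewServer` format its own error message. The types and names below are hypothetical stand-ins for the etcdserver internals.

```go
// Hypothetical stand-ins; the real code works with *Cluster and ServerConfig
// from package etcdserver.
package main

import "fmt"

// isBootstrapped mirrors the shape of the new helper: it reports whether the
// named member has already registered client URLs with the rest of the
// cluster, and simply returns false when the peers cannot be reached.
func isBootstrapped(member string) bool {
	return false // assume the peer lookup failed or the member is unknown
}

func main() {
	member := "infra1" // hypothetical member name
	// NewServer now owns the error message instead of the helper:
	if isBootstrapped(member) {
		fmt.Printf("member %s has already been bootstrapped\n", member)
		return
	}
	fmt.Println("proceeding with bootstrap of", member)
}
```
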
+ 21 - 2
etcdserver/storage.go

@@ -3,7 +3,10 @@ package etcdserver
 import (
 	"log"
 
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/migrate"
+	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/wal"
@@ -47,9 +50,25 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
 	return nil
 }
 
-// UpgradeWAL converts an older version of the etcdServer data to the newest version.
+func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
+	var err error
+	if w, err = wal.Open(waldir, index); err != nil {
+		log.Fatalf("etcdserver: open wal error: %v", err)
+	}
+	var wmetadata []byte
+	if wmetadata, st, ents, err = w.ReadAll(); err != nil {
+		log.Fatalf("etcdserver: read wal error: %v", err)
+	}
+	var metadata pb.Metadata
+	pbutil.MustUnmarshal(&metadata, wmetadata)
+	id = types.ID(metadata.NodeID)
+	cid = types.ID(metadata.ClusterID)
+	return
+}
+
+// upgradeWAL converts an older version of the etcdServer data to the newest version.
 // It must ensure that, after upgrading, the most recent version is present.
-func UpgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
+func upgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
 	if ver == wal.WALv0_4 {
 		log.Print("etcdserver: converting v0.4 log to v2.0")
 		err := migrate.Migrate4To2(cfg.DataDir, cfg.Name)
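
As a side note (not in the commit), the `readWAL` that moved into storage.go is the counterpart of the metadata written by `startNode`: the node and cluster IDs marshalled into the WAL at creation are the ones recovered on restart. A minimal sketch of that round-trip, using the same packages imported in the diff above; the IDs are made up.

```go
// Stand-alone sketch; import paths are the ones added to storage.go above.
package main

import (
	"fmt"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/pkg/pbutil"
	"github.com/coreos/etcd/pkg/types"
)

func main() {
	// What startNode marshals and hands to wal.Create at bootstrap time
	// (the IDs here are hypothetical).
	metadata := pbutil.MustMarshal(&pb.Metadata{
		NodeID:    0x1234,
		ClusterID: 0xabcd,
	})

	// What readWAL unmarshals from the WAL metadata record on restart.
	var m pb.Metadata
	pbutil.MustUnmarshal(&m, metadata)
	fmt.Printf("restart member %s in cluster %s\n",
		types.ID(m.NodeID), types.ID(m.ClusterID))
}
```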