Browse Source

Merge pull request #1288 from coreos/cleanup

etcdserver: clean NewServer
Xiang Li 11 years ago
parent
commit
09f9884c6a
2 changed files with 131 additions and 105 deletions
  1. 16 0
      etcdserver/config.go
  2. 115 105
      etcdserver/server.go

+ 16 - 0
etcdserver/config.go

@@ -3,6 +3,7 @@ package etcdserver
 import (
 	"fmt"
 	"net/http"
+	"path"
 
 	"github.com/coreos/etcd/pkg/types"
 )
@@ -40,3 +41,18 @@ func (c *ServerConfig) Verify() error {
 	}
 	return nil
 }
+
+// WALDir returns the location of the WAL directory: the "wal" entry
+// directly under the configured data directory.
+func (c *ServerConfig) WALDir() string {
+	return path.Join(c.DataDir, "wal")
+}
+
+// SnapDir returns the location of the snapshot directory: the "snap" entry
+// directly under the configured data directory.
+func (c *ServerConfig) SnapDir() string {
+	return path.Join(c.DataDir, "snap")
+}
+
+// ID returns the ID of the cluster member whose name matches c.Name.
+//
+// The lookup result is used without any check.
+// NOTE(review): presumably Verify guarantees the name is present in the
+// cluster before ID is called — confirm.
+func (c *ServerConfig) ID() uint64 {
+	self := c.Cluster.FindName(c.Name)
+	return self.ID
+}
+
+// ShouldDiscover reports whether a discovery URL has been configured.
+func (c *ServerConfig) ShouldDiscover() bool {
+	return len(c.DiscoveryURL) > 0
+}
+
+// IsBootstrap reports whether a bootstrap method is provided, i.e. either a
+// discovery URL is set or the cluster state is ClusterStateValueNew.
+func (c *ServerConfig) IsBootstrap() bool {
+	if c.ShouldDiscover() {
+		return true
+	}
+	return c.ClusterState == ClusterStateValueNew
+}

+ 115 - 105
etcdserver/server.go

@@ -6,7 +6,6 @@ import (
 	"log"
 	"math/rand"
 	"os"
-	"path"
 	"sync/atomic"
 	"time"
 
@@ -82,26 +81,55 @@ type RaftTimer interface {
 	Term() uint64
 }
 
+// EtcdServer is the production implementation of the Server interface.
+type EtcdServer struct {
+	// Cache of the latest raft index and raft term the server has seen.
+	// Both fields are written with sync/atomic 64-bit operations, so they
+	// are kept at the very start of the struct: only the first word of an
+	// allocated struct is guaranteed to be 64-bit aligned on 32-bit
+	// platforms. Do not move them or put other fields in front of them.
+	raftIndex uint64
+	raftTerm  uint64
+
+	w          wait.Wait
+	done       chan struct{}
+	id         uint64
+	attributes Attributes
+
+	ClusterStore ClusterStore
+
+	node  raft.Node
+	store store.Store
+
+	// send specifies the send function for sending msgs to members. send
+	// MUST NOT block. It is okay to drop messages, since clients should
+	// timeout and reissue their messages.  If send is nil, server will
+	// panic.
+	send sendFunc
+
+	storage Storage
+
+	ticker     <-chan time.Time
+	syncTicker <-chan time.Time
+
+	snapCount uint64 // number of entries to trigger a snapshot
+}
+
 // NewServer creates a new EtcdServer from the supplied configuration. The
 // configuration is considered static for the lifetime of the EtcdServer.
 func NewServer(cfg *ServerConfig) *EtcdServer {
-	err := cfg.Verify()
-	if err != nil {
+	if err := cfg.Verify(); err != nil {
 		log.Fatalln(err)
 	}
-	snapdir := path.Join(cfg.DataDir, "snap")
-	if err := os.MkdirAll(snapdir, privateDirMode); err != nil {
+	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
 		log.Fatalf("etcdserver: cannot create snapshot directory: %v", err)
 	}
-	ss := snap.New(snapdir)
+	ss := snap.New(cfg.SnapDir())
 	st := store.New()
 	var w *wal.WAL
 	var n raft.Node
-	m := cfg.Cluster.FindName(cfg.Name)
-	waldir := path.Join(cfg.DataDir, "wal")
-	if !wal.Exist(waldir) {
-		if cfg.DiscoveryURL != "" {
-			d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String())
+	if !wal.Exist(cfg.WALDir()) {
+		if !cfg.IsBootstrap() {
+			log.Fatalf("etcd: initial cluster state unset and no wal or discovery URL found")
+		}
+		if cfg.ShouldDiscover() {
+			d, err := discovery.New(cfg.DiscoveryURL, cfg.ID(), cfg.Cluster.String())
 			if err != nil {
 				log.Fatalf("etcd: cannot init discovery %v", err)
 			}
@@ -112,31 +140,11 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 			if err = cfg.Cluster.Set(s); err != nil {
 				log.Fatalf("etcd: %v", err)
 			}
-		} else if (cfg.ClusterState) != ClusterStateValueNew {
-			log.Fatalf("etcd: initial cluster state unset and no wal or discovery URL found")
 		}
-		i := pb.Info{ID: m.ID}
-		b, err := i.Marshal()
-		if err != nil {
-			log.Fatal(err)
-		}
-		if w, err = wal.Create(waldir, b); err != nil {
-			log.Fatal(err)
-		}
-
-		ids := cfg.Cluster.IDs()
-		peers := make([]raft.Peer, len(ids))
-		for i, id := range ids {
-			ctx, err := json.Marshal((*cfg.Cluster)[id])
-			if err != nil {
-				log.Fatal(err)
-			}
-			peers[i] = raft.Peer{ID: id, Context: ctx}
-		}
-		n = raft.StartNode(m.ID, peers, 10, 1)
+		n, w = startNode(cfg)
 	} else {
-		if cfg.DiscoveryURL != "" {
-			log.Printf("etcd: warn: ignoring discovery URL: etcd has already been initialized and has a valid log in %q", waldir)
+		if cfg.ShouldDiscover() {
+			log.Printf("etcd: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir())
 		}
 		var index uint64
 		snapshot, err := ss.Load()
@@ -148,31 +156,14 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 			st.Recovery(snapshot.Data)
 			index = snapshot.Index
 		}
-
-		// restart a node from previous wal
-		if w, err = wal.OpenAtIndex(waldir, index); err != nil {
-			log.Fatal(err)
-		}
-		md, st, ents, err := w.ReadAll()
-		if err != nil {
-			log.Fatal(err)
-		}
-		var info pb.Info
-		if err := info.Unmarshal(md); err != nil {
-			log.Fatal(err)
-		}
-		// TODO(xiangli): save/recovery nodeID?
-		if info.ID != m.ID {
-			log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, m.ID)
-		}
-		n = raft.RestartNode(m.ID, 10, 1, snapshot, st, ents)
+		n, w = restartNode(cfg, index, snapshot)
 	}
 
 	cls := &clusterStore{Store: st}
 	s := &EtcdServer{
 		store:      st,
 		node:       n,
-		id:         m.ID,
+		id:         cfg.ID(),
 		attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
 		storage: struct {
 			*wal.WAL
@@ -187,36 +178,6 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 	return s
 }
 
-// EtcdServer is the production implementation of the Server interface
-type EtcdServer struct {
-	w          wait.Wait
-	done       chan struct{}
-	id         uint64
-	attributes Attributes
-
-	ClusterStore ClusterStore
-
-	node  raft.Node
-	store store.Store
-
-	// send specifies the send function for sending msgs to members. send
-	// MUST NOT block. It is okay to drop messages, since clients should
-	// timeout and reissue their messages.  If send is nil, server will
-	// panic.
-	send sendFunc
-
-	storage Storage
-
-	ticker     <-chan time.Time
-	syncTicker <-chan time.Time
-
-	snapCount uint64 // number of entries to trigger a snapshot
-
-	// Cache of the latest raft index and raft term the server has seen
-	raftIndex uint64
-	raftTerm  uint64
-}
-
 // Start prepares and starts server in a new goroutine. It is no longer safe to
 // modify a server's fields after it has been sent to Start.
 // It also starts a goroutine to publish its server information.
@@ -262,27 +223,8 @@ func (s *EtcdServer) run() {
 			// care to apply entries in a single goroutine, and not
 			// race them.
 			// TODO: apply configuration change into ClusterStore.
-			for _, e := range rd.CommittedEntries {
-				switch e.Type {
-				case raftpb.EntryNormal:
-					var r pb.Request
-					if err := r.Unmarshal(e.Data); err != nil {
-						panic("TODO: this is bad, what do we do about it?")
-					}
-					s.w.Trigger(r.ID, s.applyRequest(r))
-				case raftpb.EntryConfChange:
-					var cc raftpb.ConfChange
-					if err := cc.Unmarshal(e.Data); err != nil {
-						panic("TODO: this is bad, what do we do about it?")
-					}
-					s.applyConfChange(cc)
-					s.w.Trigger(cc.ID, nil)
-				default:
-					panic("unexpected entry type")
-				}
-				atomic.StoreUint64(&s.raftIndex, e.Index)
-				atomic.StoreUint64(&s.raftTerm, e.Term)
-				appliedi = e.Index
+			if len(rd.CommittedEntries) != 0 {
+				appliedi = s.apply(rd.CommittedEntries)
 			}
 
 			if rd.SoftState != nil {
@@ -336,7 +278,7 @@ func (s *EtcdServer) Stop() {
 // an error.
 func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	if r.ID == 0 {
-		panic("r.Id cannot be 0")
+		panic("r.ID cannot be 0")
 	}
 	if r.Method == "GET" && r.Quorum {
 		r.Method = "QGET"
@@ -498,6 +440,34 @@ func getExpirationTime(r *pb.Request) time.Time {
 	return t
 }
 
+// apply processes the given committed entries in order and returns the index
+// of the last entry it handled (0 if es is empty).
+//
+// Normal entries are decoded as a pb.Request, run through applyRequest, and
+// the result is handed to s.w.Trigger under the request ID. Configuration
+// change entries are decoded as a raftpb.ConfChange, passed to
+// applyConfChange, and then triggered with a nil result. An entry that cannot
+// be decoded, or that has any other type, causes a panic.
+func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
+	var last uint64
+	for _, ent := range es {
+		switch ent.Type {
+		case raftpb.EntryNormal:
+			var req pb.Request
+			if err := req.Unmarshal(ent.Data); err != nil {
+				panic("TODO: this is bad, what do we do about it?")
+			}
+			resp := s.applyRequest(req)
+			s.w.Trigger(req.ID, resp)
+		case raftpb.EntryConfChange:
+			var change raftpb.ConfChange
+			if err := change.Unmarshal(ent.Data); err != nil {
+				panic("TODO: this is bad, what do we do about it?")
+			}
+			s.applyConfChange(change)
+			s.w.Trigger(change.ID, nil)
+		default:
+			panic("unexpected entry type")
+		}
+		// Record the position of the entry that was just applied in the
+		// cached raft index/term.
+		atomic.StoreUint64(&s.raftIndex, ent.Index)
+		atomic.StoreUint64(&s.raftTerm, ent.Term)
+		last = ent.Index
+	}
+	return last
+}
+
 // applyRequest interprets r as a call to store.X and returns a Response interpreted
 // from store.Event
 func (s *EtcdServer) applyRequest(r pb.Request) Response {
@@ -570,6 +540,46 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
 	s.storage.Cut()
 }
 
+// startNode creates a fresh WAL in cfg.WALDir(), with the marshaled pb.Info
+// for this member's ID as its metadata, and starts a new raft node whose
+// initial peers are all the members of cfg.Cluster. Each peer carries the
+// JSON encoding of its cluster entry as context. Any failure is fatal.
+func startNode(cfg *ServerConfig) (n raft.Node, w *wal.WAL) {
+	info := pb.Info{ID: cfg.ID()}
+	metadata, err := info.Marshal()
+	if err != nil {
+		log.Fatal(err)
+	}
+	w, err = wal.Create(cfg.WALDir(), metadata)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	memberIDs := cfg.Cluster.IDs()
+	peers := make([]raft.Peer, 0, len(memberIDs))
+	for _, memberID := range memberIDs {
+		ctx, err := json.Marshal((*cfg.Cluster)[memberID])
+		if err != nil {
+			log.Fatal(err)
+		}
+		peers = append(peers, raft.Peer{ID: memberID, Context: ctx})
+	}
+	return raft.StartNode(cfg.ID(), peers, 10, 1), w
+}
+
+// restartNode restarts a raft node from the WAL found in cfg.WALDir().
+//
+// The WAL is opened at index, all of its content is read back, and the
+// pb.Info stored as WAL metadata is decoded to recover the node ID. The raft
+// node is then restarted from snapshot together with the state and entries
+// read from the WAL. Any failure is fatal.
+//
+// The ID recorded in the WAL must match the ID derived from the current
+// configuration: the server is constructed with cfg.ID(), so a mismatch would
+// leave the server and its raft node running under different IDs.
+func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (n raft.Node, w *wal.WAL) {
+	var err error
+	// restart a node from previous wal
+	if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
+		log.Fatal(err)
+	}
+	md, st, ents, err := w.ReadAll()
+	if err != nil {
+		log.Fatal(err)
+	}
+	var info pb.Info
+	if err := info.Unmarshal(md); err != nil {
+		log.Fatal(err)
+	}
+	// TODO(xiangli): save/recovery nodeID?
+	if want := cfg.ID(); info.ID != want {
+		log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, want)
+	}
+	n = raft.RestartNode(info.ID, 10, 1, snapshot, st, ents)
+	return
+}
+
 // TODO: move the function to /id pkg maybe?
 // GenID generates a random id that is not equal to 0.
 func GenID() (n uint64) {