
main: move server configuration to etcdserver package

Jonathan Boulle 11 years ago
parent commit af6b29f291
5 changed files with 107 additions and 83 deletions
  1. etcdserver/etcdhttp/http.go (+2 -2)
  2. etcdserver/etcdhttp/http_test.go (+1 -1)
  3. etcdserver/server.go (+89 -11)
  4. etcdserver/server_test.go (+4 -4)
  5. main.go (+11 -65)

+ 2 - 2
etcdserver/etcdhttp/http.go

@@ -35,10 +35,10 @@ const (
 var errClosed = errors.New("etcdhttp: client closed connection")
 
 // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
-func NewClientHandler(server *etcdserver.EtcdServer, clusterStore etcdserver.ClusterStore, timeout time.Duration) http.Handler {
+func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
 	sh := &serverHandler{
 		server:       server,
-		clusterStore: clusterStore,
+		clusterStore: server.ClusterStore,
 		timer:        server,
 		timeout:      timeout,
 	}
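With this change the ClusterStore no longer travels as a separate argument; callers pass only the server, and the handler reads server.ClusterStore itself. A minimal sketch of the new call shape (serveClients, the listen address, and the timeout are illustrative, and s is assumed to be an already-constructed *etcdserver.EtcdServer):

package sketch

import (
	"log"
	"net/http"
	"time"

	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/etcdserver/etcdhttp"
)

// serveClients is a hypothetical helper showing the reduced signature:
// only the server and a request timeout are passed in now.
func serveClients(s *etcdserver.EtcdServer) {
	h := etcdhttp.NewClientHandler(s, 10*time.Second)
	log.Fatal(http.ListenAndServe("127.0.0.1:4001", h))
}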

+ 1 - 1
etcdserver/etcdhttp/http_test.go

@@ -591,7 +591,7 @@ func TestV2MachinesEndpoint(t *testing.T) {
 		{"POST", http.StatusMethodNotAllowed},
 		{"POST", http.StatusMethodNotAllowed},
 	}
 	}
 
 
-	m := NewClientHandler(nil, &fakeCluster{}, time.Hour)
+	m := NewClientHandler(&etcdserver.EtcdServer{ClusterStore: &fakeCluster{}}, time.Hour)
 	s := httptest.NewServer(m)
 	s := httptest.NewServer(m)
 	defer s.Close()
 	defer s.Close()
 
 

+ 89 - 11
etcdserver/server.go

@@ -5,6 +5,7 @@ import (
 	"errors"
 	"errors"
 	"log"
 	"log"
 	"math/rand"
 	"math/rand"
+	"net/http"
 	"sync/atomic"
 	"sync/atomic"
 	"time"
 	"time"
 
 
@@ -12,9 +13,11 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/wait"
 	"github.com/coreos/etcd/wait"
+	"github.com/coreos/etcd/wal"
 )
 )
 
 
 const (
 const (
@@ -76,13 +79,89 @@ type RaftTimer interface {
 	Term() int64
 }
 
+type ServerConfig struct {
+	Name       string
+	ClientURLs types.URLs
+	SnapDir    string
+	SnapCount  int64
+	WalDir     string
+	Cluster    *Cluster
+	Transport  *http.Transport
+}
+
+// NewServer creates a new EtcdServer from the supplied configuration. The
+// configuration is considered static for the lifetime of the EtcdServer.
+func NewServer(cfg *ServerConfig) *EtcdServer {
+	m := cfg.Cluster.FindName(cfg.Name)
+	if m == nil {
+		// Should never happen
+		log.Fatalf("could not find name %v in cluster!", cfg.Name)
+	}
+	st := store.New()
+	ss := snap.New(cfg.SnapDir)
+	var w *wal.WAL
+	var n raft.Node
+	var err error
+	if !wal.Exist(cfg.WalDir) {
+		if w, err = wal.Create(cfg.WalDir); err != nil {
+			log.Fatal(err)
+		}
+		n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1)
+	} else {
+		var index int64
+		snapshot, err := ss.Load()
+		if err != nil && err != snap.ErrNoSnapshot {
+			log.Fatal(err)
+		}
+		if snapshot != nil {
+			log.Printf("etcdserver: restart from snapshot at index %d", snapshot.Index)
+			st.Recovery(snapshot.Data)
+			index = snapshot.Index
+		}
+
+		// restart a node from previous wal
+		if w, err = wal.OpenAtIndex(cfg.WalDir, index); err != nil {
+			log.Fatal(err)
+		}
+		wid, st, ents, err := w.ReadAll()
+		if err != nil {
+			log.Fatal(err)
+		}
+		// TODO(xiangli): save/recovery nodeID?
+		if wid != 0 {
+			log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
+		}
+		n = raft.RestartNode(m.ID, cfg.Cluster.IDs(), 10, 1, snapshot, st, ents)
+	}
+
+	cls := NewClusterStore(st, *cfg.Cluster)
+
+	s := &EtcdServer{
+		Store: st,
+		Node:  n,
+		name:  cfg.Name,
+		Storage: struct {
+			*wal.WAL
+			*snap.Snapshotter
+		}{w, ss},
+		Send:         Sender(cfg.Transport, cls),
+		clientURLs:   cfg.ClientURLs,
+		Ticker:       time.Tick(100 * time.Millisecond),
+		SyncTicker:   time.Tick(500 * time.Millisecond),
+		SnapCount:    cfg.SnapCount,
+		ClusterStore: cls,
+	}
+	return s
+}
+
 // EtcdServer is the production implementation of the Server interface
 type EtcdServer struct {
-	w    wait.Wait
-	done chan struct{}
+	w          wait.Wait
+	done       chan struct{}
+	name       string
+	name       string
+	clientURLs types.URLs
 
-	Name       string
-	ClientURLs types.URLs
+	ClusterStore ClusterStore
 
 	Node  raft.Node
 	Store store.Store
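One detail worth noting in NewServer above: the server's Storage field is satisfied by an anonymous struct that embeds both *wal.WAL and *snap.Snapshotter, so a single value carries both method sets. A self-contained sketch of that embedding idiom, with hypothetical stand-ins (entryLog, snapshotter, storage) for the real types:

package sketch

import "fmt"

// entryLog and snapshotter stand in for *wal.WAL and *snap.Snapshotter.
type entryLog struct{}

func (*entryLog) SaveEntry() { fmt.Println("entry saved") }

type snapshotter struct{}

func (*snapshotter) SaveSnap() { fmt.Println("snapshot saved") }

// storage is the combined interface such a field could require.
type storage interface {
	SaveEntry()
	SaveSnap()
}

func newStorage(w *entryLog, ss *snapshotter) storage {
	// The anonymous struct embeds both pointers; SaveEntry and SaveSnap
	// are promoted from the embedded types, so the value satisfies
	// storage — the same shape as struct{ *wal.WAL; *snap.Snapshotter }.
	return struct {
		*entryLog
		*snapshotter
	}{w, ss}
}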
@@ -101,9 +180,8 @@ type EtcdServer struct {
 	SnapCount int64 // number of entries to trigger a snapshot
 
 	// Cache of the latest raft index and raft term the server has seen
-	raftIndex    int64
-	raftTerm     int64
-	ClusterStore ClusterStore
+	raftIndex int64
+	raftTerm  int64
 }
 
 // Start prepares and starts server in a new goroutine. It is no longer safe to
@@ -338,14 +416,14 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 }
 
 // publish registers server information into the cluster. The information
-// is the json format of its self member struct, whose ClientURLs may be
-// updated.
+// is the JSON representation of this server's member struct, updated with the
+// static clientURLs of the server.
 // The function keeps attempting to register until it succeeds,
 // or its server is stopped.
 // TODO: take care of info fetched from cluster store after having reconfig.
 func (s *EtcdServer) publish(retryInterval time.Duration) {
-	m := *s.ClusterStore.Get().FindName(s.Name)
-	m.ClientURLs = s.ClientURLs.StringSlice()
+	m := *s.ClusterStore.Get().FindName(s.name)
+	m.ClientURLs = s.clientURLs.StringSlice()
 	b, err := json.Marshal(m)
 	if err != nil {
 		log.Printf("etcdserver: json marshal error: %v", err)
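As the updated comment says, publish marshals this server's member entry (with its static clientURLs) and keeps proposing it until the write succeeds or the server shuts down. A stripped-down sketch of a similar retry-until-stopped shape; member, publishLoop, propose, and done are all hypothetical plumbing, and the real publish bounds each attempt with a per-attempt timeout rather than sleeping between attempts:

package sketch

import (
	"encoding/json"
	"log"
	"time"
)

// member stands in for the etcdserver Member struct.
type member struct {
	Name       string   `json:"name"`
	ClientURLs []string `json:"clientURLs"`
}

// publishLoop retries a proposal until propose succeeds or done is
// closed, mirroring the succeed-or-stopped contract described above.
func publishLoop(m member, retryInterval time.Duration,
	propose func([]byte) error, done <-chan struct{}) {
	b, err := json.Marshal(m)
	if err != nil {
		log.Printf("sketch: json marshal error: %v", err)
		return
	}
	for {
		if err := propose(b); err == nil {
			return // registered successfully
		}
		select {
		case <-time.After(retryInterval):
			// retry
		case <-done:
			return // server stopped before registration succeeded
		}
	}
}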

+ 4 - 4
etcdserver/server_test.go

@@ -856,8 +856,8 @@ func TestPublish(t *testing.T) {
 	ch <- Response{}
 	w := &waitWithResponse{ch: ch}
 	srv := &EtcdServer{
-		Name:         "node1",
-		ClientURLs:   []url.URL{{Scheme: "http", Host: "a"}, {Scheme: "http", Host: "b"}},
+		name:         "node1",
+		clientURLs:   []url.URL{{Scheme: "http", Host: "a"}, {Scheme: "http", Host: "b"}},
 		Node:         n,
 		ClusterStore: cs,
 		w:            w,
@@ -892,7 +892,7 @@ func TestPublish(t *testing.T) {
 func TestPublishStopped(t *testing.T) {
 	cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
 	srv := &EtcdServer{
-		Name:         "node1",
+		name:         "node1",
 		Node:         &nodeRecorder{},
 		ClusterStore: cs,
 		w:            &waitRecorder{},
@@ -907,7 +907,7 @@ func TestPublishRetry(t *testing.T) {
 	n := &nodeRecorder{}
 	cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
 	srv := &EtcdServer{
-		Name:         "node1",
+		name:         "node1",
 		Node:         n,
 		ClusterStore: cs,
 		w:            &waitRecorder{},
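These tests can keep assigning name and clientURLs directly because they live inside package etcdserver; after the rename those fields are invisible elsewhere, which is what forces main.go onto the NewServer constructor. A generic illustration of that visibility rule (package and identifiers here are hypothetical):

package regserver

// Server mirrors the shape of the change: the exported field stays
// settable from any package, while the unexported one can only be set
// here, so external code must go through the constructor.
type Server struct {
	ClusterStore string // exported: settable anywhere
	name         string // unexported: settable only in this package
}

// New is the single entry point other packages have for a fully
// populated Server — the role NewServer now plays for EtcdServer.
func New(name string) *Server {
	return &Server{name: name}
}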

+ 11 - 65
main.go

@@ -17,9 +17,6 @@ import (
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/proxy"
 	"github.com/coreos/etcd/proxy"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
-	"github.com/coreos/etcd/snap"
-	"github.com/coreos/etcd/store"
-	"github.com/coreos/etcd/wal"
 )
 
 const (
@@ -33,7 +30,7 @@ var (
 	name         = flag.String("name", "default", "Unique human-readable name for this node")
 	timeout      = flag.Duration("timeout", 10*time.Second, "Request Timeout")
 	dir          = flag.String("data-dir", "", "Path to the data directory")
-	snapCount    = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
+	snapCount    = flag.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
 	printVersion = flag.Bool("version", false, "Print the version and exit")
 
 	cluster   = &etcdserver.Cluster{}
@@ -125,93 +122,42 @@ func startEtcd() {
 		log.Fatalf("etcd: cannot use None(%d) as member id", raft.None)
 		log.Fatalf("etcd: cannot use None(%d) as member id", raft.None)
 	}
 	}
 
 
-	if *snapCount <= 0 {
-		log.Fatalf("etcd: snapshot-count must be greater than 0: snapshot-count=%d", *snapCount)
-	}
-
 	if *dir == "" {
 	if *dir == "" {
 		*dir = fmt.Sprintf("%v_etcd_data", self.ID)
 		*dir = fmt.Sprintf("%v_etcd_data", self.ID)
-		log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir)
+		log.Printf("main: no data-dir provided, using default data-dir ./%s", *dir)
 	}
 	if err := os.MkdirAll(*dir, privateDirMode); err != nil {
 		log.Fatalf("main: cannot create data directory: %v", err)
 	}
 	snapdir := path.Join(*dir, "snap")
 	if err := os.MkdirAll(snapdir, privateDirMode); err != nil {
-		log.Fatalf("etcd: cannot create snapshot directory: %v", err)
+		log.Fatalf("main: cannot create snapshot directory: %v", err)
 	}
-	snapshotter := snap.New(snapdir)
-
 	waldir := path.Join(*dir, "wal")
-	var w *wal.WAL
-	var n raft.Node
-	var err error
-	st := store.New()
-
-	if !wal.Exist(waldir) {
-		w, err = wal.Create(waldir)
-		if err != nil {
-			log.Fatal(err)
-		}
-		n = raft.StartNode(self.ID, cluster.IDs(), 10, 1)
-	} else {
-		var index int64
-		snapshot, err := snapshotter.Load()
-		if err != nil && err != snap.ErrNoSnapshot {
-			log.Fatal(err)
-		}
-		if snapshot != nil {
-			log.Printf("etcd: restart from snapshot at index %d", snapshot.Index)
-			st.Recovery(snapshot.Data)
-			index = snapshot.Index
-		}
-
-		// restart a node from previous wal
-		if w, err = wal.OpenAtIndex(waldir, index); err != nil {
-			log.Fatal(err)
-		}
-		wid, st, ents, err := w.ReadAll()
-		if err != nil {
-			log.Fatal(err)
-		}
-		// TODO(xiangli): save/recovery nodeID?
-		if wid != 0 {
-			log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
-		}
-		n = raft.RestartNode(self.ID, cluster.IDs(), 10, 1, snapshot, st, ents)
-	}
 
 
 	pt, err := transport.NewTransport(peerTLSInfo)
 	pt, err := transport.NewTransport(peerTLSInfo)
 	if err != nil {
 	if err != nil {
 		log.Fatal(err)
 		log.Fatal(err)
 	}
 	}
 
 
-	cls := etcdserver.NewClusterStore(st, *cluster)
-
 	acurls, err := pkg.URLsFromFlags(flag.CommandLine, "advertise-client-urls", "addr", clientTLSInfo)
 	acurls, err := pkg.URLsFromFlags(flag.CommandLine, "advertise-client-urls", "addr", clientTLSInfo)
 	if err != nil {
 		log.Fatal(err.Error())
 	}
-	s := &etcdserver.EtcdServer{
+	cfg := &etcdserver.ServerConfig{
 		Name:       *name,
 		Name:       *name,
 		ClientURLs: acurls,
-		Node:       n,
-		Storage: struct {
-			*wal.WAL
-			*snap.Snapshotter
-		}{w, snapshotter},
-		Send:         etcdserver.Sender(pt, cls),
-		Ticker:       time.Tick(100 * time.Millisecond),
-		SyncTicker:   time.Tick(500 * time.Millisecond),
-		SnapCount:    *snapCount,
-		ClusterStore: cls,
+		SnapDir:    snapdir,
+		SnapCount:  int64(*snapCount),
+		WalDir:     waldir,
+		Cluster:    cluster,
+		Transport:  pt,
 	}
 	}
 	s.Start()
 	s.Start()
 
 	ch := &pkg.CORSHandler{
+		Handler: etcdhttp.NewClientHandler(s, *timeout),
 		Info:    cors,
 		Info:    cors,
 	}
 	ph := etcdhttp.NewPeerHandler(s)
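Taken together, the startup path in main.go now reduces to filling in a ServerConfig and handing it to the etcdserver package. A condensed, hypothetical sketch of that flow under the new API (startSketch is illustrative; flag parsing, TLS setup, and error handling are elided, and the name, directories, address, and snapshot count are stand-in values):

package sketch

import (
	"log"
	"net/http"
	"time"

	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/etcdserver/etcdhttp"
	"github.com/coreos/etcd/pkg/types"
)

func startSketch(cluster *etcdserver.Cluster, clientURLs types.URLs, pt *http.Transport) {
	// All persistence and raft bootstrapping now happens inside
	// NewServer, driven by this one static configuration value.
	cfg := &etcdserver.ServerConfig{
		Name:       "default",
		ClientURLs: clientURLs,
		SnapDir:    "default_etcd_data/snap",
		SnapCount:  10000, // stand-in for etcdserver.DefaultSnapCount
		WalDir:     "default_etcd_data/wal",
		Cluster:    cluster,
		Transport:  pt,
	}
	s := etcdserver.NewServer(cfg)
	s.Start()

	// Client traffic is served by the handler derived from the server.
	log.Fatal(http.ListenAndServe("127.0.0.1:4001",
		etcdhttp.NewClientHandler(s, 10*time.Second)))
}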