Browse Source

Merge pull request #5925 from heyitsanthony/embed-etcdmain

embeddable etcdmain
Anthony Romano 9 years ago
parent
commit
41a98dbd66
12 changed files with 1063 additions and 703 deletions
  1. 2 0
      Documentation/docs.md
  2. 304 0
      embed/config.go
  3. 83 0
      embed/config_test.go
  4. 21 11
      embed/doc.go
  5. 302 0
      embed/etcd.go
  6. 26 18
      embed/serve.go
  7. 125 271
      etcdmain/config.go
  8. 57 101
      etcdmain/config_test.go
  9. 0 26
      etcdmain/const_windows.go
  10. 33 273
      etcdmain/etcd.go
  11. 7 3
      etcdmain/help.go
  12. 103 0
      integration/embed_test.go

+ 2 - 0
Documentation/docs.md

@@ -14,6 +14,7 @@ The easiest way to get started using etcd as a distributed key-value store is to
  - [Interacting with etcd][interacting]
  - [Interacting with etcd][interacting]
  - [API references][api_ref]
  - [API references][api_ref]
  - [gRPC gateway][api_grpc_gateway]
  - [gRPC gateway][api_grpc_gateway]
+ - [Embedding etcd][embed_etcd]
  - [Experimental features and APIs][experimental]
  - [Experimental features and APIs][experimental]
 
 
 ## Operating etcd clusters
 ## Operating etcd clusters
@@ -56,6 +57,7 @@ To learn more about the concepts and internals behind etcd, read the following p
 [data_model]: learning/data_model.md
 [data_model]: learning/data_model.md
 [demo]: demo.md
 [demo]: demo.md
 [download_build]: dl_build.md
 [download_build]: dl_build.md
+[embed_etcd]: ../embed/
 [failures]: op-guide/failures.md
 [failures]: op-guide/failures.md
 [glossary]: learning/glossary.md
 [glossary]: learning/glossary.md
 [interacting]: dev-guide/interacting_v3.md
 [interacting]: dev-guide/interacting_v3.md

+ 304 - 0
embed/config.go

@@ -0,0 +1,304 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package embed
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/url"
+	"strings"
+
+	"github.com/coreos/etcd/discovery"
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/pkg/cors"
+	"github.com/coreos/etcd/pkg/transport"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/ghodss/yaml"
+)
+
+const (
+	ClusterStateFlagNew      = "new"
+	ClusterStateFlagExisting = "existing"
+
+	DefaultName                     = "default"
+	DefaultInitialAdvertisePeerURLs = "http://localhost:2380"
+	DefaultAdvertiseClientURLs      = "http://localhost:2379"
+	DefaultListenPeerURLs           = "http://localhost:2380"
+	DefaultListenClientURLs         = "http://localhost:2379"
+	DefaultMaxSnapshots             = 5
+	DefaultMaxWALs                  = 5
+
+	// maxElectionMs specifies the maximum value of election timeout.
+	// More details are listed in ../Documentation/tuning.md#time-parameters.
+	maxElectionMs = 50000
+)
+
+var (
+	ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set. " +
+		"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
+	ErrUnsetAdvertiseClientURLsFlag = fmt.Errorf("--advertise-client-urls is required when --listen-client-urls is set explicitly")
+)
+
+// Config holds the arguments for configuring an etcd server.
+type Config struct {
+	// member
+
+	CorsInfo                *cors.CORSInfo
+	LPUrls, LCUrls          []url.URL
+	Dir                     string `json:"data-dir"`
+	WalDir                  string `json:"wal-dir"`
+	MaxSnapFiles            uint   `json:"max-snapshots"`
+	MaxWalFiles             uint   `json:"max-wals"`
+	Name                    string `json:"name"`
+	SnapCount               uint64 `json:"snapshot-count"`
+	AutoCompactionRetention int    `json:"auto-compaction-retention"`
+
+	// TickMs is the number of milliseconds between heartbeat ticks.
+	// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
+	// make ticks a cluster wide configuration.
+	TickMs            uint  `json:"heartbeat-interval"`
+	ElectionMs        uint  `json:"election-timeout"`
+	QuotaBackendBytes int64 `json:"quota-backend-bytes"`
+
+	// clustering
+
+	APUrls, ACUrls      []url.URL
+	ClusterState        string `json:"initial-cluster-state"`
+	DNSCluster          string `json:"discovery-srv"`
+	Dproxy              string `json:"discovery-proxy"`
+	Durl                string `json:"discovery"`
+	InitialCluster      string `json:"initial-cluster"`
+	InitialClusterToken string `json:"initial-cluster-token"`
+	StrictReconfigCheck bool   `json:"strict-reconfig-check"`
+
+	// security
+
+	ClientTLSInfo transport.TLSInfo
+	ClientAutoTLS bool
+	PeerTLSInfo   transport.TLSInfo
+	PeerAutoTLS   bool
+
+	// debug
+
+	Debug        bool   `json:"debug"`
+	LogPkgLevels string `json:"log-package-levels"`
+	EnablePprof  bool
+
+	// ForceNewCluster starts a new cluster even if previously started; unsafe.
+	ForceNewCluster bool `json:"force-new-cluster"`
+}
+
+// configYAML holds the config suitable for yaml parsing
+type configYAML struct {
+	Config
+	configJSON
+}
+
+// configJSON has file options that are translated into Config options
+type configJSON struct {
+	LPUrlsJSON         string         `json:"listen-peer-urls"`
+	LCUrlsJSON         string         `json:"listen-client-urls"`
+	CorsJSON           string         `json:"cors"`
+	APUrlsJSON         string         `json:"initial-advertise-peer-urls"`
+	ACUrlsJSON         string         `json:"advertise-client-urls"`
+	ClientSecurityJSON securityConfig `json:"client-transport-security"`
+	PeerSecurityJSON   securityConfig `json:"peer-transport-security"`
+}
+
+type securityConfig struct {
+	CAFile        string `json:"ca-file"`
+	CertFile      string `json:"cert-file"`
+	KeyFile       string `json:"key-file"`
+	CertAuth      bool   `json:"client-cert-auth"`
+	TrustedCAFile string `json:"trusted-ca-file"`
+	AutoTLS       bool   `json:"auto-tls"`
+}
+
+// NewConfig creates a new Config populated with default values.
+func NewConfig() *Config {
+	apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs)
+	acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
+	cfg := &Config{
+		CorsInfo:            &cors.CORSInfo{},
+		MaxSnapFiles:        DefaultMaxSnapshots,
+		MaxWalFiles:         DefaultMaxWALs,
+		Name:                DefaultName,
+		SnapCount:           etcdserver.DefaultSnapCount,
+		TickMs:              100,
+		ElectionMs:          1000,
+		APUrls:              []url.URL{*apurl},
+		ACUrls:              []url.URL{*acurl},
+		ClusterState:        ClusterStateFlagNew,
+		InitialClusterToken: "etcd-cluster",
+	}
+	cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
+	return cfg
+}
+
+func ConfigFromFile(path string) (*Config, error) {
+	cfg := &configYAML{}
+	if err := cfg.configFromFile(path); err != nil {
+		return nil, err
+	}
+	return &cfg.Config, nil
+}
+
+func (cfg *configYAML) configFromFile(path string) error {
+	b, err := ioutil.ReadFile(path)
+	if err != nil {
+		return err
+	}
+
+	err = yaml.Unmarshal(b, cfg)
+	if err != nil {
+		return err
+	}
+
+	if cfg.LPUrlsJSON != "" {
+		u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ","))
+		if err != nil {
+			plog.Fatalf("unexpected error setting up listen-peer-urls: %v", err)
+		}
+		cfg.LPUrls = []url.URL(u)
+	}
+
+	if cfg.LCUrlsJSON != "" {
+		u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ","))
+		if err != nil {
+			plog.Fatalf("unexpected error setting up listen-client-urls: %v", err)
+		}
+		cfg.LCUrls = []url.URL(u)
+	}
+
+	if cfg.CorsJSON != "" {
+		if err := cfg.CorsInfo.Set(cfg.CorsJSON); err != nil {
+			plog.Panicf("unexpected error setting up cors: %v", err)
+		}
+	}
+
+	if cfg.APUrlsJSON != "" {
+		u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ","))
+		if err != nil {
+			plog.Fatalf("unexpected error setting up initial-advertise-peer-urls: %v", err)
+		}
+		cfg.APUrls = []url.URL(u)
+	}
+
+	if cfg.ACUrlsJSON != "" {
+		u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ","))
+		if err != nil {
+			plog.Fatalf("unexpected error setting up advertise-peer-urls: %v", err)
+		}
+		cfg.ACUrls = []url.URL(u)
+	}
+
+	if cfg.ClusterState == "" {
+		cfg.ClusterState = ClusterStateFlagNew
+	}
+
+	copySecurityDetails := func(tls *transport.TLSInfo, ysc *securityConfig) {
+		tls.CAFile = ysc.CAFile
+		tls.CertFile = ysc.CertFile
+		tls.KeyFile = ysc.KeyFile
+		tls.ClientCertAuth = ysc.CertAuth
+		tls.TrustedCAFile = ysc.TrustedCAFile
+	}
+	copySecurityDetails(&cfg.ClientTLSInfo, &cfg.ClientSecurityJSON)
+	copySecurityDetails(&cfg.PeerTLSInfo, &cfg.PeerSecurityJSON)
+	cfg.ClientAutoTLS = cfg.ClientSecurityJSON.AutoTLS
+	cfg.PeerAutoTLS = cfg.PeerSecurityJSON.AutoTLS
+
+	return cfg.Validate()
+}
+
+func (cfg *Config) Validate() error {
+	// Check if conflicting flags are passed.
+	nSet := 0
+	for _, v := range []bool{cfg.Durl != "", cfg.InitialCluster != "", cfg.DNSCluster != ""} {
+		if v {
+			nSet++
+		}
+	}
+
+	if cfg.ClusterState != ClusterStateFlagNew && cfg.ClusterState != ClusterStateFlagExisting {
+		return fmt.Errorf("unexpected clusterState %q", cfg.ClusterState)
+	}
+
+	if nSet > 1 {
+		return ErrConflictBootstrapFlags
+	}
+
+	if 5*cfg.TickMs > cfg.ElectionMs {
+		return fmt.Errorf("--election-timeout[%vms] should be at least as 5 times as --heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
+	}
+	if cfg.ElectionMs > maxElectionMs {
+		return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
+	}
+
+	// check this last since proxying in etcdmain may make this OK
+	if cfg.LCUrls != nil && cfg.ACUrls == nil {
+		return ErrUnsetAdvertiseClientURLsFlag
+	}
+
+	return nil
+}
+
+// PeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery.
+func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, token string, err error) {
+	switch {
+	case cfg.Durl != "":
+		urlsmap = types.URLsMap{}
+		// If using discovery, generate a temporary cluster based on
+		// self's advertised peer URLs
+		urlsmap[cfg.Name] = cfg.APUrls
+		token = cfg.Durl
+	case cfg.DNSCluster != "":
+		var clusterStr string
+		clusterStr, token, err = discovery.SRVGetCluster(cfg.Name, cfg.DNSCluster, cfg.InitialClusterToken, cfg.APUrls)
+		if err != nil {
+			return nil, "", err
+		}
+		urlsmap, err = types.NewURLsMap(clusterStr)
+		// only etcd member must belong to the discovered cluster.
+		// proxy does not need to belong to the discovered cluster.
+		if which == "etcd" {
+			if _, ok := urlsmap[cfg.Name]; !ok {
+				return nil, "", fmt.Errorf("cannot find local etcd member %q in SRV records", cfg.Name)
+			}
+		}
+	default:
+		// We're statically configured, and cluster has appropriately been set.
+		urlsmap, err = types.NewURLsMap(cfg.InitialCluster)
+		token = cfg.InitialClusterToken
+	}
+	return urlsmap, token, err
+}
+
+func (cfg Config) InitialClusterFromName(name string) (ret string) {
+	if len(cfg.APUrls) == 0 {
+		return ""
+	}
+	n := name
+	if name == "" {
+		n = DefaultName
+	}
+	for i := range cfg.APUrls {
+		ret = ret + "," + n + "=" + cfg.APUrls[i].String()
+	}
+	return ret[1:]
+}
+
+func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }
+func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }

+ 83 - 0
embed/config_test.go

@@ -0,0 +1,83 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package embed
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/coreos/etcd/pkg/transport"
+	"github.com/ghodss/yaml"
+)
+
+func TestConfigFileOtherFields(t *testing.T) {
+	ctls := securityConfig{CAFile: "cca", CertFile: "ccert", KeyFile: "ckey"}
+	ptls := securityConfig{CAFile: "pca", CertFile: "pcert", KeyFile: "pkey"}
+	yc := struct {
+		ClientSecurityCfgFile securityConfig `json:"client-transport-security"`
+		PeerSecurityCfgFile   securityConfig `json:"peer-transport-security"`
+		ForceNewCluster       bool           `json:"force-new-cluster"`
+	}{
+		ctls,
+		ptls,
+		true,
+	}
+
+	b, err := yaml.Marshal(&yc)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	tmpfile := mustCreateCfgFile(t, b)
+	defer os.Remove(tmpfile.Name())
+
+	cfg, err := ConfigFromFile(tmpfile.Name())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !cfg.ForceNewCluster {
+		t.Errorf("ForceNewCluster = %v, want %v", cfg.ForceNewCluster, true)
+	}
+
+	if !ctls.equals(&cfg.ClientTLSInfo) {
+		t.Errorf("ClientTLS = %v, want %v", cfg.ClientTLSInfo, ctls)
+	}
+	if !ptls.equals(&cfg.PeerTLSInfo) {
+		t.Errorf("PeerTLS = %v, want %v", cfg.PeerTLSInfo, ptls)
+	}
+}
+
+func (s *securityConfig) equals(t *transport.TLSInfo) bool {
+	return s.CAFile == t.CAFile &&
+		s.CertFile == t.CertFile &&
+		s.CertAuth == t.ClientCertAuth &&
+		s.TrustedCAFile == t.TrustedCAFile
+}
+
+func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
+	tmpfile, err := ioutil.TempFile("", "servercfg")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err = tmpfile.Write(b); err != nil {
+		t.Fatal(err)
+	}
+	if err = tmpfile.Close(); err != nil {
+		t.Fatal(err)
+	}
+	return tmpfile
+}

+ 21 - 11
etcdmain/const_unix.go → embed/doc.go

@@ -1,4 +1,4 @@
-// Copyright 2015 The etcd Authors
+// Copyright 2016 The etcd Authors
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -12,16 +12,26 @@
 // See the License for the specific language governing permissions and
 // See the License for the specific language governing permissions and
 // limitations under the License.
 // limitations under the License.
 
 
-// +build !windows,!plan9
+/*
+Package embed provides bindings for embedding an etcd server in a program.
 
 
-package etcdmain
+Launch an embedded etcd server using the configuration defaults:
 
 
-import (
-	// import procfs FIX godeps.
-	_ "github.com/prometheus/procfs"
-)
+	import (
+		"log"
 
 
-const (
-	defaultMaxSnapshots = 5
-	defaultMaxWALs      = 5
-)
+		"github.com/coreos/etcd/embed"
+	)
+
+	func main() {
+		cfg := embed.NewConfig()
+		cfg.Dir = "default.etcd"
+		e, err := embed.StartEtcd(cfg)
+		if err != nil {
+			log.Fatal(err)
+		}
+		defer e.Close()
+		log.Fatal(<-e.Err())
+	}
+*/
+package embed

+ 302 - 0
embed/etcd.go

@@ -0,0 +1,302 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package embed
+
+import (
+	"crypto/tls"
+	"fmt"
+	"net"
+	"net/http"
+	"path"
+
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v2http"
+	"github.com/coreos/etcd/pkg/cors"
+	runtimeutil "github.com/coreos/etcd/pkg/runtime"
+	"github.com/coreos/etcd/pkg/transport"
+	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/pkg/capnslog"
+)
+
+var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
+
+const (
+	// internal fd usage includes disk usage and transport usage.
+	// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
+	// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
+	// read all logs after some snapshot index, which locates at the end of
+	// the second last and the head of the last. For purging, it needs to read
+	// directory, so it needs 1. For fd monitor, it needs 1.
+	// For transport, rafthttp builds two long-polling connections and at most
+	// four temporary connections with each member. There are at most 9 members
+	// in a cluster, so it should reserve 96.
+	// For the safety, we set the total reserved number to 150.
+	reservedInternalFDNum = 150
+)
+
+// Etcd contains a running etcd server and its listeners.
+type Etcd struct {
+	Peers   []net.Listener
+	Clients []net.Listener
+	Server  *etcdserver.EtcdServer
+
+	cfg   Config
+	errc  chan error
+	sctxs map[string]*serveCtx
+}
+
+// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
+func StartEtcd(inCfg *Config) (e *Etcd, err error) {
+	if err = inCfg.Validate(); err != nil {
+		return nil, err
+	}
+	e = &Etcd{cfg: *inCfg}
+	cfg := &e.cfg
+	defer func() {
+		if err != nil {
+			e.Close()
+			e = nil
+		}
+	}()
+
+	if e.Peers, err = startPeerListeners(cfg); err != nil {
+		return
+	}
+	if e.sctxs, err = startClientListeners(cfg); err != nil {
+		return
+	}
+	for _, sctx := range e.sctxs {
+		e.Clients = append(e.Clients, sctx.l)
+	}
+
+	urlsmap, token, uerr := cfg.PeerURLsMapAndToken("etcd")
+	if uerr != nil {
+		err = fmt.Errorf("error setting up initial cluster: %v", uerr)
+		return
+	}
+
+	srvcfg := &etcdserver.ServerConfig{
+		Name:                    cfg.Name,
+		ClientURLs:              cfg.ACUrls,
+		PeerURLs:                cfg.APUrls,
+		DataDir:                 cfg.Dir,
+		DedicatedWALDir:         cfg.WalDir,
+		SnapCount:               cfg.SnapCount,
+		MaxSnapFiles:            cfg.MaxSnapFiles,
+		MaxWALFiles:             cfg.MaxWalFiles,
+		InitialPeerURLsMap:      urlsmap,
+		InitialClusterToken:     token,
+		DiscoveryURL:            cfg.Durl,
+		DiscoveryProxy:          cfg.Dproxy,
+		NewCluster:              cfg.IsNewCluster(),
+		ForceNewCluster:         cfg.ForceNewCluster,
+		PeerTLSInfo:             cfg.PeerTLSInfo,
+		TickMs:                  cfg.TickMs,
+		ElectionTicks:           cfg.ElectionTicks(),
+		AutoCompactionRetention: cfg.AutoCompactionRetention,
+		QuotaBackendBytes:       cfg.QuotaBackendBytes,
+		StrictReconfigCheck:     cfg.StrictReconfigCheck,
+		EnablePprof:             cfg.EnablePprof,
+	}
+
+	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
+		return
+	}
+
+	// buffer channel so goroutines on closed connections won't wait forever
+	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
+
+	e.Server.Start()
+	e.serve()
+	<-e.Server.ReadyNotify()
+	return
+}
+
+func (e *Etcd) Close() {
+	for _, sctx := range e.sctxs {
+		sctx.cancel()
+	}
+	for i := range e.Peers {
+		if e.Peers[i] != nil {
+			e.Peers[i].Close()
+		}
+	}
+	for i := range e.Clients {
+		if e.Clients[i] != nil {
+			e.Clients[i].Close()
+		}
+	}
+	if e.Server != nil {
+		e.Server.Stop()
+	}
+}
+
+func (e *Etcd) Err() <-chan error { return e.errc }
+
+func startPeerListeners(cfg *Config) (plns []net.Listener, err error) {
+	if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() {
+		phosts := make([]string, len(cfg.LPUrls))
+		for i, u := range cfg.LPUrls {
+			phosts[i] = u.Host
+		}
+		cfg.PeerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts)
+		if err != nil {
+			plog.Fatalf("could not get certs (%v)", err)
+		}
+	} else if cfg.PeerAutoTLS {
+		plog.Warningf("ignoring peer auto TLS since certs given")
+	}
+
+	if !cfg.PeerTLSInfo.Empty() {
+		plog.Infof("peerTLS: %s", cfg.PeerTLSInfo)
+	}
+
+	plns = make([]net.Listener, len(cfg.LPUrls))
+	defer func() {
+		if err == nil {
+			return
+		}
+		for i := range plns {
+			if plns[i] == nil {
+				continue
+			}
+			plns[i].Close()
+			plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
+		}
+	}()
+
+	for i, u := range cfg.LPUrls {
+		var tlscfg *tls.Config
+		if u.Scheme == "http" {
+			if !cfg.PeerTLSInfo.Empty() {
+				plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
+			}
+			if cfg.PeerTLSInfo.ClientCertAuth {
+				plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
+			}
+		}
+		if !cfg.PeerTLSInfo.Empty() {
+			if tlscfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
+				return nil, err
+			}
+		}
+		if plns[i], err = rafthttp.NewListener(u, tlscfg); err != nil {
+			return nil, err
+		}
+		plog.Info("listening for peers on ", u.String())
+	}
+	return plns, nil
+}
+
+func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
+	if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() {
+		chosts := make([]string, len(cfg.LCUrls))
+		for i, u := range cfg.LCUrls {
+			chosts[i] = u.Host
+		}
+		cfg.ClientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts)
+		if err != nil {
+			plog.Fatalf("could not get certs (%v)", err)
+		}
+	} else if cfg.ClientAutoTLS {
+		plog.Warningf("ignoring client auto TLS since certs given")
+	}
+
+	sctxs = make(map[string]*serveCtx)
+	for _, u := range cfg.LCUrls {
+		sctx := newServeCtx()
+
+		if u.Scheme == "http" {
+			if !cfg.ClientTLSInfo.Empty() {
+				plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
+			}
+			if cfg.ClientTLSInfo.ClientCertAuth {
+				plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
+			}
+		}
+		if u.Scheme == "https" && cfg.ClientTLSInfo.Empty() {
+			return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
+		}
+
+		sctx.secure = u.Scheme == "https"
+		sctx.insecure = !sctx.secure
+		if oldctx := sctxs[u.Host]; oldctx != nil {
+			oldctx.secure = oldctx.secure || sctx.secure
+			oldctx.insecure = oldctx.insecure || sctx.insecure
+			continue
+		}
+
+		if sctx.l, err = net.Listen("tcp", u.Host); err != nil {
+			return nil, err
+		}
+
+		if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
+			if fdLimit <= reservedInternalFDNum {
+				plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
+			}
+			sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
+		}
+
+		if sctx.l, err = transport.NewKeepAliveListener(sctx.l, "tcp", nil); err != nil {
+			return nil, err
+		}
+
+		plog.Info("listening for client requests on ", u.Host)
+		defer func() {
+			if err != nil {
+				sctx.l.Close()
+				plog.Info("stopping listening for client requests on ", u.Host)
+			}
+		}()
+		sctxs[u.Host] = sctx
+	}
+	return sctxs, nil
+}
+
+func (e *Etcd) serve() (err error) {
+	var ctlscfg *tls.Config
+	if !e.cfg.ClientTLSInfo.Empty() {
+		plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
+		if ctlscfg, err = e.cfg.ClientTLSInfo.ServerConfig(); err != nil {
+			return err
+		}
+	}
+
+	if e.cfg.CorsInfo.String() != "" {
+		plog.Infof("cors = %s", e.cfg.CorsInfo)
+	}
+
+	// Start the peer server in a goroutine
+	ph := v2http.NewPeerHandler(e.Server)
+	for _, l := range e.Peers {
+		go func(l net.Listener) {
+			e.errc <- servePeerHTTP(l, ph)
+		}(l)
+	}
+
+	// Start a client server goroutine for each listen address
+	ch := http.Handler(&cors.CORSHandler{
+		Handler: v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout()),
+		Info:    e.cfg.CorsInfo,
+	})
+	for _, sctx := range e.sctxs {
+		// read timeout does not work with http close notify
+		// TODO: https://github.com/golang/go/issues/9524
+		go func(s *serveCtx) {
+			e.errc <- s.serve(e.Server, ctlscfg, ch, e.errc)
+		}(sctx)
+	}
+	return nil
+}

+ 26 - 18
etcdmain/serve.go → embed/serve.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // See the License for the specific language governing permissions and
 // limitations under the License.
 // limitations under the License.
 
 
-package etcdmain
+package embed
 
 
 import (
 import (
 	"crypto/tls"
 	"crypto/tls"
@@ -37,17 +37,23 @@ import (
 
 
 type serveCtx struct {
 type serveCtx struct {
 	l        net.Listener
 	l        net.Listener
-	host     string
 	secure   bool
 	secure   bool
 	insecure bool
 	insecure bool
+
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func newServeCtx() *serveCtx {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &serveCtx{ctx: ctx, cancel: cancel}
 }
 }
 
 
 // serve accepts incoming connections on the listener l,
 // serve accepts incoming connections on the listener l,
 // creating a new service goroutine for each. The service goroutines
 // creating a new service goroutine for each. The service goroutines
 // read requests and then call handler to reply to them.
 // read requests and then call handler to reply to them.
-func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error {
+func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errc chan<- error) error {
 	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
 	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
-
 	<-s.ReadyNotify()
 	<-s.ReadyNotify()
 	plog.Info("ready to serve client requests")
 	plog.Info("ready to serve client requests")
 
 
@@ -56,12 +62,12 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler
 	if sctx.insecure {
 	if sctx.insecure {
 		gs := v3rpc.Server(s, nil)
 		gs := v3rpc.Server(s, nil)
 		grpcl := m.Match(cmux.HTTP2())
 		grpcl := m.Match(cmux.HTTP2())
-		go func() { plog.Fatal(gs.Serve(grpcl)) }()
+		go func() { errc <- gs.Serve(grpcl) }()
 
 
 		opts := []grpc.DialOption{
 		opts := []grpc.DialOption{
 			grpc.WithInsecure(),
 			grpc.WithInsecure(),
 		}
 		}
-		gwmux, err := registerGateway(sctx.l.Addr().String(), opts)
+		gwmux, err := sctx.registerGateway(opts)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
@@ -74,8 +80,8 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler
 			ErrorLog: logger, // do not log user error
 			ErrorLog: logger, // do not log user error
 		}
 		}
 		httpl := m.Match(cmux.HTTP1())
 		httpl := m.Match(cmux.HTTP1())
-		go func() { plog.Fatal(srvhttp.Serve(httpl)) }()
-		plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.host)
+		go func() { errc <- srvhttp.Serve(httpl) }()
+		plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
 	}
 	}
 
 
 	if sctx.secure {
 	if sctx.secure {
@@ -87,7 +93,7 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler
 		dtls.InsecureSkipVerify = true
 		dtls.InsecureSkipVerify = true
 		creds := credentials.NewTLS(dtls)
 		creds := credentials.NewTLS(dtls)
 		opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
 		opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
-		gwmux, err := registerGateway(sctx.l.Addr().String(), opts)
+		gwmux, err := sctx.registerGateway(opts)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
@@ -102,9 +108,9 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler
 			TLSConfig: tlscfg,
 			TLSConfig: tlscfg,
 			ErrorLog:  logger, // do not log user error
 			ErrorLog:  logger, // do not log user error
 		}
 		}
-		go func() { plog.Fatal(srv.Serve(tlsl)) }()
+		go func() { errc <- srv.Serve(tlsl) }()
 
 
-		plog.Infof("serving client requests on %s", sctx.host)
+		plog.Infof("serving client requests on %s", sctx.l.Addr().String())
 	}
 	}
 
 
 	return m.Serve()
 	return m.Serve()
@@ -133,30 +139,32 @@ func servePeerHTTP(l net.Listener, handler http.Handler) error {
 	return srv.Serve(l)
 	return srv.Serve(l)
 }
 }
 
 
-func registerGateway(addr string, opts []grpc.DialOption) (*gw.ServeMux, error) {
+func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
+	ctx := sctx.ctx
+	addr := sctx.l.Addr().String()
 	gwmux := gw.NewServeMux()
 	gwmux := gw.NewServeMux()
 
 
-	err := pb.RegisterKVHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
+	err := pb.RegisterKVHandlerFromEndpoint(ctx, gwmux, addr, opts)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	err = pb.RegisterWatchHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
+	err = pb.RegisterWatchHandlerFromEndpoint(ctx, gwmux, addr, opts)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	err = pb.RegisterLeaseHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
+	err = pb.RegisterLeaseHandlerFromEndpoint(ctx, gwmux, addr, opts)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	err = pb.RegisterClusterHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
+	err = pb.RegisterClusterHandlerFromEndpoint(ctx, gwmux, addr, opts)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	err = pb.RegisterMaintenanceHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
+	err = pb.RegisterMaintenanceHandlerFromEndpoint(ctx, gwmux, addr, opts)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	err = pb.RegisterAuthHandlerFromEndpoint(context.Background(), gwmux, addr, opts)
+	err = pb.RegisterAuthHandlerFromEndpoint(ctx, gwmux, addr, opts)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 125 - 271
etcdmain/config.go

@@ -20,21 +20,17 @@ import (
 	"flag"
 	"flag"
 	"fmt"
 	"fmt"
 	"io/ioutil"
 	"io/ioutil"
-	"net/url"
 	"os"
 	"os"
 	"runtime"
 	"runtime"
 	"strings"
 	"strings"
 
 
-	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/pkg/cors"
+	"github.com/coreos/etcd/embed"
 	"github.com/coreos/etcd/pkg/flags"
 	"github.com/coreos/etcd/pkg/flags"
-	"github.com/coreos/etcd/pkg/transport"
-	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/etcd/version"
 	"github.com/ghodss/yaml"
 	"github.com/ghodss/yaml"
 )
 )
 
 
-const (
+var (
 	proxyFlagOff      = "off"
 	proxyFlagOff      = "off"
 	proxyFlagReadonly = "readonly"
 	proxyFlagReadonly = "readonly"
 	proxyFlagOn       = "on"
 	proxyFlagOn       = "on"
@@ -42,21 +38,6 @@ const (
 	fallbackFlagExit  = "exit"
 	fallbackFlagExit  = "exit"
 	fallbackFlagProxy = "proxy"
 	fallbackFlagProxy = "proxy"
 
 
-	clusterStateFlagNew      = "new"
-	clusterStateFlagExisting = "existing"
-
-	defaultName                     = "default"
-	defaultInitialAdvertisePeerURLs = "http://localhost:2380"
-	defaultAdvertiseClientURLs      = "http://localhost:2379"
-	defaultListenPeerURLs           = "http://localhost:2380"
-	defaultListenClientURLs         = "http://localhost:2379"
-
-	// maxElectionMs specifies the maximum value of election timeout.
-	// More details are listed in ../Documentation/tuning.md#time-parameters.
-	maxElectionMs = 50000
-)
-
-var (
 	ignored = []string{
 	ignored = []string{
 		"cluster-active-size",
 		"cluster-active-size",
 		"cluster-remove-delay",
 		"cluster-remove-delay",
@@ -72,105 +53,60 @@ var (
 		"v",
 		"v",
 		"vv",
 		"vv",
 	}
 	}
-
-	ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set. " +
-		"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
-	errUnsetAdvertiseClientURLsFlag = fmt.Errorf("--advertise-client-urls is required when --listen-client-urls is set explicitly")
 )
 )
 
 
-type config struct {
-	*flag.FlagSet
-
-	// member
-	corsInfo       *cors.CORSInfo
-	lpurls, lcurls []url.URL
-	Dir            string `json:"data-dir"`
-	WalDir         string `json:"wal-dir"`
-	MaxSnapFiles   uint   `json:"max-snapshots"`
-	MaxWalFiles    uint   `json:"max-wals"`
-	Name           string `json:"name"`
-	SnapCount      uint64 `json:"snapshot-count"`
-	LPUrlsCfgFile  string `json:"listen-peer-urls"`
-	LCUrlsCfgFile  string `json:"listen-client-urls"`
-	CorsCfgFile    string `json:"cors"`
-
-	// TickMs is the number of milliseconds between heartbeat ticks.
-	// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
-	// make ticks a cluster wide configuration.
-	TickMs            uint  `json:"heartbeat-interval"`
-	ElectionMs        uint  `json:"election-timeout"`
-	QuotaBackendBytes int64 `json:"quota-backend-bytes"`
-
-	// clustering
-	apurls, acurls      []url.URL
-	clusterState        *flags.StringsFlag
-	DNSCluster          string `json:"discovery-srv"`
-	Dproxy              string `json:"discovery-proxy"`
-	Durl                string `json:"discovery"`
-	fallback            *flags.StringsFlag
-	InitialCluster      string `json:"initial-cluster"`
-	InitialClusterToken string `json:"initial-cluster-token"`
-	StrictReconfigCheck bool   `json:"strict-reconfig-check"`
-	ApurlsCfgFile       string `json:"initial-advertise-peer-urls"`
-	AcurlsCfgFile       string `json:"advertise-client-urls"`
-	ClusterStateCfgFile string `json:"initial-cluster-state"`
-	FallbackCfgFile     string `json:"discovery-fallback"`
-
-	// proxy
-	proxy                  *flags.StringsFlag
-	ProxyFailureWaitMs     uint   `json:"proxy-failure-wait"`
-	ProxyRefreshIntervalMs uint   `json:"proxy-refresh-interval"`
-	ProxyDialTimeoutMs     uint   `json:"proxy-dial-timeout"`
-	ProxyWriteTimeoutMs    uint   `json:"proxy-write-timeout"`
-	ProxyReadTimeoutMs     uint   `json:"proxy-read-timeout"`
-	ProxyCfgFile           string `json:"proxy"`
-
-	// security
-	clientTLSInfo, peerTLSInfo transport.TLSInfo
-	ClientAutoTLS              bool
-	PeerAutoTLS                bool
-	ClientSecurityCfgFile      securityConfig `json:"client-transport-security"`
-	PeerSecurityCfgFile        securityConfig `json:"peer-transport-security"`
-
-	// Debug logging
-	Debug        bool   `json:"debug"`
-	LogPkgLevels string `json:"log-package-levels"`
-
-	// ForceNewCluster is unsafe
-	ForceNewCluster bool `json:"force-new-cluster"`
+type configProxy struct {
+	ProxyFailureWaitMs     uint `json:"proxy-failure-wait"`
+	ProxyRefreshIntervalMs uint `json:"proxy-refresh-interval"`
+	ProxyDialTimeoutMs     uint `json:"proxy-dial-timeout"`
+	ProxyWriteTimeoutMs    uint `json:"proxy-write-timeout"`
+	ProxyReadTimeoutMs     uint `json:"proxy-read-timeout"`
+	Fallback               string
+	Proxy                  string
+	ProxyJSON              string `json:"proxy"`
+	FallbackJSON           string `json:"discovery-fallback"`
+}
 
 
+// config holds the config for a command line invocation of etcd
+type config struct {
+	embed.Config
+	configProxy
+	configFlags
+	configFile   string
 	printVersion bool
 	printVersion bool
-
-	autoCompactionRetention int
-
-	enablePprof bool
-
-	configFile string
-
-	ignored []string
+	ignored      []string
 }
 }
 
 
-type securityConfig struct {
-	CAFile        string `json:"ca-file"`
-	CertFile      string `json:"cert-file"`
-	KeyFile       string `json:"key-file"`
-	CertAuth      bool   `json:"client-cert-auth"`
-	TrustedCAFile string `json:"trusted-ca-file"`
-	AutoTLS       bool   `json:"auto-tls"`
+// configFlags has the set of flags used for command line parsing a Config
+type configFlags struct {
+	*flag.FlagSet
+	clusterState *flags.StringsFlag
+	fallback     *flags.StringsFlag
+	proxy        *flags.StringsFlag
 }
 }
 
 
-func NewConfig() *config {
+func newConfig() *config {
 	cfg := &config{
 	cfg := &config{
-		corsInfo: &cors.CORSInfo{},
+		Config: *embed.NewConfig(),
+		configProxy: configProxy{
+			Proxy:                  proxyFlagOff,
+			ProxyFailureWaitMs:     5000,
+			ProxyRefreshIntervalMs: 30000,
+			ProxyDialTimeoutMs:     1000,
+			ProxyWriteTimeoutMs:    5000,
+		},
+		ignored: ignored,
+	}
+	cfg.configFlags = configFlags{
+		FlagSet: flag.NewFlagSet("etcd", flag.ContinueOnError),
 		clusterState: flags.NewStringsFlag(
 		clusterState: flags.NewStringsFlag(
-			clusterStateFlagNew,
-			clusterStateFlagExisting,
+			embed.ClusterStateFlagNew,
+			embed.ClusterStateFlagExisting,
 		),
 		),
 		fallback: flags.NewStringsFlag(
 		fallback: flags.NewStringsFlag(
 			fallbackFlagExit,
 			fallbackFlagExit,
 			fallbackFlagProxy,
 			fallbackFlagProxy,
 		),
 		),
-		ignored: ignored,
 		proxy: flags.NewStringsFlag(
 		proxy: flags.NewStringsFlag(
 			proxyFlagOff,
 			proxyFlagOff,
 			proxyFlagReadonly,
 			proxyFlagReadonly,
@@ -178,7 +114,6 @@ func NewConfig() *config {
 		),
 		),
 	}
 	}
 
 
-	cfg.FlagSet = flag.NewFlagSet("etcd", flag.ContinueOnError)
 	fs := cfg.FlagSet
 	fs := cfg.FlagSet
 	fs.Usage = func() {
 	fs.Usage = func() {
 		fmt.Println(usageline)
 		fmt.Println(usageline)
@@ -187,38 +122,38 @@ func NewConfig() *config {
 	fs.StringVar(&cfg.configFile, "config-file", "", "Path to the server configuration file")
 	fs.StringVar(&cfg.configFile, "config-file", "", "Path to the server configuration file")
 
 
 	// member
 	// member
-	fs.Var(cfg.corsInfo, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
-	fs.StringVar(&cfg.Dir, "data-dir", "", "Path to the data directory.")
-	fs.StringVar(&cfg.WalDir, "wal-dir", "", "Path to the dedicated wal directory.")
-	fs.Var(flags.NewURLsValue(defaultListenPeerURLs), "listen-peer-urls", "List of URLs to listen on for peer traffic.")
-	fs.Var(flags.NewURLsValue(defaultListenClientURLs), "listen-client-urls", "List of URLs to listen on for client traffic.")
-	fs.UintVar(&cfg.MaxSnapFiles, "max-snapshots", defaultMaxSnapshots, "Maximum number of snapshot files to retain (0 is unlimited).")
-	fs.UintVar(&cfg.MaxWalFiles, "max-wals", defaultMaxWALs, "Maximum number of wal files to retain (0 is unlimited).")
-	fs.StringVar(&cfg.Name, "name", defaultName, "Human-readable name for this member.")
-	fs.Uint64Var(&cfg.SnapCount, "snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot to disk.")
-	fs.UintVar(&cfg.TickMs, "heartbeat-interval", 100, "Time (in milliseconds) of a heartbeat interval.")
-	fs.UintVar(&cfg.ElectionMs, "election-timeout", 1000, "Time (in milliseconds) for an election to timeout.")
-	fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", 0, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
+	fs.Var(cfg.CorsInfo, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
+	fs.StringVar(&cfg.Dir, "data-dir", cfg.Dir, "Path to the data directory.")
+	fs.StringVar(&cfg.WalDir, "wal-dir", cfg.WalDir, "Path to the dedicated wal directory.")
+	fs.Var(flags.NewURLsValue(embed.DefaultListenPeerURLs), "listen-peer-urls", "List of URLs to listen on for peer traffic.")
+	fs.Var(flags.NewURLsValue(embed.DefaultListenClientURLs), "listen-client-urls", "List of URLs to listen on for client traffic.")
+	fs.UintVar(&cfg.MaxSnapFiles, "max-snapshots", cfg.MaxSnapFiles, "Maximum number of snapshot files to retain (0 is unlimited).")
+	fs.UintVar(&cfg.MaxWalFiles, "max-wals", cfg.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).")
+	fs.StringVar(&cfg.Name, "name", cfg.Name, "Human-readable name for this member.")
+	fs.Uint64Var(&cfg.SnapCount, "snapshot-count", cfg.SnapCount, "Number of committed transactions to trigger a snapshot to disk.")
+	fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
+	fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
+	fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
 
 
 	// clustering
 	// clustering
-	fs.Var(flags.NewURLsValue(defaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
-	fs.Var(flags.NewURLsValue(defaultAdvertiseClientURLs), "advertise-client-urls", "List of this member's client URLs to advertise to the public.")
-	fs.StringVar(&cfg.Durl, "discovery", "", "Discovery URL used to bootstrap the cluster.")
+	fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
+	fs.Var(flags.NewURLsValue(embed.DefaultAdvertiseClientURLs), "advertise-client-urls", "List of this member's client URLs to advertise to the public.")
+	fs.StringVar(&cfg.Durl, "discovery", cfg.Durl, "Discovery URL used to bootstrap the cluster.")
 	fs.Var(cfg.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %s", strings.Join(cfg.fallback.Values, ", ")))
 	fs.Var(cfg.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %s", strings.Join(cfg.fallback.Values, ", ")))
 	if err := cfg.fallback.Set(fallbackFlagProxy); err != nil {
 	if err := cfg.fallback.Set(fallbackFlagProxy); err != nil {
 		// Should never happen.
 		// Should never happen.
 		plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
 		plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
 	}
 	}
-	fs.StringVar(&cfg.Dproxy, "discovery-proxy", "", "HTTP proxy to use for traffic to discovery service.")
-	fs.StringVar(&cfg.DNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster.")
-	fs.StringVar(&cfg.InitialCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for bootstrapping.")
-	fs.StringVar(&cfg.InitialClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap.")
+	fs.StringVar(&cfg.Dproxy, "discovery-proxy", cfg.Dproxy, "HTTP proxy to use for traffic to discovery service.")
+	fs.StringVar(&cfg.DNSCluster, "discovery-srv", cfg.DNSCluster, "DNS domain used to bootstrap initial cluster.")
+	fs.StringVar(&cfg.InitialCluster, "initial-cluster", cfg.InitialCluster, "Initial cluster configuration for bootstrapping.")
+	fs.StringVar(&cfg.InitialClusterToken, "initial-cluster-token", cfg.InitialClusterToken, "Initial cluster token for the etcd cluster during bootstrap.")
 	fs.Var(cfg.clusterState, "initial-cluster-state", "Initial cluster state ('new' or 'existing').")
 	fs.Var(cfg.clusterState, "initial-cluster-state", "Initial cluster state ('new' or 'existing').")
-	if err := cfg.clusterState.Set(clusterStateFlagNew); err != nil {
+	if err := cfg.clusterState.Set(embed.ClusterStateFlagNew); err != nil {
 		// Should never happen.
 		// Should never happen.
 		plog.Panicf("unexpected error setting up clusterStateFlag: %v", err)
 		plog.Panicf("unexpected error setting up clusterStateFlag: %v", err)
 	}
 	}
-	fs.BoolVar(&cfg.StrictReconfigCheck, "strict-reconfig-check", false, "Reject reconfiguration requests that would cause quorum loss.")
+	fs.BoolVar(&cfg.StrictReconfigCheck, "strict-reconfig-check", cfg.StrictReconfigCheck, "Reject reconfiguration requests that would cause quorum loss.")
 
 
 	// proxy
 	// proxy
 	fs.Var(cfg.proxy, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(cfg.proxy.Values, ", ")))
 	fs.Var(cfg.proxy, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(cfg.proxy.Values, ", ")))
@@ -226,24 +161,24 @@ func NewConfig() *config {
 		// Should never happen.
 		// Should never happen.
 		plog.Panicf("unexpected error setting up proxyFlag: %v", err)
 		plog.Panicf("unexpected error setting up proxyFlag: %v", err)
 	}
 	}
-	fs.UintVar(&cfg.ProxyFailureWaitMs, "proxy-failure-wait", 5000, "Time (in milliseconds) an endpoint will be held in a failed state.")
-	fs.UintVar(&cfg.ProxyRefreshIntervalMs, "proxy-refresh-interval", 30000, "Time (in milliseconds) of the endpoints refresh interval.")
-	fs.UintVar(&cfg.ProxyDialTimeoutMs, "proxy-dial-timeout", 1000, "Time (in milliseconds) for a dial to timeout.")
-	fs.UintVar(&cfg.ProxyWriteTimeoutMs, "proxy-write-timeout", 5000, "Time (in milliseconds) for a write to timeout.")
-	fs.UintVar(&cfg.ProxyReadTimeoutMs, "proxy-read-timeout", 0, "Time (in milliseconds) for a read to timeout.")
+	fs.UintVar(&cfg.ProxyFailureWaitMs, "proxy-failure-wait", cfg.ProxyFailureWaitMs, "Time (in milliseconds) an endpoint will be held in a failed state.")
+	fs.UintVar(&cfg.ProxyRefreshIntervalMs, "proxy-refresh-interval", cfg.ProxyRefreshIntervalMs, "Time (in milliseconds) of the endpoints refresh interval.")
+	fs.UintVar(&cfg.ProxyDialTimeoutMs, "proxy-dial-timeout", cfg.ProxyDialTimeoutMs, "Time (in milliseconds) for a dial to timeout.")
+	fs.UintVar(&cfg.ProxyWriteTimeoutMs, "proxy-write-timeout", cfg.ProxyWriteTimeoutMs, "Time (in milliseconds) for a write to timeout.")
+	fs.UintVar(&cfg.ProxyReadTimeoutMs, "proxy-read-timeout", cfg.ProxyReadTimeoutMs, "Time (in milliseconds) for a read to timeout.")
 
 
 	// security
 	// security
-	fs.StringVar(&cfg.clientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.")
-	fs.StringVar(&cfg.clientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.")
-	fs.StringVar(&cfg.clientTLSInfo.KeyFile, "key-file", "", "Path to the client server TLS key file.")
-	fs.BoolVar(&cfg.clientTLSInfo.ClientCertAuth, "client-cert-auth", false, "Enable client cert authentication.")
-	fs.StringVar(&cfg.clientTLSInfo.TrustedCAFile, "trusted-ca-file", "", "Path to the client server TLS trusted CA key file.")
+	fs.StringVar(&cfg.ClientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.")
+	fs.StringVar(&cfg.ClientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.")
+	fs.StringVar(&cfg.ClientTLSInfo.KeyFile, "key-file", "", "Path to the client server TLS key file.")
+	fs.BoolVar(&cfg.ClientTLSInfo.ClientCertAuth, "client-cert-auth", false, "Enable client cert authentication.")
+	fs.StringVar(&cfg.ClientTLSInfo.TrustedCAFile, "trusted-ca-file", "", "Path to the client server TLS trusted CA key file.")
 	fs.BoolVar(&cfg.ClientAutoTLS, "auto-tls", false, "Client TLS using generated certificates")
 	fs.BoolVar(&cfg.ClientAutoTLS, "auto-tls", false, "Client TLS using generated certificates")
-	fs.StringVar(&cfg.peerTLSInfo.CAFile, "peer-ca-file", "", "DEPRECATED: Path to the peer server TLS CA file.")
-	fs.StringVar(&cfg.peerTLSInfo.CertFile, "peer-cert-file", "", "Path to the peer server TLS cert file.")
-	fs.StringVar(&cfg.peerTLSInfo.KeyFile, "peer-key-file", "", "Path to the peer server TLS key file.")
-	fs.BoolVar(&cfg.peerTLSInfo.ClientCertAuth, "peer-client-cert-auth", false, "Enable peer client cert authentication.")
-	fs.StringVar(&cfg.peerTLSInfo.TrustedCAFile, "peer-trusted-ca-file", "", "Path to the peer server TLS trusted CA file.")
+	fs.StringVar(&cfg.PeerTLSInfo.CAFile, "peer-ca-file", "", "DEPRECATED: Path to the peer server TLS CA file.")
+	fs.StringVar(&cfg.PeerTLSInfo.CertFile, "peer-cert-file", "", "Path to the peer server TLS cert file.")
+	fs.StringVar(&cfg.PeerTLSInfo.KeyFile, "peer-key-file", "", "Path to the peer server TLS key file.")
+	fs.BoolVar(&cfg.PeerTLSInfo.ClientCertAuth, "peer-client-cert-auth", false, "Enable peer client cert authentication.")
+	fs.StringVar(&cfg.PeerTLSInfo.TrustedCAFile, "peer-trusted-ca-file", "", "Path to the peer server TLS trusted CA file.")
 	fs.BoolVar(&cfg.PeerAutoTLS, "peer-auto-tls", false, "Peer TLS using generated certificates")
 	fs.BoolVar(&cfg.PeerAutoTLS, "peer-auto-tls", false, "Peer TLS using generated certificates")
 
 
 	// logging
 	// logging
@@ -256,10 +191,10 @@ func NewConfig() *config {
 	// version
 	// version
 	fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")
 	fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")
 
 
-	fs.IntVar(&cfg.autoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store in hour. 0 means disable auto compaction.")
+	fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store in hour. 0 means disable auto compaction.")
 
 
 	// pprof profiler via HTTP
 	// pprof profiler via HTTP
-	fs.BoolVar(&cfg.enablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
+	fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
 
 
 	// ignored
 	// ignored
 	for _, f := range cfg.ignored {
 	for _, f := range cfg.ignored {
@@ -268,7 +203,7 @@ func NewConfig() *config {
 	return cfg
 	return cfg
 }
 }
 
 
-func (cfg *config) Parse(arguments []string) error {
+func (cfg *config) parse(arguments []string) error {
 	perr := cfg.FlagSet.Parse(arguments)
 	perr := cfg.FlagSet.Parse(arguments)
 	switch perr {
 	switch perr {
 	case nil:
 	case nil:
@@ -293,11 +228,10 @@ func (cfg *config) Parse(arguments []string) error {
 	var err error
 	var err error
 	if cfg.configFile != "" {
 	if cfg.configFile != "" {
 		plog.Infof("Loading server configuration from %q", cfg.configFile)
 		plog.Infof("Loading server configuration from %q", cfg.configFile)
-		err = cfg.configFromFile()
+		err = cfg.configFromFile(cfg.configFile)
 	} else {
 	} else {
 		err = cfg.configFromCmdLine()
 		err = cfg.configFromCmdLine()
 	}
 	}
-
 	return err
 	return err
 }
 }
 
 
@@ -307,152 +241,72 @@ func (cfg *config) configFromCmdLine() error {
 		plog.Fatalf("%v", err)
 		plog.Fatalf("%v", err)
 	}
 	}
 
 
-	cfg.lpurls = flags.URLsFromFlag(cfg.FlagSet, "listen-peer-urls")
-	cfg.apurls = flags.URLsFromFlag(cfg.FlagSet, "initial-advertise-peer-urls")
-	cfg.lcurls = flags.URLsFromFlag(cfg.FlagSet, "listen-client-urls")
-	cfg.acurls = flags.URLsFromFlag(cfg.FlagSet, "advertise-client-urls")
-
-	return cfg.validateConfig(func(field string) bool {
-		return flags.IsSet(cfg.FlagSet, field)
-	})
-}
-
-func (cfg *config) configFromFile() error {
-	b, err := ioutil.ReadFile(cfg.configFile)
-	if err != nil {
-		return err
+	cfg.LPUrls = flags.URLsFromFlag(cfg.FlagSet, "listen-peer-urls")
+	cfg.APUrls = flags.URLsFromFlag(cfg.FlagSet, "initial-advertise-peer-urls")
+	cfg.LCUrls = flags.URLsFromFlag(cfg.FlagSet, "listen-client-urls")
+	cfg.ACUrls = flags.URLsFromFlag(cfg.FlagSet, "advertise-client-urls")
+	cfg.ClusterState = cfg.clusterState.String()
+	cfg.Fallback = cfg.fallback.String()
+	cfg.Proxy = cfg.proxy.String()
+
+	// disable default advertise-client-urls if lcurls is set
+	missingAC := flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls")
+	if !cfg.mayBeProxy() && missingAC {
+		cfg.ACUrls = nil
 	}
 	}
 
 
-	err = yaml.Unmarshal(b, cfg)
-	if err != nil {
-		return err
+	// disable default initial-cluster if discovery is set
+	if (cfg.Durl != "" || cfg.DNSCluster != "") && !flags.IsSet(cfg.FlagSet, "initial-cluster") {
+		cfg.InitialCluster = ""
 	}
 	}
 
 
-	if cfg.LPUrlsCfgFile != "" {
-		u, err := types.NewURLs(strings.Split(cfg.LPUrlsCfgFile, ","))
-		if err != nil {
-			plog.Fatalf("unexpected error setting up listen-peer-urls: %v", err)
-		}
-		cfg.lpurls = []url.URL(u)
-	}
-
-	if cfg.LCUrlsCfgFile != "" {
-		u, err := types.NewURLs(strings.Split(cfg.LCUrlsCfgFile, ","))
-		if err != nil {
-			plog.Fatalf("unexpected error setting up listen-client-urls: %v", err)
-		}
-		cfg.lcurls = []url.URL(u)
-	}
-
-	if cfg.CorsCfgFile != "" {
-		if err := cfg.corsInfo.Set(cfg.CorsCfgFile); err != nil {
-			plog.Panicf("unexpected error setting up cors: %v", err)
-		}
-	}
+	return cfg.validate()
+}
 
 
-	if cfg.ApurlsCfgFile != "" {
-		u, err := types.NewURLs(strings.Split(cfg.ApurlsCfgFile, ","))
-		if err != nil {
-			plog.Fatalf("unexpected error setting up initial-advertise-peer-urls: %v", err)
-		}
-		cfg.apurls = []url.URL(u)
+func (cfg *config) configFromFile(path string) error {
+	eCfg, err := embed.ConfigFromFile(path)
+	if err != nil {
+		return err
 	}
 	}
+	cfg.Config = *eCfg
 
 
-	if cfg.AcurlsCfgFile != "" {
-		u, err := types.NewURLs(strings.Split(cfg.AcurlsCfgFile, ","))
-		if err != nil {
-			plog.Fatalf("unexpected error setting up advertise-peer-urls: %v", err)
-		}
-		cfg.acurls = []url.URL(u)
+	// load extra config information
+	b, rerr := ioutil.ReadFile(path)
+	if rerr != nil {
+		return rerr
 	}
 	}
-
-	if cfg.ClusterStateCfgFile != "" {
-		if err := cfg.clusterState.Set(cfg.ClusterStateCfgFile); err != nil {
-			plog.Panicf("unexpected error setting up clusterStateFlag: %v", err)
-		}
+	if yerr := yaml.Unmarshal(b, &cfg.configProxy); yerr != nil {
+		return yerr
 	}
 	}
-
-	if cfg.FallbackCfgFile != "" {
-		if err := cfg.fallback.Set(cfg.FallbackCfgFile); err != nil {
+	if cfg.FallbackJSON != "" {
+		if err := cfg.fallback.Set(cfg.FallbackJSON); err != nil {
 			plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
 			plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
 		}
 		}
+		cfg.Fallback = cfg.fallback.String()
 	}
 	}
-
-	if cfg.ProxyCfgFile != "" {
-		if err := cfg.proxy.Set(cfg.ProxyCfgFile); err != nil {
+	if cfg.ProxyJSON != "" {
+		if err := cfg.proxy.Set(cfg.ProxyJSON); err != nil {
 			plog.Panicf("unexpected error setting up proxyFlag: %v", err)
 			plog.Panicf("unexpected error setting up proxyFlag: %v", err)
 		}
 		}
+		cfg.Proxy = cfg.proxy.String()
 	}
 	}
-
-	copySecurityDetails := func(tls *transport.TLSInfo, ysc *securityConfig) {
-		tls.CAFile = ysc.CAFile
-		tls.CertFile = ysc.CertFile
-		tls.KeyFile = ysc.KeyFile
-		tls.ClientCertAuth = ysc.CertAuth
-		tls.TrustedCAFile = ysc.TrustedCAFile
-	}
-	copySecurityDetails(&cfg.clientTLSInfo, &cfg.ClientSecurityCfgFile)
-	copySecurityDetails(&cfg.peerTLSInfo, &cfg.PeerSecurityCfgFile)
-	cfg.ClientAutoTLS = cfg.ClientSecurityCfgFile.AutoTLS
-	cfg.PeerAutoTLS = cfg.PeerSecurityCfgFile.AutoTLS
-
-	fieldsToBeChecked := map[string]bool{
-		"discovery":             (cfg.Durl != ""),
-		"listen-client-urls":    (cfg.LCUrlsCfgFile != ""),
-		"advertise-client-urls": (cfg.AcurlsCfgFile != ""),
-		"initial-cluster":       (cfg.InitialCluster != ""),
-		"discovery-srv":         (cfg.DNSCluster != ""),
-	}
-
-	return cfg.validateConfig(func(field string) bool {
-		return fieldsToBeChecked[field]
-	})
+	return nil
 }
 }
 
 
-func (cfg *config) validateConfig(isSet func(field string) bool) error {
-	// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
-	// TODO(yichengq): check this for joining through discovery service case
-	mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
-	mayBeProxy := cfg.proxy.String() != proxyFlagOff || mayFallbackToProxy
-	if !mayBeProxy {
-		if isSet("listen-client-urls") && !isSet("advertise-client-urls") {
-			return errUnsetAdvertiseClientURLsFlag
-		}
-	}
-
-	// Check if conflicting flags are passed.
-	nSet := 0
-	for _, v := range []bool{isSet("discovery"), isSet("initial-cluster"), isSet("discovery-srv")} {
-		if v {
-			nSet++
-		}
-	}
-
-	if nSet > 1 {
-		return ErrConflictBootstrapFlags
-	}
-
-	if 5*cfg.TickMs > cfg.ElectionMs {
-		return fmt.Errorf("--election-timeout[%vms] should be at least as 5 times as --heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
-	}
-	if cfg.ElectionMs > maxElectionMs {
-		return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
-	}
-
-	return nil
+func (cfg *config) mayBeProxy() bool {
+	mayFallbackToProxy := cfg.Durl != "" && cfg.Fallback == fallbackFlagProxy
+	return cfg.Proxy != proxyFlagOff || mayFallbackToProxy
 }
 }
 
 
-func initialClusterFromName(name string) string {
-	n := name
-	if name == "" {
-		n = defaultName
+func (cfg *config) validate() error {
+	err := cfg.Config.Validate()
+	// TODO(yichengq): check this for joining through discovery service case
+	if err == embed.ErrUnsetAdvertiseClientURLsFlag && cfg.mayBeProxy() {
+		return nil
 	}
 	}
-	return fmt.Sprintf("%s=http://localhost:2380", n)
+	return err
 }
 }
 
 
-func (cfg config) isNewCluster() bool          { return cfg.clusterState.String() == clusterStateFlagNew }
 func (cfg config) isProxy() bool               { return cfg.proxy.String() != proxyFlagOff }
 func (cfg config) isProxy() bool               { return cfg.proxy.String() != proxyFlagOff }
 func (cfg config) isReadonlyProxy() bool       { return cfg.proxy.String() == proxyFlagReadonly }
 func (cfg config) isReadonlyProxy() bool       { return cfg.proxy.String() == proxyFlagReadonly }
 func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
 func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
-
-func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }

+ 57 - 101
etcdmain/config_test.go

@@ -23,6 +23,7 @@ import (
 	"strings"
 	"strings"
 	"testing"
 	"testing"
 
 
+	"github.com/coreos/etcd/embed"
 	"github.com/ghodss/yaml"
 	"github.com/ghodss/yaml"
 )
 )
 
 
@@ -39,8 +40,8 @@ func TestConfigParsingMemberFlags(t *testing.T) {
 		"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
 		"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
 	}
 	}
 
 
-	cfg := NewConfig()
-	err := cfg.Parse(args)
+	cfg := newConfig()
+	err := cfg.parse(args)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -81,9 +82,8 @@ func TestConfigFileMemberFields(t *testing.T) {
 		fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 		fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 	}
 	}
 
 
-	cfg := NewConfig()
-	err = cfg.Parse(args)
-	if err != nil {
+	cfg := newConfig()
+	if err = cfg.parse(args); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
@@ -100,9 +100,8 @@ func TestConfigParsingClusteringFlags(t *testing.T) {
 		"-discovery-fallback=exit",
 		"-discovery-fallback=exit",
 	}
 	}
 
 
-	cfg := NewConfig()
-	err := cfg.Parse(args)
-	if err != nil {
+	cfg := newConfig()
+	if err := cfg.parse(args); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
@@ -137,8 +136,8 @@ func TestConfigFileClusteringFields(t *testing.T) {
 	args := []string{
 	args := []string{
 		fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 		fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 	}
 	}
-	cfg := NewConfig()
-	err = cfg.Parse(args)
+	cfg := newConfig()
+	err = cfg.parse(args)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -147,19 +146,10 @@ func TestConfigFileClusteringFields(t *testing.T) {
 }
 }
 
 
 func TestConfigParsingOtherFlags(t *testing.T) {
 func TestConfigParsingOtherFlags(t *testing.T) {
-	args := []string{
-		"-proxy=readonly",
-		"-ca-file=cafile",
-		"-cert-file=certfile",
-		"-key-file=keyfile",
-		"-peer-ca-file=peercafile",
-		"-peer-cert-file=peercertfile",
-		"-peer-key-file=peerkeyfile",
-		"-force-new-cluster=true",
-	}
-
-	cfg := NewConfig()
-	err := cfg.Parse(args)
+	args := []string{"-proxy=readonly"}
+
+	cfg := newConfig()
+	err := cfg.parse(args)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -169,23 +159,9 @@ func TestConfigParsingOtherFlags(t *testing.T) {
 
 
 func TestConfigFileOtherFields(t *testing.T) {
 func TestConfigFileOtherFields(t *testing.T) {
 	yc := struct {
 	yc := struct {
-		ProxyCfgFile          string         `json:"proxy"`
-		ClientSecurityCfgFile securityConfig `json:"client-transport-security"`
-		PeerSecurityCfgFile   securityConfig `json:"peer-transport-security"`
-		ForceNewCluster       bool           `json:"force-new-cluster"`
+		ProxyCfgFile string `json:"proxy"`
 	}{
 	}{
 		"readonly",
 		"readonly",
-		securityConfig{
-			CAFile:   "cafile",
-			CertFile: "certfile",
-			KeyFile:  "keyfile",
-		},
-		securityConfig{
-			CAFile:   "peercafile",
-			CertFile: "peercertfile",
-			KeyFile:  "peerkeyfile",
-		},
-		true,
 	}
 	}
 
 
 	b, err := yaml.Marshal(&yc)
 	b, err := yaml.Marshal(&yc)
@@ -200,8 +176,8 @@ func TestConfigFileOtherFields(t *testing.T) {
 		fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 		fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 	}
 	}
 
 
-	cfg := NewConfig()
-	err = cfg.Parse(args)
+	cfg := newConfig()
+	err = cfg.parse(args)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -231,10 +207,9 @@ func TestConfigParsingConflictClusteringFlags(t *testing.T) {
 	}
 	}
 
 
 	for i, tt := range conflictArgs {
 	for i, tt := range conflictArgs {
-		cfg := NewConfig()
-		err := cfg.Parse(tt)
-		if err != ErrConflictBootstrapFlags {
-			t.Errorf("%d: err = %v, want %v", i, err, ErrConflictBootstrapFlags)
+		cfg := newConfig()
+		if err := cfg.parse(tt); err != embed.ErrConflictBootstrapFlags {
+			t.Errorf("%d: err = %v, want %v", i, err, embed.ErrConflictBootstrapFlags)
 		}
 		}
 	}
 	}
 }
 }
@@ -277,10 +252,9 @@ func TestConfigFileConflictClusteringFlags(t *testing.T) {
 			fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 			fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 		}
 		}
 
 
-		cfg := NewConfig()
-		err = cfg.Parse(args)
-		if err != ErrConflictBootstrapFlags {
-			t.Errorf("%d: err = %v, want %v", i, err, ErrConflictBootstrapFlags)
+		cfg := newConfig()
+		if err := cfg.parse(args); err != embed.ErrConflictBootstrapFlags {
+			t.Errorf("%d: err = %v, want %v", i, err, embed.ErrConflictBootstrapFlags)
 		}
 		}
 	}
 	}
 }
 }
@@ -295,14 +269,14 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
 				"-initial-cluster=infra1=http://127.0.0.1:2380",
 				"-initial-cluster=infra1=http://127.0.0.1:2380",
 				"-listen-client-urls=http://127.0.0.1:2379",
 				"-listen-client-urls=http://127.0.0.1:2379",
 			},
 			},
-			errUnsetAdvertiseClientURLsFlag,
+			embed.ErrUnsetAdvertiseClientURLsFlag,
 		},
 		},
 		{
 		{
 			[]string{
 			[]string{
 				"-discovery-srv=example.com",
 				"-discovery-srv=example.com",
 				"-listen-client-urls=http://127.0.0.1:2379",
 				"-listen-client-urls=http://127.0.0.1:2379",
 			},
 			},
-			errUnsetAdvertiseClientURLsFlag,
+			embed.ErrUnsetAdvertiseClientURLsFlag,
 		},
 		},
 		{
 		{
 			[]string{
 			[]string{
@@ -310,13 +284,13 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
 				"-discovery-fallback=exit",
 				"-discovery-fallback=exit",
 				"-listen-client-urls=http://127.0.0.1:2379",
 				"-listen-client-urls=http://127.0.0.1:2379",
 			},
 			},
-			errUnsetAdvertiseClientURLsFlag,
+			embed.ErrUnsetAdvertiseClientURLsFlag,
 		},
 		},
 		{
 		{
 			[]string{
 			[]string{
 				"-listen-client-urls=http://127.0.0.1:2379",
 				"-listen-client-urls=http://127.0.0.1:2379",
 			},
 			},
-			errUnsetAdvertiseClientURLsFlag,
+			embed.ErrUnsetAdvertiseClientURLsFlag,
 		},
 		},
 		{
 		{
 			[]string{
 			[]string{
@@ -342,9 +316,8 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
 	}
 	}
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
-		cfg := NewConfig()
-		err := cfg.Parse(tt.args)
-		if err != tt.werr {
+		cfg := newConfig()
+		if err := cfg.parse(tt.args); err != tt.werr {
 			t.Errorf("%d: err = %v, want %v", i, err, tt.werr)
 			t.Errorf("%d: err = %v, want %v", i, err, tt.werr)
 		}
 		}
 	}
 	}
@@ -355,15 +328,16 @@ func TestConfigIsNewCluster(t *testing.T) {
 		state  string
 		state  string
 		wIsNew bool
 		wIsNew bool
 	}{
 	}{
-		{clusterStateFlagExisting, false},
-		{clusterStateFlagNew, true},
+		{embed.ClusterStateFlagExisting, false},
+		{embed.ClusterStateFlagNew, true},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		cfg := NewConfig()
-		if err := cfg.clusterState.Set(tt.state); err != nil {
+		cfg := newConfig()
+		args := []string{"--initial-cluster-state", tests[i].state}
+		if err := cfg.parse(args); err != nil {
 			t.Fatalf("#%d: unexpected clusterState.Set error: %v", i, err)
 			t.Fatalf("#%d: unexpected clusterState.Set error: %v", i, err)
 		}
 		}
-		if g := cfg.isNewCluster(); g != tt.wIsNew {
+		if g := cfg.IsNewCluster(); g != tt.wIsNew {
 			t.Errorf("#%d: isNewCluster = %v, want %v", i, g, tt.wIsNew)
 			t.Errorf("#%d: isNewCluster = %v, want %v", i, g, tt.wIsNew)
 		}
 		}
 	}
 	}
@@ -379,7 +353,7 @@ func TestConfigIsProxy(t *testing.T) {
 		{proxyFlagOn, true},
 		{proxyFlagOn, true},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		cfg := NewConfig()
+		cfg := newConfig()
 		if err := cfg.proxy.Set(tt.proxy); err != nil {
 		if err := cfg.proxy.Set(tt.proxy); err != nil {
 			t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err)
 			t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err)
 		}
 		}
@@ -399,7 +373,7 @@ func TestConfigIsReadonlyProxy(t *testing.T) {
 		{proxyFlagOn, false},
 		{proxyFlagOn, false},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		cfg := NewConfig()
+		cfg := newConfig()
 		if err := cfg.proxy.Set(tt.proxy); err != nil {
 		if err := cfg.proxy.Set(tt.proxy); err != nil {
 			t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err)
 			t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err)
 		}
 		}
@@ -418,7 +392,7 @@ func TestConfigShouldFallbackToProxy(t *testing.T) {
 		{fallbackFlagExit, false},
 		{fallbackFlagExit, false},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		cfg := NewConfig()
+		cfg := newConfig()
 		if err := cfg.fallback.Set(tt.fallback); err != nil {
 		if err := cfg.fallback.Set(tt.fallback); err != nil {
 			t.Fatalf("#%d: unexpected fallback.Set error: %v", i, err)
 			t.Fatalf("#%d: unexpected fallback.Set error: %v", i, err)
 		}
 		}
@@ -458,9 +432,8 @@ func TestConfigFileElectionTimeout(t *testing.T) {
 			fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 			fmt.Sprintf("--config-file=%s", tmpfile.Name()),
 		}
 		}
 
 
-		cfg := NewConfig()
-		err = cfg.Parse(args)
-		if !strings.Contains(err.Error(), tt.errStr) {
+		cfg := newConfig()
+		if err := cfg.parse(args); err == nil || !strings.Contains(err.Error(), tt.errStr) {
 			t.Errorf("%d: Wrong err = %v", i, err)
 			t.Errorf("%d: Wrong err = %v", i, err)
 		}
 		}
 	}
 	}
@@ -485,10 +458,10 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
 }
 }
 
 
 func validateMemberFlags(t *testing.T, cfg *config) {
 func validateMemberFlags(t *testing.T, cfg *config) {
-	wcfg := &config{
+	wcfg := &embed.Config{
 		Dir:          "testdir",
 		Dir:          "testdir",
-		lpurls:       []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
-		lcurls:       []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
+		LPUrls:       []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
+		LCUrls:       []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
 		MaxSnapFiles: 10,
 		MaxSnapFiles: 10,
 		MaxWalFiles:  10,
 		MaxWalFiles:  10,
 		Name:         "testname",
 		Name:         "testname",
@@ -510,25 +483,25 @@ func validateMemberFlags(t *testing.T, cfg *config) {
 	if cfg.SnapCount != wcfg.SnapCount {
 	if cfg.SnapCount != wcfg.SnapCount {
 		t.Errorf("snapcount = %v, want %v", cfg.SnapCount, wcfg.SnapCount)
 		t.Errorf("snapcount = %v, want %v", cfg.SnapCount, wcfg.SnapCount)
 	}
 	}
-	if !reflect.DeepEqual(cfg.lpurls, wcfg.lpurls) {
-		t.Errorf("listen-peer-urls = %v, want %v", cfg.lpurls, wcfg.lpurls)
+	if !reflect.DeepEqual(cfg.LPUrls, wcfg.LPUrls) {
+		t.Errorf("listen-peer-urls = %v, want %v", cfg.LPUrls, wcfg.LPUrls)
 	}
 	}
-	if !reflect.DeepEqual(cfg.lcurls, wcfg.lcurls) {
-		t.Errorf("listen-client-urls = %v, want %v", cfg.lcurls, wcfg.lcurls)
+	if !reflect.DeepEqual(cfg.LCUrls, wcfg.LCUrls) {
+		t.Errorf("listen-client-urls = %v, want %v", cfg.LCUrls, wcfg.LCUrls)
 	}
 	}
 }
 }
 
 
 func validateClusteringFlags(t *testing.T, cfg *config) {
 func validateClusteringFlags(t *testing.T, cfg *config) {
-	wcfg := NewConfig()
-	wcfg.apurls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}
-	wcfg.acurls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}
-	wcfg.clusterState.Set(clusterStateFlagExisting)
+	wcfg := newConfig()
+	wcfg.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}
+	wcfg.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}
+	wcfg.ClusterState = embed.ClusterStateFlagExisting
 	wcfg.fallback.Set(fallbackFlagExit)
 	wcfg.fallback.Set(fallbackFlagExit)
 	wcfg.InitialCluster = "0=http://localhost:8000"
 	wcfg.InitialCluster = "0=http://localhost:8000"
 	wcfg.InitialClusterToken = "etcdtest"
 	wcfg.InitialClusterToken = "etcdtest"
 
 
-	if cfg.clusterState.String() != wcfg.clusterState.String() {
-		t.Errorf("clusterState = %v, want %v", cfg.clusterState, wcfg.clusterState)
+	if cfg.ClusterState != wcfg.ClusterState {
+		t.Errorf("clusterState = %v, want %v", cfg.ClusterState, wcfg.ClusterState)
 	}
 	}
 	if cfg.fallback.String() != wcfg.fallback.String() {
 	if cfg.fallback.String() != wcfg.fallback.String() {
 		t.Errorf("fallback = %v, want %v", cfg.fallback, wcfg.fallback)
 		t.Errorf("fallback = %v, want %v", cfg.fallback, wcfg.fallback)
@@ -539,35 +512,18 @@ func validateClusteringFlags(t *testing.T, cfg *config) {
 	if cfg.InitialClusterToken != wcfg.InitialClusterToken {
 	if cfg.InitialClusterToken != wcfg.InitialClusterToken {
 		t.Errorf("initialClusterToken = %v, want %v", cfg.InitialClusterToken, wcfg.InitialClusterToken)
 		t.Errorf("initialClusterToken = %v, want %v", cfg.InitialClusterToken, wcfg.InitialClusterToken)
 	}
 	}
-	if !reflect.DeepEqual(cfg.apurls, wcfg.apurls) {
-		t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.lpurls, wcfg.lpurls)
+	if !reflect.DeepEqual(cfg.APUrls, wcfg.APUrls) {
+		t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.LPUrls, wcfg.LPUrls)
 	}
 	}
-	if !reflect.DeepEqual(cfg.acurls, wcfg.acurls) {
-		t.Errorf("advertise-client-urls = %v, want %v", cfg.lcurls, wcfg.lcurls)
+	if !reflect.DeepEqual(cfg.ACUrls, wcfg.ACUrls) {
+		t.Errorf("advertise-client-urls = %v, want %v", cfg.LCUrls, wcfg.LCUrls)
 	}
 	}
 }
 }
 
 
 func validateOtherFlags(t *testing.T, cfg *config) {
 func validateOtherFlags(t *testing.T, cfg *config) {
-	wcfg := NewConfig()
+	wcfg := newConfig()
 	wcfg.proxy.Set(proxyFlagReadonly)
 	wcfg.proxy.Set(proxyFlagReadonly)
-	wcfg.clientTLSInfo.CAFile = "cafile"
-	wcfg.clientTLSInfo.CertFile = "certfile"
-	wcfg.clientTLSInfo.KeyFile = "keyfile"
-	wcfg.peerTLSInfo.CAFile = "peercafile"
-	wcfg.peerTLSInfo.CertFile = "peercertfile"
-	wcfg.peerTLSInfo.KeyFile = "peerkeyfile"
-	wcfg.ForceNewCluster = true
-
 	if cfg.proxy.String() != wcfg.proxy.String() {
 	if cfg.proxy.String() != wcfg.proxy.String() {
 		t.Errorf("proxy = %v, want %v", cfg.proxy, wcfg.proxy)
 		t.Errorf("proxy = %v, want %v", cfg.proxy, wcfg.proxy)
 	}
 	}
-	if cfg.clientTLSInfo.String() != wcfg.clientTLSInfo.String() {
-		t.Errorf("clientTLS = %v, want %v", cfg.clientTLSInfo, wcfg.clientTLSInfo)
-	}
-	if cfg.peerTLSInfo.String() != wcfg.peerTLSInfo.String() {
-		t.Errorf("peerTLS = %v, want %v", cfg.peerTLSInfo, wcfg.peerTLSInfo)
-	}
-	if cfg.ForceNewCluster != wcfg.ForceNewCluster {
-		t.Errorf("forceNewCluster = %t, want %t", cfg.ForceNewCluster, wcfg.ForceNewCluster)
-	}
 }
 }

+ 0 - 26
etcdmain/const_windows.go

@@ -1,26 +0,0 @@
-// Copyright 2015 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// +build windows
-
-package etcdmain
-
-// TODO(barakmich): So because file locking on Windows is untested, the
-// temporary fix is to default to unlimited snapshots and WAL files, with manual
-// removal. Perhaps not the most elegant solution, but it's at least safe and
-// we'd totally love a PR to fix the story around locking.
-const (
-	defaultMaxSnapshots = 0
-	defaultMaxWALs      = 0
-)

+ 33 - 273
etcdmain/etcd.go

@@ -21,7 +21,6 @@ import (
 	"io/ioutil"
 	"io/ioutil"
 	"net"
 	"net"
 	"net/http"
 	"net/http"
-	_ "net/http/pprof"
 	"os"
 	"os"
 	"path"
 	"path"
 	"reflect"
 	"reflect"
@@ -30,17 +29,15 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/discovery"
+	"github.com/coreos/etcd/embed"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/etcdserver/api/v2http"
 	"github.com/coreos/etcd/pkg/cors"
 	"github.com/coreos/etcd/pkg/cors"
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/fileutil"
 	pkgioutil "github.com/coreos/etcd/pkg/ioutil"
 	pkgioutil "github.com/coreos/etcd/pkg/ioutil"
 	"github.com/coreos/etcd/pkg/osutil"
 	"github.com/coreos/etcd/pkg/osutil"
-	runtimeutil "github.com/coreos/etcd/pkg/runtime"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/proxy/httpproxy"
 	"github.com/coreos/etcd/proxy/httpproxy"
-	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/go-systemd/daemon"
 	"github.com/coreos/go-systemd/daemon"
 	systemdutil "github.com/coreos/go-systemd/util"
 	systemdutil "github.com/coreos/go-systemd/util"
@@ -52,23 +49,6 @@ type dirType string
 
 
 var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdmain")
 var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdmain")
 
 
-const (
-	// the owner can make/remove files inside the directory
-	privateDirMode = 0700
-
-	// internal fd usage includes disk usage and transport usage.
-	// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
-	// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
-	// read all logs after some snapshot index, which locates at the end of
-	// the second last and the head of the last. For purging, it needs to read
-	// directory, so it needs 1. For fd monitor, it needs 1.
-	// For transport, rafthttp builds two long-polling connections and at most
-	// four temporary connections with each member. There are at most 9 members
-	// in a cluster, so it should reserve 96.
-	// For the safety, we set the total reserved number to 150.
-	reservedInternalFDNum = 150
-)
-
 var (
 var (
 	dirMember = dirType("member")
 	dirMember = dirType("member")
 	dirProxy  = dirType("proxy")
 	dirProxy  = dirType("proxy")
@@ -76,12 +56,12 @@ var (
 )
 )
 
 
 func startEtcdOrProxyV2() {
 func startEtcdOrProxyV2() {
-	cfg := NewConfig()
-	err := cfg.Parse(os.Args[1:])
+	cfg := newConfig()
+	err := cfg.parse(os.Args[1:])
 	if err != nil {
 	if err != nil {
 		plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err)
 		plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err)
 		switch err {
 		switch err {
-		case errUnsetAdvertiseClientURLsFlag:
+		case embed.ErrUnsetAdvertiseClientURLsFlag:
 			plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
 			plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
 		}
 		}
 		os.Exit(1)
 		os.Exit(1)
@@ -89,6 +69,7 @@ func startEtcdOrProxyV2() {
 	setupLogging(cfg)
 	setupLogging(cfg)
 
 
 	var stopped <-chan struct{}
 	var stopped <-chan struct{}
+	var errc <-chan error
 
 
 	plog.Infof("etcd Version: %s\n", version.Version)
 	plog.Infof("etcd Version: %s\n", version.Version)
 	plog.Infof("Git SHA: %s\n", version.GitSHA)
 	plog.Infof("Git SHA: %s\n", version.GitSHA)
@@ -99,8 +80,8 @@ func startEtcdOrProxyV2() {
 	plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU())
 	plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU())
 
 
 	// TODO: check whether fields are set instead of whether fields have default value
 	// TODO: check whether fields are set instead of whether fields have default value
-	if cfg.Name != defaultName && cfg.InitialCluster == initialClusterFromName(defaultName) {
-		cfg.InitialCluster = initialClusterFromName(cfg.Name)
+	if cfg.Name != embed.DefaultName && cfg.InitialCluster == cfg.InitialClusterFromName(embed.DefaultName) {
+		cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
 	}
 	}
 
 
 	if cfg.Dir == "" {
 	if cfg.Dir == "" {
@@ -113,7 +94,7 @@ func startEtcdOrProxyV2() {
 		plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
 		plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
 		switch which {
 		switch which {
 		case dirMember:
 		case dirMember:
-			stopped, err = startEtcd(cfg)
+			stopped, errc, err = startEtcd(&cfg.Config)
 		case dirProxy:
 		case dirProxy:
 			err = startProxy(cfg)
 			err = startProxy(cfg)
 		default:
 		default:
@@ -122,7 +103,7 @@ func startEtcdOrProxyV2() {
 	} else {
 	} else {
 		shouldProxy := cfg.isProxy()
 		shouldProxy := cfg.isProxy()
 		if !shouldProxy {
 		if !shouldProxy {
-			stopped, err = startEtcd(cfg)
+			stopped, errc, err = startEtcd(&cfg.Config)
 			if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster {
 			if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster {
 				if cfg.shouldFallbackToProxy() {
 				if cfg.shouldFallbackToProxy() {
 					plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
 					plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
@@ -157,13 +138,13 @@ func startEtcdOrProxyV2() {
 
 
 		if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
 		if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
 			plog.Infof("%v", err)
 			plog.Infof("%v", err)
-			if cfg.InitialCluster == initialClusterFromName(cfg.Name) {
+			if cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) {
 				plog.Infof("forgot to set --initial-cluster flag?")
 				plog.Infof("forgot to set --initial-cluster flag?")
 			}
 			}
-			if types.URLs(cfg.apurls).String() == defaultInitialAdvertisePeerURLs {
+			if types.URLs(cfg.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
 				plog.Infof("forgot to set --initial-advertise-peer-urls flag?")
 				plog.Infof("forgot to set --initial-advertise-peer-urls flag?")
 			}
 			}
-			if cfg.InitialCluster == initialClusterFromName(cfg.Name) && len(cfg.Durl) == 0 {
+			if cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) && len(cfg.Durl) == 0 {
 				plog.Infof("if you want to use discovery service, please set --discovery flag.")
 				plog.Infof("if you want to use discovery service, please set --discovery flag.")
 			}
 			}
 			os.Exit(1)
 			os.Exit(1)
@@ -188,233 +169,43 @@ func startEtcdOrProxyV2() {
 		}
 		}
 	}
 	}
 
 
-	<-stopped
-	osutil.Exit(0)
-}
-
-// startEtcd launches the etcd server and HTTP handlers for client/server communication.
-func startEtcd(cfg *config) (<-chan struct{}, error) {
-	urlsmap, token, err := getPeerURLsMapAndToken(cfg, "etcd")
-	if err != nil {
-		return nil, fmt.Errorf("error setting up initial cluster: %v", err)
-	}
-
-	if cfg.PeerAutoTLS && cfg.peerTLSInfo.Empty() {
-		var phosts []string
-		for _, u := range cfg.lpurls {
-			phosts = append(phosts, u.Host)
-		}
-		cfg.peerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts)
-		if err != nil {
-			plog.Fatalf("could not get certs (%v)", err)
-		}
-	} else if cfg.PeerAutoTLS {
-		plog.Warningf("ignoring peer auto TLS since certs given")
-	}
-
-	if !cfg.peerTLSInfo.Empty() {
-		plog.Infof("peerTLS: %s", cfg.peerTLSInfo)
+	select {
+	case lerr := <-errc:
+		// fatal out on listener errors
+		plog.Fatal(lerr)
+	case <-stopped:
 	}
 	}
 
 
-	var plns []net.Listener
-	for _, u := range cfg.lpurls {
-		if u.Scheme == "http" {
-			if !cfg.peerTLSInfo.Empty() {
-				plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
-			}
-			if cfg.peerTLSInfo.ClientCertAuth {
-				plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
-			}
-		}
-		var (
-			l      net.Listener
-			tlscfg *tls.Config
-		)
-
-		if !cfg.peerTLSInfo.Empty() {
-			tlscfg, err = cfg.peerTLSInfo.ServerConfig()
-			if err != nil {
-				return nil, err
-			}
-		}
-
-		l, err = rafthttp.NewListener(u, tlscfg)
-		if err != nil {
-			return nil, err
-		}
-
-		urlStr := u.String()
-		plog.Info("listening for peers on ", urlStr)
-		defer func() {
-			if err != nil {
-				l.Close()
-				plog.Info("stopping listening for peers on ", urlStr)
-			}
-		}()
-		plns = append(plns, l)
-	}
-
-	if cfg.ClientAutoTLS && cfg.clientTLSInfo.Empty() {
-		var chosts []string
-		for _, u := range cfg.lcurls {
-			chosts = append(chosts, u.Host)
-		}
-		cfg.clientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts)
-		if err != nil {
-			plog.Fatalf("could not get certs (%v)", err)
-		}
-	} else if cfg.ClientAutoTLS {
-		plog.Warningf("ignoring client auto TLS since certs given")
-	}
-
-	var ctlscfg *tls.Config
-	if !cfg.clientTLSInfo.Empty() {
-		plog.Infof("clientTLS: %s", cfg.clientTLSInfo)
-		ctlscfg, err = cfg.clientTLSInfo.ServerConfig()
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	sctxs := make(map[string]*serveCtx)
-	for _, u := range cfg.lcurls {
-		if u.Scheme == "http" {
-			if !cfg.clientTLSInfo.Empty() {
-				plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
-			}
-			if cfg.clientTLSInfo.ClientCertAuth {
-				plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
-			}
-		}
-		if u.Scheme == "https" && ctlscfg == nil {
-			return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
-		}
-
-		ctx := &serveCtx{host: u.Host}
-
-		if u.Scheme == "https" {
-			ctx.secure = true
-		} else {
-			ctx.insecure = true
-		}
-
-		if sctxs[u.Host] != nil {
-			if ctx.secure {
-				sctxs[u.Host].secure = true
-			}
-			if ctx.insecure {
-				sctxs[u.Host].insecure = true
-			}
-			continue
-		}
-
-		var l net.Listener
-
-		l, err = net.Listen("tcp", u.Host)
-		if err != nil {
-			return nil, err
-		}
-
-		var fdLimit uint64
-		if fdLimit, err = runtimeutil.FDLimit(); err == nil {
-			if fdLimit <= reservedInternalFDNum {
-				plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
-			}
-			l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
-		}
-
-		l, err = transport.NewKeepAliveListener(l, "tcp", nil)
-		ctx.l = l
-		if err != nil {
-			return nil, err
-		}
-
-		plog.Info("listening for client requests on ", u.Host)
-		defer func() {
-			if err != nil {
-				l.Close()
-				plog.Info("stopping listening for client requests on ", u.Host)
-			}
-		}()
-		sctxs[u.Host] = ctx
-	}
+	osutil.Exit(0)
+}
 
 
-	srvcfg := &etcdserver.ServerConfig{
-		Name:                    cfg.Name,
-		ClientURLs:              cfg.acurls,
-		PeerURLs:                cfg.apurls,
-		DataDir:                 cfg.Dir,
-		DedicatedWALDir:         cfg.WalDir,
-		SnapCount:               cfg.SnapCount,
-		MaxSnapFiles:            cfg.MaxSnapFiles,
-		MaxWALFiles:             cfg.MaxWalFiles,
-		InitialPeerURLsMap:      urlsmap,
-		InitialClusterToken:     token,
-		DiscoveryURL:            cfg.Durl,
-		DiscoveryProxy:          cfg.Dproxy,
-		NewCluster:              cfg.isNewCluster(),
-		ForceNewCluster:         cfg.ForceNewCluster,
-		PeerTLSInfo:             cfg.peerTLSInfo,
-		TickMs:                  cfg.TickMs,
-		ElectionTicks:           cfg.electionTicks(),
-		AutoCompactionRetention: cfg.autoCompactionRetention,
-		QuotaBackendBytes:       cfg.QuotaBackendBytes,
-		StrictReconfigCheck:     cfg.StrictReconfigCheck,
-		EnablePprof:             cfg.enablePprof,
-	}
-	var s *etcdserver.EtcdServer
-	s, err = etcdserver.NewServer(srvcfg)
+// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
+func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
+	e, err := embed.StartEtcd(cfg)
 	if err != nil {
 	if err != nil {
-		return nil, err
-	}
-	s.Start()
-	osutil.RegisterInterruptHandler(s.Stop)
-
-	if cfg.corsInfo.String() != "" {
-		plog.Infof("cors = %s", cfg.corsInfo)
-	}
-	ch := http.Handler(&cors.CORSHandler{
-		Handler: v2http.NewClientHandler(s, srvcfg.ReqTimeout()),
-		Info:    cfg.corsInfo,
-	})
-	ph := v2http.NewPeerHandler(s)
-
-	// Start the peer server in a goroutine
-	for _, l := range plns {
-		go func(l net.Listener) {
-			plog.Fatal(servePeerHTTP(l, ph))
-		}(l)
-	}
-	// Start a client server goroutine for each listen address
-	for _, sctx := range sctxs {
-		go func(sctx *serveCtx) {
-			// read timeout does not work with http close notify
-			// TODO: https://github.com/golang/go/issues/9524
-			plog.Fatal(serve(sctx, s, ctlscfg, ch))
-		}(sctx)
+		return nil, nil, err
 	}
 	}
-
-	<-s.ReadyNotify()
-	return s.StopNotify(), nil
+	osutil.RegisterInterruptHandler(e.Server.Stop)
+	return e.Server.StopNotify(), e.Err(), nil
 }
 }
 
 
 // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
 // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
 func startProxy(cfg *config) error {
 func startProxy(cfg *config) error {
 	plog.Notice("proxy: this proxy supports v2 API only!")
 	plog.Notice("proxy: this proxy supports v2 API only!")
 
 
-	pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
+	pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 	pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost
 	pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost
 
 
-	tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
+	tr, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
 	cfg.Dir = path.Join(cfg.Dir, "proxy")
 	cfg.Dir = path.Join(cfg.Dir, "proxy")
-	err = os.MkdirAll(cfg.Dir, privateDirMode)
+	err = os.MkdirAll(cfg.Dir, fileutil.PrivateDirMode)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -440,7 +231,7 @@ func startProxy(cfg *config) error {
 		plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile)
 		plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile)
 	case os.IsNotExist(err):
 	case os.IsNotExist(err):
 		var urlsmap types.URLsMap
 		var urlsmap types.URLsMap
-		urlsmap, _, err = getPeerURLsMapAndToken(cfg, "proxy")
+		urlsmap, _, err = cfg.PeerURLsMapAndToken("proxy")
 		if err != nil {
 		if err != nil {
 			return fmt.Errorf("error setting up initial cluster: %v", err)
 			return fmt.Errorf("error setting up initial cluster: %v", err)
 		}
 		}
@@ -502,20 +293,20 @@ func startProxy(cfg *config) error {
 	ph := httpproxy.NewHandler(pt, uf, time.Duration(cfg.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.ProxyRefreshIntervalMs)*time.Millisecond)
 	ph := httpproxy.NewHandler(pt, uf, time.Duration(cfg.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.ProxyRefreshIntervalMs)*time.Millisecond)
 	ph = &cors.CORSHandler{
 	ph = &cors.CORSHandler{
 		Handler: ph,
 		Handler: ph,
-		Info:    cfg.corsInfo,
+		Info:    cfg.CorsInfo,
 	}
 	}
 
 
 	if cfg.isReadonlyProxy() {
 	if cfg.isReadonlyProxy() {
 		ph = httpproxy.NewReadonlyHandler(ph)
 		ph = httpproxy.NewReadonlyHandler(ph)
 	}
 	}
 	// Start a proxy server goroutine for each listen address
 	// Start a proxy server goroutine for each listen address
-	for _, u := range cfg.lcurls {
+	for _, u := range cfg.LCUrls {
 		var (
 		var (
 			l      net.Listener
 			l      net.Listener
 			tlscfg *tls.Config
 			tlscfg *tls.Config
 		)
 		)
-		if !cfg.clientTLSInfo.Empty() {
-			tlscfg, err = cfg.clientTLSInfo.ServerConfig()
+		if !cfg.ClientTLSInfo.Empty() {
+			tlscfg, err = cfg.ClientTLSInfo.ServerConfig()
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
@@ -538,37 +329,6 @@ func startProxy(cfg *config) error {
 	return nil
 	return nil
 }
 }
 
 
-// getPeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery.
-func getPeerURLsMapAndToken(cfg *config, which string) (urlsmap types.URLsMap, token string, err error) {
-	switch {
-	case cfg.Durl != "":
-		urlsmap = types.URLsMap{}
-		// If using discovery, generate a temporary cluster based on
-		// self's advertised peer URLs
-		urlsmap[cfg.Name] = cfg.apurls
-		token = cfg.Durl
-	case cfg.DNSCluster != "":
-		var clusterStr string
-		clusterStr, token, err = discovery.SRVGetCluster(cfg.Name, cfg.DNSCluster, cfg.InitialClusterToken, cfg.apurls)
-		if err != nil {
-			return nil, "", err
-		}
-		urlsmap, err = types.NewURLsMap(clusterStr)
-		// only etcd member must belong to the discovered cluster.
-		// proxy does not need to belong to the discovered cluster.
-		if which == "etcd" {
-			if _, ok := urlsmap[cfg.Name]; !ok {
-				return nil, "", fmt.Errorf("cannot find local etcd member %q in SRV records", cfg.Name)
-			}
-		}
-	default:
-		// We're statically configured, and cluster has appropriately been set.
-		urlsmap, err = types.NewURLsMap(cfg.InitialCluster)
-		token = cfg.InitialClusterToken
-	}
-	return urlsmap, token, err
-}
-
 // identifyDataDirOrDie returns the type of the data dir.
 // identifyDataDirOrDie returns the type of the data dir.
 // Dies if the datadir is invalid.
 // Dies if the datadir is invalid.
 func identifyDataDirOrDie(dir string) dirType {
 func identifyDataDirOrDie(dir string) dirType {

+ 7 - 3
etcdmain/help.go

@@ -14,7 +14,11 @@
 
 
 package etcdmain
 package etcdmain
 
 
-import "strconv"
+import (
+	"strconv"
+
+	"github.com/coreos/etcd/embed"
+)
 
 
 var (
 var (
 	usageline = `usage: etcd [flags]
 	usageline = `usage: etcd [flags]
@@ -48,9 +52,9 @@ member flags:
 		list of URLs to listen on for peer traffic.
 		list of URLs to listen on for peer traffic.
 	--listen-client-urls 'http://localhost:2379'
 	--listen-client-urls 'http://localhost:2379'
 		list of URLs to listen on for client traffic.
 		list of URLs to listen on for client traffic.
-	--max-snapshots '` + strconv.Itoa(defaultMaxSnapshots) + `'
+	--max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `'
 		maximum number of snapshot files to retain (0 is unlimited).
 		maximum number of snapshot files to retain (0 is unlimited).
-	--max-wals '` + strconv.Itoa(defaultMaxWALs) + `'
+	--max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `'
 		maximum number of wal files to retain (0 is unlimited).
 		maximum number of wal files to retain (0 is unlimited).
 	--cors ''
 	--cors ''
 		comma-separated whitelist of origins for CORS (cross-origin resource sharing).
 		comma-separated whitelist of origins for CORS (cross-origin resource sharing).

+ 103 - 0
integration/embed_test.go

@@ -0,0 +1,103 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+	"fmt"
+	"net/url"
+	"os"
+	"path"
+	"strings"
+	"testing"
+
+	"github.com/coreos/etcd/embed"
+)
+
+func TestEmbedEtcd(t *testing.T) {
+	tests := []struct {
+		cfg embed.Config
+
+		werr     string
+		wpeers   int
+		wclients int
+	}{
+		{werr: "multiple discovery"},
+		{werr: "advertise-client-urls is required"},
+		{werr: "should be at least"},
+		{werr: "is too long"},
+		{wpeers: 1, wclients: 1},
+		{wpeers: 2, wclients: 1},
+		{wpeers: 1, wclients: 2},
+	}
+
+	// 4000x so no conflict with e2e tests
+	url1, _ := url.Parse("http://localhost:40000")
+	url2, _ := url.Parse("http://localhost:40001")
+	url3, _ := url.Parse("http://localhost:40002")
+
+	// setup defaults
+	for i := range tests {
+		tests[i].cfg = *embed.NewConfig()
+	}
+
+	tests[0].cfg.Durl = "abc"
+	setupEmbedCfg(&tests[1].cfg, []url.URL{*url1}, []url.URL{*url2})
+	tests[1].cfg.ACUrls = nil
+	tests[2].cfg.TickMs = tests[2].cfg.ElectionMs - 1
+	tests[3].cfg.ElectionMs = 999999
+	setupEmbedCfg(&tests[4].cfg, []url.URL{*url1}, []url.URL{*url2})
+	setupEmbedCfg(&tests[5].cfg, []url.URL{*url1}, []url.URL{*url2, *url3})
+	setupEmbedCfg(&tests[6].cfg, []url.URL{*url1, *url2}, []url.URL{*url3})
+
+	dir := path.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
+	os.RemoveAll(dir)
+	defer os.RemoveAll(dir)
+
+	for i, tt := range tests {
+		tests[i].cfg.Dir = dir
+		e, err := embed.StartEtcd(&tests[i].cfg)
+		if tt.werr != "" {
+			if err == nil || !strings.Contains(err.Error(), tt.werr) {
+				t.Errorf("%d: expected error with %q, got %v", i, tt.werr, err)
+			}
+			if e != nil {
+				e.Close()
+			}
+			continue
+		}
+		if err != nil {
+			t.Errorf("%d: expected success, got error %v", i, err)
+			continue
+		}
+		if len(e.Peers) != tt.wpeers {
+			t.Errorf("%d: expected %d peers, got %d", i, tt.wpeers, len(e.Peers))
+		}
+		if len(e.Clients) != tt.wclients {
+			t.Errorf("%d: expected %d peers, got %d", i, tt.wclients, len(e.Clients))
+		}
+		e.Close()
+	}
+}
+
+func setupEmbedCfg(cfg *embed.Config, curls []url.URL, purls []url.URL) {
+	cfg.ClusterState = "new"
+	cfg.LCUrls, cfg.ACUrls = curls, curls
+	cfg.LPUrls, cfg.APUrls = purls, purls
+	cfg.InitialCluster = ""
+	for i := range purls {
+		cfg.InitialCluster += ",default=" + purls[i].String()
+	}
+	cfg.InitialCluster = cfg.InitialCluster[1:]
+}