|
|
@@ -30,8 +30,9 @@ import (
|
|
|
"github.com/coreos/etcd/client"
|
|
|
"github.com/coreos/etcd/etcdmain"
|
|
|
"github.com/coreos/etcd/migrate"
|
|
|
+ "github.com/coreos/etcd/pkg/fileutil"
|
|
|
"github.com/coreos/etcd/pkg/flags"
|
|
|
- "github.com/coreos/etcd/wal"
|
|
|
+ "github.com/coreos/etcd/pkg/types"
|
|
|
|
|
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
|
|
)
|
|
|
@@ -41,80 +42,147 @@ type version string
|
|
|
const (
|
|
|
internalV1 version = "1"
|
|
|
internalV2 version = "2"
|
|
|
+ internalV2Proxy version = "2.proxy"
|
|
|
internalUnknown version = "unknown"
|
|
|
|
|
|
+ v0_4 version = "v0.4"
|
|
|
+ v2_0 version = "v2.0"
|
|
|
+ v2_0Proxy version = "v2.0 proxy"
|
|
|
+ empty version = "empty"
|
|
|
+ unknown version = "unknown"
|
|
|
+
|
|
|
defaultInternalV1etcdBinaryDir = "/usr/libexec/etcd/versions/"
|
|
|
)
|
|
|
|
|
|
+var (
|
|
|
+ v2SpecialFlags = []string{
|
|
|
+ "initial-cluster",
|
|
|
+ "listen-peer-urls",
|
|
|
+ "listen-client-urls",
|
|
|
+ "proxy",
|
|
|
+ }
|
|
|
+)
|
|
|
+
|
|
|
func StartDesiredVersion(args []string) {
|
|
|
- switch checkStartVersion(args) {
|
|
|
+ fs, err := parseConfig(args)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ ver := checkInternalVersion(fs)
|
|
|
+ log.Printf("starter: start etcd version %s", ver)
|
|
|
+ switch ver {
|
|
|
case internalV1:
|
|
|
startInternalV1()
|
|
|
case internalV2:
|
|
|
+ case internalV2Proxy:
|
|
|
+ if _, err := os.Stat(standbyInfo4(fs.Lookup("data-dir").Value.String())); err != nil {
|
|
|
+ log.Printf("starter: Detect standby_info file exists, and add --proxy=on flag to ensure it runs in v2.0 proxy mode.")
|
|
|
+ log.Printf("starter: Before removing v0.4 data, --proxy=on flag MUST be added.")
|
|
|
+ }
|
|
|
+ // append proxy flag to args to trigger proxy mode
|
|
|
+ os.Args = append(os.Args, "-proxy=on")
|
|
|
default:
|
|
|
- log.Panicf("migrate: unhandled start version")
|
|
|
+ log.Panicf("starter: unhandled start version")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func checkStartVersion(args []string) version {
|
|
|
- fs, err := parseConfig(args)
|
|
|
- if err != nil {
|
|
|
- return internalV2
|
|
|
- }
|
|
|
+func checkInternalVersion(fs *flag.FlagSet) version {
|
|
|
// If it uses 2.0 env var explicitly, start 2.0
|
|
|
- if fs.Lookup("initial-cluster").Value.String() != "" {
|
|
|
- return internalV2
|
|
|
+ for _, name := range v2SpecialFlags {
|
|
|
+ if fs.Lookup(name).Value.String() != "" {
|
|
|
+ return internalV2
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
dataDir := fs.Lookup("data-dir").Value.String()
|
|
|
if dataDir == "" {
|
|
|
- log.Fatalf("migrate: please set ETCD_DATA_DIR for etcd")
|
|
|
+ log.Fatalf("starter: please set --data-dir or ETCD_DATA_DIR for etcd")
|
|
|
}
|
|
|
// check the data directory
|
|
|
- walVersion, err := wal.DetectVersion(dataDir)
|
|
|
+ ver, err := checkVersion(dataDir)
|
|
|
if err != nil {
|
|
|
- log.Fatalf("migrate: failed to detect etcd version in %v: %v", dataDir, err)
|
|
|
+ log.Fatalf("starter: failed to detect etcd version in %v: %v", dataDir, err)
|
|
|
}
|
|
|
- log.Printf("migrate: detect etcd version %s in %s", walVersion, dataDir)
|
|
|
- switch walVersion {
|
|
|
- case wal.WALv0_5:
|
|
|
+ log.Printf("starter: detect etcd version %s in %s", ver, dataDir)
|
|
|
+ switch ver {
|
|
|
+ case v2_0:
|
|
|
return internalV2
|
|
|
- case wal.WALv0_4:
|
|
|
- // TODO: standby case
|
|
|
- // if it is standby guy:
|
|
|
- // print out detect standby mode
|
|
|
- // go to WALNotExist case
|
|
|
- // if want to start with 2.0:
|
|
|
- // remove old data dir to avoid auto migration
|
|
|
- // try to let it fallback? or use local proxy file?
|
|
|
- ver, err := checkStartVersionByDataDir4(dataDir)
|
|
|
+ case v2_0Proxy:
|
|
|
+ return internalV2Proxy
|
|
|
+ case v0_4:
|
|
|
+ standbyInfo, err := migrate.DecodeStandbyInfo4FromFile(standbyInfo4(dataDir))
|
|
|
+ if err != nil && !os.IsNotExist(err) {
|
|
|
+ log.Fatalf("starter: failed to decode standbyInfo in %v: %v", dataDir, err)
|
|
|
+ }
|
|
|
+ inStandbyMode := standbyInfo != nil && standbyInfo.Running
|
|
|
+ if inStandbyMode {
|
|
|
+ ver, err := checkInternalVersionByClientURLs(standbyInfo.ClientURLs(), clientTLSInfo(fs))
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("starter: failed to check start version through peers: %v", err)
|
|
|
+ return internalV1
|
|
|
+ }
|
|
|
+ if ver == internalV2 {
|
|
|
+ os.Args = append(os.Args, "-initial-cluster", standbyInfo.InitialCluster())
|
|
|
+ return internalV2Proxy
|
|
|
+ }
|
|
|
+ return ver
|
|
|
+ }
|
|
|
+ ver, err := checkInternalVersionByDataDir4(dataDir)
|
|
|
if err != nil {
|
|
|
- log.Fatalf("migrate: failed to check start version in %v: %v", dataDir, err)
|
|
|
+ log.Fatalf("starter: failed to check start version in %v: %v", dataDir, err)
|
|
|
}
|
|
|
return ver
|
|
|
- case wal.WALUnknown:
|
|
|
- log.Fatalf("migrate: unknown etcd version in %v", dataDir)
|
|
|
- case wal.WALNotExist:
|
|
|
+ case empty:
|
|
|
discovery := fs.Lookup("discovery").Value.String()
|
|
|
- peers := trimSplit(fs.Lookup("peers").Value.String(), ",")
|
|
|
- peerTLSInfo := &TLSInfo{
|
|
|
- CAFile: fs.Lookup("peer-ca-file").Value.String(),
|
|
|
- CertFile: fs.Lookup("peer-cert-file").Value.String(),
|
|
|
- KeyFile: fs.Lookup("peer-key-file").Value.String(),
|
|
|
+ dpeers, err := getPeersFromDiscoveryURL(discovery)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("starter: failed to get peers from discovery %s: %v", discovery, err)
|
|
|
}
|
|
|
- ver, err := checkStartVersionByMembers(discovery, peers, peerTLSInfo)
|
|
|
+ peerStr := fs.Lookup("peers").Value.String()
|
|
|
+ ppeers := getPeersFromPeersFlag(peerStr, peerTLSInfo(fs))
|
|
|
+
|
|
|
+ urls := getClientURLsByPeerURLs(append(dpeers, ppeers...), peerTLSInfo(fs))
|
|
|
+ ver, err := checkInternalVersionByClientURLs(urls, clientTLSInfo(fs))
|
|
|
if err != nil {
|
|
|
- log.Printf("migrate: failed to check start version through peers: %v", err)
|
|
|
- break
|
|
|
+ log.Printf("starter: failed to check start version through peers: %v", err)
|
|
|
+ return internalV2
|
|
|
}
|
|
|
return ver
|
|
|
- default:
|
|
|
- log.Panicf("migrate: unhandled etcd version in %v", dataDir)
|
|
|
}
|
|
|
- return internalV2
|
|
|
+ // never reach here
|
|
|
+ log.Panicf("starter: unhandled etcd version in %v", dataDir)
|
|
|
+ return internalUnknown
|
|
|
}
|
|
|
|
|
|
-func checkStartVersionByDataDir4(dataDir string) (version, error) {
|
|
|
+func checkVersion(dataDir string) (version, error) {
|
|
|
+ names, err := fileutil.ReadDir(dataDir)
|
|
|
+ if err != nil {
|
|
|
+ if os.IsNotExist(err) {
|
|
|
+ err = nil
|
|
|
+ }
|
|
|
+ return empty, err
|
|
|
+ }
|
|
|
+ if len(names) == 0 {
|
|
|
+ return empty, nil
|
|
|
+ }
|
|
|
+ nameSet := types.NewUnsafeSet(names...)
|
|
|
+ if nameSet.ContainsAll([]string{"member"}) {
|
|
|
+ return v2_0, nil
|
|
|
+ }
|
|
|
+ if nameSet.ContainsAll([]string{"proxy"}) {
|
|
|
+ return v2_0Proxy, nil
|
|
|
+ }
|
|
|
+ if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
|
|
|
+ return v0_4, nil
|
|
|
+ }
|
|
|
+ if nameSet.ContainsAll([]string{"standby_info"}) {
|
|
|
+ return v0_4, nil
|
|
|
+ }
|
|
|
+ return unknown, fmt.Errorf("failed to check version")
|
|
|
+}
|
|
|
+
|
|
|
+func checkInternalVersionByDataDir4(dataDir string) (version, error) {
|
|
|
// check v0.4 snapshot
|
|
|
snap4, err := migrate.DecodeLatestSnapshot4FromDir(snapDir4(dataDir))
|
|
|
if err != nil {
|
|
|
@@ -153,51 +221,50 @@ func checkStartVersionByDataDir4(dataDir string) (version, error) {
|
|
|
return internalV1, nil
|
|
|
}
|
|
|
|
|
|
-func checkStartVersionByMembers(discoverURL string, peers []string, tls *TLSInfo) (version, error) {
|
|
|
- tr := &http.Transport{}
|
|
|
- if tls.Scheme() == "https" {
|
|
|
- tlsConfig, err := tls.ClientConfig()
|
|
|
- if err != nil {
|
|
|
- return internalUnknown, err
|
|
|
- }
|
|
|
- tr.TLSClientConfig = tlsConfig
|
|
|
- }
|
|
|
- c := &http.Client{Transport: tr}
|
|
|
-
|
|
|
- possiblePeers, err := getPeersFromDiscoveryURL(discoverURL)
|
|
|
+func getClientURLsByPeerURLs(peers []string, tls *TLSInfo) []string {
|
|
|
+ c, err := newDefaultClient(tls)
|
|
|
if err != nil {
|
|
|
- return internalUnknown, err
|
|
|
+ log.Printf("starter: new client error: %v", err)
|
|
|
+ return nil
|
|
|
}
|
|
|
- for _, p := range peers {
|
|
|
- possiblePeers = append(possiblePeers, tls.Scheme()+"://"+p)
|
|
|
- }
|
|
|
-
|
|
|
- for _, p := range possiblePeers {
|
|
|
- resp, err := c.Get(p + "/etcdURL")
|
|
|
+ var urls []string
|
|
|
+ for _, u := range peers {
|
|
|
+ resp, err := c.Get(u + "/etcdURL")
|
|
|
if err != nil {
|
|
|
- log.Printf("migrate: failed to get /etcdURL from %s", p)
|
|
|
+ log.Printf("starter: failed to get /etcdURL from %s", u)
|
|
|
continue
|
|
|
}
|
|
|
b, err := ioutil.ReadAll(resp.Body)
|
|
|
if err != nil {
|
|
|
- log.Printf("migrate: failed to read body from %s", p)
|
|
|
+ log.Printf("starter: failed to read body from %s", u)
|
|
|
continue
|
|
|
}
|
|
|
- resp, err = c.Get(string(b) + "/version")
|
|
|
+ urls = append(urls, string(b))
|
|
|
+ }
|
|
|
+ return urls
|
|
|
+}
|
|
|
+
|
|
|
+func checkInternalVersionByClientURLs(urls []string, tls *TLSInfo) (version, error) {
|
|
|
+ c, err := newDefaultClient(tls)
|
|
|
+ if err != nil {
|
|
|
+ return internalUnknown, err
|
|
|
+ }
|
|
|
+ for _, u := range urls {
|
|
|
+ resp, err := c.Get(u + "/version")
|
|
|
if err != nil {
|
|
|
- log.Printf("migrate: failed to get /version from %s", p)
|
|
|
+ log.Printf("starter: failed to get /version from %s", u)
|
|
|
continue
|
|
|
}
|
|
|
- b, err = ioutil.ReadAll(resp.Body)
|
|
|
+ b, err := ioutil.ReadAll(resp.Body)
|
|
|
if err != nil {
|
|
|
- log.Printf("migrate: failed to read body from %s", p)
|
|
|
+ log.Printf("starter: failed to read body from %s", u)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
var m map[string]string
|
|
|
err = json.Unmarshal(b, &m)
|
|
|
if err != nil {
|
|
|
- log.Printf("migrate: failed to unmarshal body %s from %s", b, p)
|
|
|
+ log.Printf("starter: failed to unmarshal body %s from %s", b, u)
|
|
|
continue
|
|
|
}
|
|
|
switch m["internalVersion"] {
|
|
|
@@ -206,10 +273,10 @@ func checkStartVersionByMembers(discoverURL string, peers []string, tls *TLSInfo
|
|
|
case "2":
|
|
|
return internalV2, nil
|
|
|
default:
|
|
|
- log.Printf("migrate: unrecognized internal version %s from %s", m["internalVersion"], p)
|
|
|
+ log.Printf("starter: unrecognized internal version %s from %s", m["internalVersion"], u)
|
|
|
}
|
|
|
}
|
|
|
- return internalUnknown, fmt.Errorf("failed to get version from peers %v", possiblePeers)
|
|
|
+ return internalUnknown, fmt.Errorf("failed to get version from urls %v", urls)
|
|
|
}
|
|
|
|
|
|
func getPeersFromDiscoveryURL(discoverURL string) ([]string, error) {
|
|
|
@@ -246,6 +313,14 @@ func getPeersFromDiscoveryURL(discoverURL string) ([]string, error) {
|
|
|
return peers, nil
|
|
|
}
|
|
|
|
|
|
+func getPeersFromPeersFlag(str string, tls *TLSInfo) []string {
|
|
|
+ peers := trimSplit(str, ",")
|
|
|
+ for i, p := range peers {
|
|
|
+ peers[i] = tls.Scheme() + "://" + p
|
|
|
+ }
|
|
|
+ return peers
|
|
|
+}
|
|
|
+
|
|
|
func startInternalV1() {
|
|
|
p := os.Getenv("ETCD_BINARY_DIR")
|
|
|
if p == "" {
|
|
|
@@ -254,10 +329,22 @@ func startInternalV1() {
|
|
|
p = path.Join(p, "1")
|
|
|
err := syscall.Exec(p, os.Args, syscall.Environ())
|
|
|
if err != nil {
|
|
|
- log.Fatalf("migrate: failed to execute internal v1 etcd: %v", err)
|
|
|
+ log.Fatalf("starter: failed to execute internal v1 etcd: %v", err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func newDefaultClient(tls *TLSInfo) (*http.Client, error) {
|
|
|
+ tr := &http.Transport{}
|
|
|
+ if tls.Scheme() == "https" {
|
|
|
+ tlsConfig, err := tls.ClientConfig()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ tr.TLSClientConfig = tlsConfig
|
|
|
+ }
|
|
|
+ return &http.Client{Transport: tr}, nil
|
|
|
+}
|
|
|
+
|
|
|
type value struct {
|
|
|
s string
|
|
|
}
|
|
|
@@ -285,6 +372,22 @@ func parseConfig(args []string) (*flag.FlagSet, error) {
|
|
|
return fs, nil
|
|
|
}
|
|
|
|
|
|
+func clientTLSInfo(fs *flag.FlagSet) *TLSInfo {
|
|
|
+ return &TLSInfo{
|
|
|
+ CAFile: fs.Lookup("ca-file").Value.String(),
|
|
|
+ CertFile: fs.Lookup("cert-file").Value.String(),
|
|
|
+ KeyFile: fs.Lookup("key-file").Value.String(),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func peerTLSInfo(fs *flag.FlagSet) *TLSInfo {
|
|
|
+ return &TLSInfo{
|
|
|
+ CAFile: fs.Lookup("peer-ca-file").Value.String(),
|
|
|
+ CertFile: fs.Lookup("peer-cert-file").Value.String(),
|
|
|
+ KeyFile: fs.Lookup("peer-key-file").Value.String(),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func snapDir4(dataDir string) string {
|
|
|
return path.Join(dataDir, "snapshot")
|
|
|
}
|
|
|
@@ -293,6 +396,10 @@ func logFile4(dataDir string) string {
|
|
|
return path.Join(dataDir, "log")
|
|
|
}
|
|
|
|
|
|
+func standbyInfo4(dataDir string) string {
|
|
|
+ return path.Join(dataDir, "standby_info")
|
|
|
+}
|
|
|
+
|
|
|
func trimSplit(s, sep string) []string {
|
|
|
trimmed := strings.Split(s, sep)
|
|
|
for i := range trimmed {
|