Explorar o código

Merge pull request #2190 from yichengq/308

migrate: support start desired version
Yicheng Qin %!s(int64=11) %!d(string=hai) anos
pai
achega
ebf9daff74

+ 10 - 2
etcdmain/config.go

@@ -231,7 +231,11 @@ func (cfg *config) Parse(arguments []string) error {
 		return ErrConflictBootstrapFlags
 	}
 
-	cfg.lpurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-peer-urls", "peer-bind-addr", cfg.peerTLSInfo)
+	peerBindAddrFlag := "peer-bind-addr"
+	if !flags.IsSet(cfg.FlagSet, peerBindAddrFlag) {
+		peerBindAddrFlag = "peer-addr"
+	}
+	cfg.lpurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-peer-urls", peerBindAddrFlag, cfg.peerTLSInfo)
 	if err != nil {
 		return err
 	}
@@ -239,7 +243,11 @@ func (cfg *config) Parse(arguments []string) error {
 	if err != nil {
 		return err
 	}
-	cfg.lcurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-client-urls", "bind-addr", cfg.clientTLSInfo)
+	bindAddrFlag := "bind-addr"
+	if !flags.IsSet(cfg.FlagSet, bindAddrFlag) {
+		bindAddrFlag = "addr"
+	}
+	cfg.lcurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-client-urls", bindAddrFlag, cfg.clientTLSInfo)
 	if err != nil {
 		return err
 	}

+ 14 - 0
main.go

@@ -24,9 +24,23 @@
 package main
 
 import (
+	"log"
+	"os"
+	"strconv"
+
 	"github.com/coreos/etcd/etcdmain"
+	"github.com/coreos/etcd/migrate/starter"
 )
 
 func main() {
+	if str := os.Getenv("ETCD_ALLOW_LEGACY_MODE"); str != "" {
+		v, err := strconv.ParseBool(str)
+		if err != nil {
+			log.Fatalf("failed to parse ETCD_ALLOW_LEGACY_MODE=%s as bool", str)
+		}
+		if v {
+			starter.StartDesiredVersion(os.Args[1:])
+		}
+	}
 	etcdmain.Main()
 }

+ 27 - 0
migrate/functional/README.md

@@ -0,0 +1,27 @@
+
+etcd migration functional tests
+=====
+
+This functional test suite deploys a etcd cluster using processes, and asserts etcd is functioning properly.
+
+Dependencies
+------------
+
+The test suite can only be run in linux system. It's recommended to run this in a virtual machine environment on CoreOS (e.g. using coreos-vagrant). The only dependency for the tests not provided on the CoreOS image is go.
+
+Usage
+-----
+
+Set environment variables point to the respective binaries that are used to drive the actual tests:
+
+```
+$ export ETCD_V1_BIN=/path/to/v1_etcd
+$ export ETCD_V2_BIN=/path/to/v2_etcd
+$ export ETCDCTL_BIN=/path/to/etcdctl
+```
+
+Then the tests can be run:
+
+```
+$ go test github.com/coreos/etcd/migrate/functional
+```

+ 256 - 0
migrate/functional/member.go

@@ -0,0 +1,256 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package functional
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"os/exec"
+	"strconv"
+	"strings"
+	"time"
+)
+
+type Proc struct {
+	*exec.Cmd
+	Name    string
+	DataDir string
+	URL     string
+	PeerURL string
+
+	stderr io.ReadCloser
+}
+
+func NewProcWithDefaultFlags(path string) *Proc {
+	var args []string
+	dir, err := ioutil.TempDir(os.TempDir(), "etcd")
+	if err != nil {
+		fmt.Printf("unexpected TempDir error: %v", err)
+		os.Exit(1)
+	}
+	args = append(args, "--data-dir="+dir)
+	args = append(args, "--name=default")
+	p := &Proc{
+		Cmd:     exec.Command(path, args...),
+		Name:    "default",
+		DataDir: dir,
+		URL:     "http://127.0.0.1:4001",
+		PeerURL: "http://127.0.0.1:7001",
+	}
+	// always expect to use start_desired_verson mode
+	p.Env = append(p.Env,
+		"ETCD_ALLOW_LEGACY_MODE=true",
+		"ETCD_BINARY_DIR="+binDir,
+	)
+	return p
+}
+
+func NewProcWithV1Flags(path string) *Proc {
+	p := NewProcWithDefaultFlags(path)
+	p.SetV1PeerAddr("127.0.0.1:7001")
+	return p
+}
+
+func NewProcWithV2Flags(path string) *Proc {
+	p := NewProcWithDefaultFlags(path)
+	p.SetV2PeerURL("http://127.0.0.1:7001")
+	return p
+}
+
+func (p *Proc) SetV2PeerURL(url string) {
+	p.Args = append(p.Args,
+		"-listen-peer-urls="+url,
+		"-initial-advertise-peer-urls="+url,
+		"-initial-cluster",
+		p.Name+"="+url,
+	)
+	p.PeerURL = url
+}
+
+func (p *Proc) SetV1PeerAddr(addr string) {
+	p.Args = append(p.Args,
+		"-peer-addr="+addr,
+	)
+	p.PeerURL = "http://" + addr
+}
+
+func (p *Proc) SetV1Addr(addr string) {
+	p.Args = append(p.Args,
+		"-addr="+addr,
+	)
+	p.URL = "http://" + addr
+}
+
+func (p *Proc) SetV1Peers(peers []string) {
+	p.Args = append(p.Args,
+		"-peers="+strings.Join(peers, ","),
+	)
+}
+
+func (p *Proc) SetName(name string) {
+	p.Args = append(p.Args,
+		"-name="+name,
+	)
+	p.Name = name
+}
+
+func (p *Proc) SetDataDir(dataDir string) {
+	p.Args = append(p.Args,
+		"-data-dir="+dataDir,
+	)
+	p.DataDir = dataDir
+}
+
+func (p *Proc) SetSnapCount(cnt int) {
+	p.Args = append(p.Args,
+		"-snapshot-count="+strconv.Itoa(cnt),
+	)
+}
+
+func (p *Proc) SetDiscovery(url string) {
+	p.Args = append(p.Args,
+		"-discovery="+url,
+	)
+}
+
+func (p *Proc) CleanUnsuppportedV1Flags() {
+	var args []string
+	for _, arg := range p.Args {
+		if !strings.HasPrefix(arg, "-peers=") {
+			args = append(args, arg)
+		}
+	}
+	p.Args = args
+}
+
+func (p *Proc) Start() error {
+	var err error
+	if p.stderr, err = p.Cmd.StderrPipe(); err != nil {
+		return err
+	}
+	if err := p.Cmd.Start(); err != nil {
+		return err
+	}
+	for k := 0; k < 50; k++ {
+		_, err := http.Get(p.URL)
+		if err == nil {
+			return nil
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	errMsg, _ := ioutil.ReadAll(p.stderr)
+	return fmt.Errorf("instance %s failed to be available after a long time: %s", p.Name, errMsg)
+}
+
+func (p *Proc) Stop() {
+	if err := p.Cmd.Process.Kill(); err != nil {
+		fmt.Printf("Process Kill error: %v", err)
+		return
+	}
+	ioutil.ReadAll(p.stderr)
+	p.Cmd.Wait()
+}
+
+func (p *Proc) Restart() error {
+	p.Stop()
+	return p.Start()
+}
+
+func (p *Proc) Terminate() {
+	p.Stop()
+	os.RemoveAll(p.DataDir)
+}
+
+type ProcGroup []*Proc
+
+func NewProcGroupWithV1Flags(path string, num int) ProcGroup {
+	pg := make([]*Proc, num)
+	pg[0] = NewProcWithDefaultFlags(path)
+	pg[0].SetName("etcd0")
+	for i := 1; i < num; i++ {
+		pg[i] = NewProcWithDefaultFlags(path)
+		pg[i].SetName(fmt.Sprintf("etcd%d", i))
+		pg[i].SetV1PeerAddr(fmt.Sprintf("127.0.0.1:%d", 7001+i))
+		pg[i].SetV1Addr(fmt.Sprintf("127.0.0.1:%d", 4001+i))
+		pg[i].SetV1Peers([]string{"127.0.0.1:7001"})
+	}
+	return pg
+}
+
+func NewProcGroupViaDiscoveryWithV1Flags(path string, num int, url string) ProcGroup {
+	pg := make([]*Proc, num)
+	for i := range pg {
+		pg[i] = NewProcWithDefaultFlags(path)
+		pg[i].SetName(fmt.Sprintf("etcd%d", i))
+		pg[i].SetDiscovery(url)
+		pg[i].SetV1PeerAddr(fmt.Sprintf("127.0.0.1:%d", 7001+i))
+		pg[i].SetV1Addr(fmt.Sprintf("127.0.0.1:%d", 4001+i))
+	}
+	return pg
+}
+
+func (pg ProcGroup) InheritDataDir(opg ProcGroup) {
+	for i := range pg {
+		pg[i].SetDataDir(opg[i].DataDir)
+	}
+}
+
+func (pg ProcGroup) SetSnapCount(count int) {
+	for i := range pg {
+		pg[i].SetSnapCount(count)
+	}
+}
+
+func (pg ProcGroup) CleanUnsuppportedV1Flags() {
+	for _, p := range pg {
+		p.CleanUnsuppportedV1Flags()
+	}
+}
+
+func (pg ProcGroup) Start() error {
+	for _, p := range pg {
+		if err := p.Start(); err != nil {
+			return err
+		}
+	}
+	// leave time for instances to sync and write some entries into disk
+	// TODO: use more reliable method
+	time.Sleep(time.Second)
+	return nil
+}
+
+func (pg ProcGroup) Wait() error {
+	for _, p := range pg {
+		if err := p.Wait(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (pg ProcGroup) Stop() {
+	for _, p := range pg {
+		p.Stop()
+	}
+}
+
+func (pg ProcGroup) Terminate() {
+	for _, p := range pg {
+		p.Terminate()
+	}
+}

+ 292 - 0
migrate/functional/upgrade_test.go

@@ -0,0 +1,292 @@
+package functional
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"os/exec"
+	"path"
+	"path/filepath"
+	"testing"
+)
+
+var (
+	binDir         = ".versions"
+	v1BinPath      = path.Join(binDir, "1")
+	v2BinPath      = path.Join(binDir, "2")
+	etcdctlBinPath string
+)
+
+func init() {
+	os.RemoveAll(binDir)
+	if err := os.Mkdir(binDir, 0700); err != nil {
+		fmt.Printf("unexpected Mkdir error: %v\n", err)
+		os.Exit(1)
+	}
+	if err := os.Symlink(absPathFromEnv("ETCD_V1_BIN"), v1BinPath); err != nil {
+		fmt.Printf("unexpected Symlink error: %v\n", err)
+		os.Exit(1)
+	}
+	if err := os.Symlink(absPathFromEnv("ETCD_V2_BIN"), v2BinPath); err != nil {
+		fmt.Printf("unexpected Symlink error: %v\n", err)
+		os.Exit(1)
+	}
+	etcdctlBinPath = os.Getenv("ETCDCTL_BIN")
+
+	mustExist(v1BinPath)
+	mustExist(v2BinPath)
+	mustExist(etcdctlBinPath)
+}
+
+func TestStartNewMember(t *testing.T) {
+	tests := []*Proc{
+		NewProcWithDefaultFlags(v2BinPath),
+		NewProcWithV1Flags(v2BinPath),
+		NewProcWithV2Flags(v2BinPath),
+	}
+	for i, tt := range tests {
+		if err := tt.Start(); err != nil {
+			t.Fatalf("#%d: Start error: %v", i, err)
+		}
+		defer tt.Terminate()
+
+		ver, err := checkInternalVersion(tt.URL)
+		if err != nil {
+			t.Fatalf("#%d: checkVersion error: %v", i, err)
+		}
+		if ver != "2" {
+			t.Errorf("#%d: internal version = %s, want %s", i, ver, "2")
+		}
+	}
+}
+
+func TestStartV2Member(t *testing.T) {
+	tests := []*Proc{
+		NewProcWithDefaultFlags(v2BinPath),
+		NewProcWithV1Flags(v2BinPath),
+		NewProcWithV2Flags(v2BinPath),
+	}
+	for i, tt := range tests {
+		// get v2 data dir
+		p := NewProcWithDefaultFlags(v2BinPath)
+		if err := p.Start(); err != nil {
+			t.Fatalf("#%d: Start error: %v", i, err)
+		}
+		p.Stop()
+		tt.SetDataDir(p.DataDir)
+		if err := tt.Start(); err != nil {
+			t.Fatalf("#%d: Start error: %v", i, err)
+		}
+		defer tt.Terminate()
+
+		ver, err := checkInternalVersion(tt.URL)
+		if err != nil {
+			t.Fatalf("#%d: checkVersion error: %v", i, err)
+		}
+		if ver != "2" {
+			t.Errorf("#%d: internal version = %s, want %s", i, ver, "2")
+		}
+	}
+}
+
+func TestStartV1Member(t *testing.T) {
+	tests := []*Proc{
+		NewProcWithDefaultFlags(v2BinPath),
+		NewProcWithV1Flags(v2BinPath),
+		NewProcWithV2Flags(v2BinPath),
+	}
+	for i, tt := range tests {
+		// get v1 data dir
+		p := NewProcWithDefaultFlags(v1BinPath)
+		if err := p.Start(); err != nil {
+			t.Fatalf("#%d: Start error: %v", i, err)
+		}
+		p.Stop()
+		tt.SetDataDir(p.DataDir)
+		if err := tt.Start(); err != nil {
+			t.Fatalf("#%d: Start error: %v", i, err)
+		}
+		defer tt.Terminate()
+
+		ver, err := checkInternalVersion(tt.URL)
+		if err != nil {
+			t.Fatalf("#%d: checkVersion error: %v", i, err)
+		}
+		if ver != "1" {
+			t.Errorf("#%d: internal version = %s, want %s", i, ver, "1")
+		}
+	}
+}
+
+func TestUpgradeV1Cluster(t *testing.T) {
+	// get v2-desired v1 data dir
+	pg := NewProcGroupWithV1Flags(v1BinPath, 3)
+	if err := pg.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	cmd := exec.Command(etcdctlBinPath, "upgrade", "--peer-url", pg[1].PeerURL)
+	if err := cmd.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	if err := cmd.Wait(); err != nil {
+		t.Fatalf("Wait error: %v", err)
+	}
+	t.Logf("wait until etcd exits...")
+	if err := pg.Wait(); err != nil {
+		t.Fatalf("Wait error: %v", err)
+	}
+
+	npg := NewProcGroupWithV1Flags(v2BinPath, 3)
+	npg.InheritDataDir(pg)
+	npg.CleanUnsuppportedV1Flags()
+	if err := npg.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	defer npg.Terminate()
+
+	for _, p := range npg {
+		ver, err := checkInternalVersion(p.URL)
+		if err != nil {
+			t.Fatalf("checkVersion error: %v", err)
+		}
+		if ver != "2" {
+			t.Errorf("internal version = %s, want %s", ver, "2")
+		}
+	}
+}
+
+func TestUpgradeV1SnapshotedCluster(t *testing.T) {
+	// get v2-desired v1 data dir
+	pg := NewProcGroupWithV1Flags(v1BinPath, 3)
+	pg.SetSnapCount(10)
+	if err := pg.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	cmd := exec.Command(etcdctlBinPath, "upgrade", "--peer-url", pg[1].PeerURL)
+	if err := cmd.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	if err := cmd.Wait(); err != nil {
+		t.Fatalf("Wait error: %v", err)
+	}
+	t.Logf("wait until etcd exits...")
+	if err := pg.Wait(); err != nil {
+		t.Fatalf("Wait error: %v", err)
+	}
+	for _, p := range pg {
+		// check it has taken snapshot
+		fis, err := ioutil.ReadDir(path.Join(p.DataDir, "snapshot"))
+		if err != nil {
+			t.Fatalf("unexpected ReadDir error: %v", err)
+		}
+		if len(fis) == 0 {
+			t.Fatalf("unexpected no-snapshot data dir")
+		}
+	}
+
+	npg := NewProcGroupWithV1Flags(v2BinPath, 3)
+	npg.InheritDataDir(pg)
+	npg.CleanUnsuppportedV1Flags()
+	if err := npg.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	defer npg.Terminate()
+
+	for _, p := range npg {
+		ver, err := checkInternalVersion(p.URL)
+		if err != nil {
+			t.Fatalf("checkVersion error: %v", err)
+		}
+		if ver != "2" {
+			t.Errorf("internal version = %s, want %s", ver, "2")
+		}
+	}
+}
+
+func TestJoinV1Cluster(t *testing.T) {
+	pg := NewProcGroupWithV1Flags(v1BinPath, 1)
+	if err := pg.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	pg.Stop()
+	npg := NewProcGroupWithV1Flags(v2BinPath, 3)
+	npg[0].SetDataDir(pg[0].DataDir)
+	if err := npg.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	defer npg.Terminate()
+
+	for _, p := range npg {
+		ver, err := checkInternalVersion(p.URL)
+		if err != nil {
+			t.Fatalf("checkVersion error: %v", err)
+		}
+		if ver != "1" {
+			t.Errorf("internal version = %s, want %s", ver, "1")
+		}
+	}
+}
+
+func TestJoinV1ClusterViaDiscovery(t *testing.T) {
+	dp := NewProcWithDefaultFlags(v1BinPath)
+	dp.SetV1Addr("127.0.0.1:5001")
+	dp.SetV1PeerAddr("127.0.0.1:8001")
+	if err := dp.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	defer dp.Terminate()
+
+	durl := "http://127.0.0.1:5001/v2/keys/cluster/"
+	pg := NewProcGroupViaDiscoveryWithV1Flags(v1BinPath, 1, durl)
+	if err := pg.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	pg.Stop()
+	npg := NewProcGroupViaDiscoveryWithV1Flags(v2BinPath, 3, durl)
+	npg[0].SetDataDir(pg[0].DataDir)
+	if err := npg.Start(); err != nil {
+		t.Fatalf("Start error: %v", err)
+	}
+	defer npg.Terminate()
+
+	for _, p := range npg {
+		ver, err := checkInternalVersion(p.URL)
+		if err != nil {
+			t.Fatalf("checkVersion error: %v", err)
+		}
+		if ver != "1" {
+			t.Errorf("internal version = %s, want %s", ver, "1")
+		}
+	}
+}
+
+func absPathFromEnv(name string) string {
+	path, err := filepath.Abs(os.Getenv(name))
+	if err != nil {
+		fmt.Printf("unexpected Abs error: %v\n", err)
+	}
+	return path
+}
+
+func mustExist(path string) {
+	if _, err := os.Stat(path); err != nil {
+		fmt.Printf("%v\n", err)
+		os.Exit(1)
+	}
+}
+
+func checkInternalVersion(url string) (string, error) {
+	resp, err := http.Get(url + "/version")
+	if err != nil {
+		return "", err
+	}
+	b, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return "", err
+	}
+	var m map[string]string
+	err = json.Unmarshal(b, &m)
+	return m["internalVersion"], err
+}

+ 3 - 3
migrate/snapshot.go

@@ -43,7 +43,7 @@ type Snapshot4 struct {
 	} `json:"peers"`
 }
 
-type sstore struct {
+type Store4 struct {
 	Root           *node
 	CurrentIndex   uint64
 	CurrentVersion int
@@ -165,7 +165,7 @@ func mangleRoot(n *node) *node {
 }
 
 func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
-	st := &sstore{}
+	st := &Store4{}
 	if err := json.Unmarshal(s.State, st); err != nil {
 		log.Fatal("Couldn't unmarshal snapshot")
 	}
@@ -174,7 +174,7 @@ func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
 }
 
 func (s *Snapshot4) Snapshot2() *raftpb.Snapshot {
-	st := &sstore{}
+	st := &Store4{}
 	if err := json.Unmarshal(s.State, st); err != nil {
 		log.Fatal("Couldn't unmarshal snapshot")
 	}

+ 302 - 0
migrate/starter/starter.go

@@ -0,0 +1,302 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package starter
+
+import (
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/url"
+	"os"
+	"path"
+	"strings"
+	"syscall"
+
+	"github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/etcdmain"
+	"github.com/coreos/etcd/migrate"
+	"github.com/coreos/etcd/pkg/flags"
+	"github.com/coreos/etcd/wal"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+type version string
+
+const (
+	internalV1      version = "1"
+	internalV2      version = "2"
+	internalUnknown version = "unknown"
+
+	defaultInternalV1etcdBinaryDir = "/usr/libexec/etcd/versions/"
+)
+
+func StartDesiredVersion(args []string) {
+	switch checkStartVersion(args) {
+	case internalV1:
+		startInternalV1()
+	case internalV2:
+	default:
+		log.Panicf("migrate: unhandled start version")
+	}
+}
+
+func checkStartVersion(args []string) version {
+	fs, err := parseConfig(args)
+	if err != nil {
+		return internalV2
+	}
+	// If it uses 2.0 env var explicitly, start 2.0
+	if fs.Lookup("initial-cluster").Value.String() != "" {
+		return internalV2
+	}
+
+	dataDir := fs.Lookup("data-dir").Value.String()
+	if dataDir == "" {
+		log.Fatalf("migrate: please set ETCD_DATA_DIR for etcd")
+	}
+	// check the data directory
+	walVersion, err := wal.DetectVersion(dataDir)
+	if err != nil {
+		log.Fatalf("migrate: failed to detect etcd version in %v: %v", dataDir, err)
+	}
+	log.Printf("migrate: detect etcd version %s in %s", walVersion, dataDir)
+	switch walVersion {
+	case wal.WALv0_5:
+		return internalV2
+	case wal.WALv0_4:
+		// TODO: standby case
+		// if it is standby guy:
+		//     print out detect standby mode
+		//     go to WALNotExist case
+		//     if want to start with 2.0:
+		//         remove old data dir to avoid auto migration
+		//         try to let it fallback? or use local proxy file?
+		ver, err := checkStartVersionByDataDir4(dataDir)
+		if err != nil {
+			log.Fatalf("migrate: failed to check start version in %v: %v", dataDir, err)
+		}
+		return ver
+	case wal.WALUnknown:
+		log.Fatalf("migrate: unknown etcd version in %v", dataDir)
+	case wal.WALNotExist:
+		discovery := fs.Lookup("discovery").Value.String()
+		peers := trimSplit(fs.Lookup("peers").Value.String(), ",")
+		peerTLSInfo := &TLSInfo{
+			CAFile:   fs.Lookup("peer-ca-file").Value.String(),
+			CertFile: fs.Lookup("peer-cert-file").Value.String(),
+			KeyFile:  fs.Lookup("peer-key-file").Value.String(),
+		}
+		ver, err := checkStartVersionByMembers(discovery, peers, peerTLSInfo)
+		if err != nil {
+			log.Printf("migrate: failed to check start version through peers: %v", err)
+			break
+		}
+		return ver
+	default:
+		log.Panicf("migrate: unhandled etcd version in %v", dataDir)
+	}
+	return internalV2
+}
+
+func checkStartVersionByDataDir4(dataDir string) (version, error) {
+	// check v0.4 snapshot
+	snap4, err := migrate.DecodeLatestSnapshot4FromDir(snapDir4(dataDir))
+	if err != nil {
+		return internalUnknown, err
+	}
+	if snap4 != nil {
+		st := &migrate.Store4{}
+		if err := json.Unmarshal(snap4.State, st); err != nil {
+			return internalUnknown, err
+		}
+		dir := st.Root.Children["_etcd"]
+		n, ok := dir.Children["next-internal-version"]
+		if ok && n.Value == "2" {
+			return internalV2, nil
+		}
+	}
+
+	// check v0.4 log
+	ents4, err := migrate.DecodeLog4FromFile(logFile4(dataDir))
+	if err != nil {
+		return internalUnknown, err
+	}
+	for _, e := range ents4 {
+		cmd, err := migrate.NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
+		if err != nil {
+			return internalUnknown, err
+		}
+		setcmd, ok := cmd.(*migrate.SetCommand)
+		if !ok {
+			continue
+		}
+		if setcmd.Key == "/_etcd/next-internal-version" && setcmd.Value == "2" {
+			return internalV2, nil
+		}
+	}
+	return internalV1, nil
+}
+
+func checkStartVersionByMembers(discoverURL string, peers []string, tls *TLSInfo) (version, error) {
+	tr := &http.Transport{}
+	if tls.Scheme() == "https" {
+		tlsConfig, err := tls.ClientConfig()
+		if err != nil {
+			return internalUnknown, err
+		}
+		tr.TLSClientConfig = tlsConfig
+	}
+	c := &http.Client{Transport: tr}
+
+	possiblePeers, err := getPeersFromDiscoveryURL(discoverURL)
+	if err != nil {
+		return internalUnknown, err
+	}
+	for _, p := range peers {
+		possiblePeers = append(possiblePeers, tls.Scheme()+"://"+p)
+	}
+
+	for _, p := range possiblePeers {
+		resp, err := c.Get(p + "/etcdURL")
+		if err != nil {
+			log.Printf("migrate: failed to get /etcdURL from %s", p)
+			continue
+		}
+		b, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			log.Printf("migrate: failed to read body from %s", p)
+			continue
+		}
+		resp, err = c.Get(string(b) + "/version")
+		if err != nil {
+			log.Printf("migrate: failed to get /version from %s", p)
+			continue
+		}
+		b, err = ioutil.ReadAll(resp.Body)
+		if err != nil {
+			log.Printf("migrate: failed to read body from %s", p)
+			continue
+		}
+
+		var m map[string]string
+		err = json.Unmarshal(b, &m)
+		if err != nil {
+			log.Printf("migrate: failed to unmarshal body %s from %s", b, p)
+			continue
+		}
+		switch m["internalVersion"] {
+		case "1":
+			return internalV1, nil
+		case "2":
+			return internalV2, nil
+		default:
+			log.Printf("migrate: unrecognized internal version %s from %s", m["internalVersion"], p)
+		}
+	}
+	return internalUnknown, fmt.Errorf("failed to get version from peers %v", possiblePeers)
+}
+
+func getPeersFromDiscoveryURL(discoverURL string) ([]string, error) {
+	if discoverURL == "" {
+		return nil, nil
+	}
+
+	u, err := url.Parse(discoverURL)
+	if err != nil {
+		return nil, err
+	}
+	token := u.Path
+	u.Path = ""
+	c, err := client.NewHTTPClient(&http.Transport{}, []string{u.String()})
+	if err != nil {
+		return nil, err
+	}
+	dc := client.NewDiscoveryKeysAPI(c)
+
+	ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
+	resp, err := dc.Get(ctx, token)
+	cancel()
+	if err != nil {
+		return nil, err
+	}
+	peers := make([]string, 0)
+	// append non-config keys to peers
+	for _, n := range resp.Node.Nodes {
+		if g := path.Base(n.Key); g == "_config" || g == "_state" {
+			continue
+		}
+		peers = append(peers, n.Value)
+	}
+	return peers, nil
+}
+
+func startInternalV1() {
+	p := os.Getenv("ETCD_BINARY_DIR")
+	if p == "" {
+		p = defaultInternalV1etcdBinaryDir
+	}
+	p = path.Join(p, "1")
+	err := syscall.Exec(p, os.Args, syscall.Environ())
+	if err != nil {
+		log.Fatalf("migrate: failed to execute internal v1 etcd: %v", err)
+	}
+}
+
+type value struct {
+	s string
+}
+
+func (v *value) String() string { return v.s }
+
+func (v *value) Set(s string) error {
+	v.s = s
+	return nil
+}
+
+// parseConfig parses out the input config from cmdline arguments and
+// environment variables.
+func parseConfig(args []string) (*flag.FlagSet, error) {
+	fs := flag.NewFlagSet("full flagset", flag.ContinueOnError)
+	etcdmain.NewConfig().VisitAll(func(f *flag.Flag) {
+		fs.Var(&value{}, f.Name, "")
+	})
+	if err := fs.Parse(args); err != nil {
+		return nil, err
+	}
+	if err := flags.SetFlagsFromEnv(fs); err != nil {
+		return nil, err
+	}
+	return fs, nil
+}
+
+func snapDir4(dataDir string) string {
+	return path.Join(dataDir, "snapshot")
+}
+
+func logFile4(dataDir string) string {
+	return path.Join(dataDir, "log")
+}
+
+func trimSplit(s, sep string) []string {
+	trimmed := strings.Split(s, sep)
+	for i := range trimmed {
+		trimmed[i] = strings.TrimSpace(trimmed[i])
+	}
+	return trimmed
+}

+ 120 - 0
migrate/starter/tls_info.go

@@ -0,0 +1,120 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package starter
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/pem"
+	"fmt"
+	"io/ioutil"
+)
+
+// TLSInfo holds the SSL certificates paths.
+type TLSInfo struct {
+	CertFile string `json:"CertFile"`
+	KeyFile  string `json:"KeyFile"`
+	CAFile   string `json:"CAFile"`
+}
+
+func (info TLSInfo) Scheme() string {
+	if info.KeyFile != "" && info.CertFile != "" {
+		return "https"
+	} else {
+		return "http"
+	}
+}
+
+// Generates a tls.Config object for a server from the given files.
+func (info TLSInfo) ServerConfig() (*tls.Config, error) {
+	// Both the key and cert must be present.
+	if info.KeyFile == "" || info.CertFile == "" {
+		return nil, fmt.Errorf("KeyFile and CertFile must both be present[key: %v, cert: %v]", info.KeyFile, info.CertFile)
+	}
+
+	var cfg tls.Config
+
+	tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile)
+	if err != nil {
+		return nil, err
+	}
+
+	cfg.Certificates = []tls.Certificate{tlsCert}
+
+	if info.CAFile != "" {
+		cfg.ClientAuth = tls.RequireAndVerifyClientCert
+		cp, err := newCertPool(info.CAFile)
+		if err != nil {
+			return nil, err
+		}
+
+		cfg.RootCAs = cp
+		cfg.ClientCAs = cp
+	} else {
+		cfg.ClientAuth = tls.NoClientCert
+	}
+
+	return &cfg, nil
+}
+
+// Generates a tls.Config object for a client from the given files.
+func (info TLSInfo) ClientConfig() (*tls.Config, error) {
+	var cfg tls.Config
+
+	if info.KeyFile == "" || info.CertFile == "" {
+		return &cfg, nil
+	}
+
+	tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile)
+	if err != nil {
+		return nil, err
+	}
+
+	cfg.Certificates = []tls.Certificate{tlsCert}
+
+	if info.CAFile != "" {
+		cp, err := newCertPool(info.CAFile)
+		if err != nil {
+			return nil, err
+		}
+
+		cfg.RootCAs = cp
+	}
+
+	return &cfg, nil
+}
+
+// newCertPool creates x509 certPool with provided CA file
+func newCertPool(CAFile string) (*x509.CertPool, error) {
+	certPool := x509.NewCertPool()
+	pemByte, err := ioutil.ReadFile(CAFile)
+	if err != nil {
+		return nil, err
+	}
+
+	for {
+		var block *pem.Block
+		block, pemByte = pem.Decode(pemByte)
+		if block == nil {
+			return certPool, nil
+		}
+		cert, err := x509.ParseCertificate(block.Bytes)
+		if err != nil {
+			return nil, err
+		}
+		certPool.AddCert(cert)
+	}
+
+}

+ 10 - 0
pkg/flags/flag.go

@@ -119,3 +119,13 @@ func URLsFromFlags(fs *flag.FlagSet, urlsFlagName string, addrFlagName string, t
 
 	return []url.URL(*fs.Lookup(urlsFlagName).Value.(*URLsValue)), nil
 }
+
+func IsSet(fs *flag.FlagSet, name string) bool {
+	set := false
+	fs.Visit(func(f *flag.Flag) {
+		if f.Name == name {
+			set = true
+		}
+	})
+	return set
+}