Browse Source

Merge pull request #1743 from barakmich/auto_upgrade

etcdserver: autodetect v0.4 WALs and upgrade them to v0.5 automatically
Barak Michener 11 years ago
parent
commit
5139257b8d

+ 33 - 5
etcdserver/server.go

@@ -36,6 +36,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/migrate"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/wait"
@@ -190,18 +191,45 @@ type EtcdServer struct {
 	raftLead uint64
 }
 
+// UpgradeWAL converts an older version of the EtcdServer data to the newest version.
+// It must ensure that, after upgrading, the most recent version is present.
+func UpgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
+	if ver == wal.WALv0_4 {
+		log.Print("Converting v0.4 log to v0.5")
+		err := migrate.Migrate4To5(cfg.DataDir, cfg.Name)
+		if err != nil {
+			log.Fatalf("Failed migrating data-dir: %v", err)
+			return err
+		}
+	}
+	return nil
+}
+
 // NewServer creates a new EtcdServer from the supplied configuration. The
 // configuration is considered static for the lifetime of the EtcdServer.
 func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
-	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
-		return nil, fmt.Errorf("cannot create snapshot directory: %v", err)
-	}
-	ss := snap.New(cfg.SnapDir())
 	st := store.New()
 	var w *wal.WAL
 	var n raft.Node
 	var id types.ID
-	haveWAL := wal.Exist(cfg.WALDir())
+	walVersion := wal.DetectVersion(cfg.DataDir)
+	if walVersion == wal.WALUnknown {
+		return nil, fmt.Errorf("unknown wal version in data dir %s", cfg.DataDir)
+	}
+	haveWAL := walVersion != wal.WALNotExist
+
+	if haveWAL && walVersion != wal.WALv0_5 {
+		err := UpgradeWAL(cfg, walVersion)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
+		return nil, fmt.Errorf("cannot create snapshot directory: %v", err)
+	}
+	ss := snap.New(cfg.SnapDir())
+
 	switch {
 	case !haveWAL && !cfg.NewCluster:
 		us := getOtherPeerURLs(cfg.Cluster, cfg.Name)

+ 18 - 12
integration/cluster_test.go

@@ -146,16 +146,7 @@ type cluster struct {
 	Members []*member
 }
 
-// NewCluster returns an unlaunched cluster of the given size which has been
-// set to use static bootstrap.
-func NewCluster(t *testing.T, size int) *cluster {
-	c := &cluster{}
-	ms := make([]*member, size)
-	for i := 0; i < size; i++ {
-		ms[i] = mustNewMember(t, c.name(i))
-	}
-	c.Members = ms
-
+func fillClusterForMembers(ms []*member, cName string) error {
 	addrs := make([]string, 0)
 	for _, m := range ms {
 		for _, l := range m.PeerListeners {
@@ -165,11 +156,26 @@ func NewCluster(t *testing.T, size int) *cluster {
 	clusterStr := strings.Join(addrs, ",")
 	var err error
 	for _, m := range ms {
-		m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
+		m.Cluster, err = etcdserver.NewClusterFromString(cName, clusterStr)
 		if err != nil {
-			t.Fatal(err)
+			return err
 		}
 	}
+	return nil
+}
+
+// NewCluster returns an unlaunched cluster of the given size which has been
+// set to use static bootstrap.
+func NewCluster(t *testing.T, size int) *cluster {
+	c := &cluster{}
+	ms := make([]*member, size)
+	for i := 0; i < size; i++ {
+		ms[i] = mustNewMember(t, c.name(i))
+	}
+	c.Members = ms
+	if err := fillClusterForMembers(c.Members, clusterName); err != nil {
+		t.Fatal(err)
+	}
 
 	return c
 }

+ 34 - 0
integration/migration_test.go

@@ -0,0 +1,34 @@
+package integration
+
+import (
+	"github.com/coreos/etcd/pkg/types"
+	"net"
+	"os/exec"
+	"testing"
+)
+
+func TestUpgradeMember(t *testing.T) {
+	defer afterTest(t)
+	m := mustNewMember(t, "integration046")
+	newPeerListeners := make([]net.Listener, 0)
+	newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, "127.0.0.1:59892"))
+	m.PeerListeners = newPeerListeners
+	urls, err := types.NewURLs([]string{"http://127.0.0.1:59892"})
+	if err != nil {
+		t.Fatal(err)
+	}
+	m.PeerURLs = urls
+	m.NewCluster = true
+	c := &cluster{}
+	c.Members = []*member{m}
+	fillClusterForMembers(c.Members, "etcd-cluster")
+	cmd := exec.Command("cp", "-r", "testdata/integration046_data/conf", "testdata/integration046_data/log", "testdata/integration046_data/snapshot", m.DataDir)
+	err = cmd.Run()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	c.Launch(t)
+	defer c.Terminate(t)
+	clusterMustProgress(t, c)
+}

+ 1 - 0
integration/testdata/integration046_data/conf

@@ -0,0 +1 @@
+{"commitIndex":1,"peers":[]}

BIN
integration/testdata/integration046_data/log


File diff suppressed because it is too large
+ 1 - 0
integration/testdata/integration046_data/snapshot/1_90.ss


+ 3 - 2
migrate/etcd4.go

@@ -8,6 +8,7 @@ import (
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
 	raftpb "github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/wal"
@@ -125,12 +126,12 @@ func GuessNodeID(nodes map[string]uint64, snap4 *Snapshot4, cfg *Config4, name s
 	if name != "" {
 		log.Printf("Using suggested name %s", name)
 		if val, ok := nodes[name]; ok {
-			log.Printf("Found ID %d", val)
+			log.Printf("Found ID %s", types.ID(val))
 			return val
 		}
 		if snapNodes != nil {
 			if val, ok := snapNodes[name]; ok {
-				log.Printf("Found ID %d", val)
+				log.Printf("Found ID %s", types.ID(val))
 				return val
 			}
 		}

+ 4 - 5
migrate/log.go

@@ -10,7 +10,6 @@ import (
 	"path"
 	"time"
 
-	"github.com/coreos/etcd/etcdserver"
 	etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	etcd4pb "github.com/coreos/etcd/migrate/etcd4pb"
 	"github.com/coreos/etcd/pkg/types"
@@ -56,7 +55,7 @@ func (l Log4) NodeIDs() map[string]uint64 {
 }
 
 func StorePath(key string) string {
-	return path.Join(etcdserver.StoreKeysPrefix, key)
+	return path.Join("/1", key)
 }
 
 func DecodeLog4FromFile(logpath string) (Log4, error) {
@@ -214,7 +213,7 @@ type JoinCommand struct {
 	Name    string `json:"name"`
 	RaftURL string `json:"raftURL"`
 	EtcdURL string `json:"etcdURL"`
-	memb    etcdserver.Member
+	memb    member
 }
 
 func (c *JoinCommand) Type5() raftpb.EntryType {
@@ -496,13 +495,13 @@ func toEntry5(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry,
 	return &ent5, nil
 }
 
-func generateNodeMember(name, rafturl, etcdurl string) *etcdserver.Member {
+func generateNodeMember(name, rafturl, etcdurl string) *member {
 	pURLs, err := types.NewURLs([]string{rafturl})
 	if err != nil {
 		log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl)
 	}
 
-	m := etcdserver.NewMember(name, pURLs, etcdDefaultClusterName, nil)
+	m := NewMember(name, pURLs, etcdDefaultClusterName)
 	m.ClientURLs = []string{etcdurl}
 	return m
 }

+ 1 - 3
migrate/log_test.go

@@ -6,8 +6,6 @@ import (
 	"reflect"
 	"testing"
 	"time"
-
-	"github.com/coreos/etcd/etcdserver"
 )
 
 func TestNewCommand(t *testing.T) {
@@ -21,7 +19,7 @@ func TestNewCommand(t *testing.T) {
 		t.Errorf("couldn't create time: %v", err)
 	}
 
-	m := etcdserver.NewMember("alice", []url.URL{{Scheme: "http", Host: "127.0.0.1:7001"}}, etcdDefaultClusterName, nil)
+	m := NewMember("alice", []url.URL{{Scheme: "http", Host: "127.0.0.1:7001"}}, etcdDefaultClusterName)
 	m.ClientURLs = []string{"http://127.0.0.1:4001"}
 
 	tests := []interface{}{

+ 59 - 0
migrate/member.go

@@ -0,0 +1,59 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package migrate
+
+import (
+	"crypto/sha1"
+	"encoding/binary"
+	"sort"
+
+	"github.com/coreos/etcd/pkg/types"
+)
+
+type raftAttributes struct {
+	PeerURLs []string `json:"peerURLs"`
+}
+
+type attributes struct {
+	Name       string   `json:"name,omitempty"`
+	ClientURLs []string `json:"clientURLs,omitempty"`
+}
+
+type member struct {
+	ID types.ID `json:"id"`
+	raftAttributes
+	attributes
+}
+
+func NewMember(name string, peerURLs types.URLs, clusterName string) *member {
+	m := &member{
+		raftAttributes: raftAttributes{PeerURLs: peerURLs.StringSlice()},
+		attributes:     attributes{Name: name},
+	}
+
+	var b []byte
+	sort.Strings(m.PeerURLs)
+	for _, p := range m.PeerURLs {
+		b = append(b, []byte(p)...)
+	}
+
+	b = append(b, []byte(clusterName)...)
+
+	hash := sha1.Sum(b)
+	m.ID = types.ID(binary.BigEndian.Uint64(hash[:8]))
+	return m
+}

+ 9 - 7
migrate/snapshot.go

@@ -93,11 +93,11 @@ func fixEtcd(n *node) {
 		rafturl := q.Get("raft")
 
 		m := generateNodeMember(name, rafturl, etcdurl)
-		attrBytes, err := json.Marshal(m.Attributes)
+		attrBytes, err := json.Marshal(m.attributes)
 		if err != nil {
 			log.Fatal("Couldn't marshal attributes")
 		}
-		raftBytes, err := json.Marshal(m.RaftAttributes)
+		raftBytes, err := json.Marshal(m.raftAttributes)
 		if err != nil {
 			log.Fatal("Couldn't marshal raft attributes")
 		}
@@ -171,15 +171,17 @@ func (s *Snapshot4) Snapshot5() *raftpb.Snapshot {
 		log.Fatal("Couldn't re-marshal new snapshot")
 	}
 
+	nodes := s.GetNodesFromStore()
+	nodeList := make([]uint64, 0)
+	for _, v := range nodes {
+		nodeList = append(nodeList, v)
+	}
+
 	snap5 := raftpb.Snapshot{
 		Data:  newState,
 		Index: s.LastIndex,
 		Term:  s.LastTerm,
-		Nodes: make([]uint64, len(s.Peers)),
-	}
-
-	for i, p := range s.Peers {
-		snap5.Nodes[i] = hashName(p.Name)
+		Nodes: nodeList,
 	}
 
 	return &snap5

+ 180 - 0
pkg/types/set.go

@@ -0,0 +1,180 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package types
+
+import (
+	"reflect"
+	"sort"
+	"sync"
+)
+
+type Set interface {
+	Add(string)
+	Remove(string)
+	Contains(string) bool
+	Equals(Set) bool
+	Length() int
+	Values() []string
+	Copy() Set
+	Sub(Set) Set
+}
+
+func NewUnsafeSet(values ...string) *unsafeSet {
+	set := &unsafeSet{make(map[string]struct{})}
+	for _, v := range values {
+		set.Add(v)
+	}
+	return set
+}
+
+func NewThreadsafeSet(values ...string) *tsafeSet {
+	us := NewUnsafeSet(values...)
+	return &tsafeSet{us, sync.RWMutex{}}
+}
+
+type unsafeSet struct {
+	d map[string]struct{}
+}
+
+// Add adds a new value to the set (no-op if the value is already present)
+func (us *unsafeSet) Add(value string) {
+	us.d[value] = struct{}{}
+}
+
+// Remove removes the given value from the set
+func (us *unsafeSet) Remove(value string) {
+	delete(us.d, value)
+}
+
+// Contains returns whether the set contains the given value
+func (us *unsafeSet) Contains(value string) (exists bool) {
+	_, exists = us.d[value]
+	return
+}
+
+// ContainsAll returns whether the set contains all given values
+func (us *unsafeSet) ContainsAll(values []string) bool {
+	for _, s := range values {
+		if !us.Contains(s) {
+			return false
+		}
+	}
+	return true
+}
+
+// Equals returns whether the contents of two sets are identical
+func (us *unsafeSet) Equals(other Set) bool {
+	v1 := sort.StringSlice(us.Values())
+	v2 := sort.StringSlice(other.Values())
+	v1.Sort()
+	v2.Sort()
+	return reflect.DeepEqual(v1, v2)
+}
+
+// Length returns the number of elements in the set
+func (us *unsafeSet) Length() int {
+	return len(us.d)
+}
+
+// Values returns the values of the Set in an unspecified order.
+func (us *unsafeSet) Values() (values []string) {
+	values = make([]string, 0)
+	for val, _ := range us.d {
+		values = append(values, val)
+	}
+	return
+}
+
+// Copy creates a new Set containing the values of the first
+func (us *unsafeSet) Copy() Set {
+	cp := NewUnsafeSet()
+	for val, _ := range us.d {
+		cp.Add(val)
+	}
+
+	return cp
+}
+
+// Sub removes all elements in other from the set
+func (us *unsafeSet) Sub(other Set) Set {
+	oValues := other.Values()
+	result := us.Copy().(*unsafeSet)
+
+	for _, val := range oValues {
+		if _, ok := result.d[val]; !ok {
+			continue
+		}
+		delete(result.d, val)
+	}
+
+	return result
+}
+
+type tsafeSet struct {
+	us *unsafeSet
+	m  sync.RWMutex
+}
+
+func (ts *tsafeSet) Add(value string) {
+	ts.m.Lock()
+	defer ts.m.Unlock()
+	ts.us.Add(value)
+}
+
+func (ts *tsafeSet) Remove(value string) {
+	ts.m.Lock()
+	defer ts.m.Unlock()
+	ts.us.Remove(value)
+}
+
+func (ts *tsafeSet) Contains(value string) (exists bool) {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	return ts.us.Contains(value)
+}
+
+func (ts *tsafeSet) Equals(other Set) bool {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	return ts.us.Equals(other)
+}
+
+func (ts *tsafeSet) Length() int {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	return ts.us.Length()
+}
+
+func (ts *tsafeSet) Values() (values []string) {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	return ts.us.Values()
+}
+
+func (ts *tsafeSet) Copy() Set {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	usResult := ts.us.Copy().(*unsafeSet)
+	return &tsafeSet{usResult, sync.RWMutex{}}
+}
+
+func (ts *tsafeSet) Sub(other Set) Set {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	usResult := ts.us.Sub(other).(*unsafeSet)
+	return &tsafeSet{usResult, sync.RWMutex{}}
+}

+ 166 - 0
pkg/types/set_test.go

@@ -0,0 +1,166 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package types
+
+import (
+	"reflect"
+	"sort"
+	"testing"
+)
+
+func TestUnsafeSet(t *testing.T) {
+	driveSetTests(t, NewUnsafeSet())
+}
+
+func TestThreadsafeSet(t *testing.T) {
+	driveSetTests(t, NewThreadsafeSet())
+}
+
+// Check that two slices contents are equal; order is irrelevant
+func equal(a, b []string) bool {
+	as := sort.StringSlice(a)
+	bs := sort.StringSlice(b)
+	as.Sort()
+	bs.Sort()
+	return reflect.DeepEqual(as, bs)
+}
+
+func driveSetTests(t *testing.T, s Set) {
+	// Verify operations on an empty set
+	eValues := []string{}
+	values := s.Values()
+	if !reflect.DeepEqual(values, eValues) {
+		t.Fatalf("Expect values=%v got %v", eValues, values)
+	}
+	if l := s.Length(); l != 0 {
+		t.Fatalf("Expected length=0, got %d", l)
+	}
+	for _, v := range []string{"foo", "bar", "baz"} {
+		if s.Contains(v) {
+			t.Fatalf("Expect s.Contains(%q) to be fale, got true", v)
+		}
+	}
+
+	// Add three items, ensure they show up
+	s.Add("foo")
+	s.Add("bar")
+	s.Add("baz")
+
+	eValues = []string{"foo", "bar", "baz"}
+	values = s.Values()
+	if !equal(values, eValues) {
+		t.Fatalf("Expect values=%v got %v", eValues, values)
+	}
+
+	for _, v := range eValues {
+		if !s.Contains(v) {
+			t.Fatalf("Expect s.Contains(%q) to be true, got false", v)
+		}
+	}
+
+	if l := s.Length(); l != 3 {
+		t.Fatalf("Expected length=3, got %d", l)
+	}
+
+	// Add the same item a second time, ensuring it is not duplicated
+	s.Add("foo")
+
+	values = s.Values()
+	if !equal(values, eValues) {
+		t.Fatalf("Expect values=%v got %v", eValues, values)
+	}
+	if l := s.Length(); l != 3 {
+		t.Fatalf("Expected length=3, got %d", l)
+	}
+
+	// Remove all items, ensure they are gone
+	s.Remove("foo")
+	s.Remove("bar")
+	s.Remove("baz")
+
+	eValues = []string{}
+	values = s.Values()
+	if !equal(values, eValues) {
+		t.Fatalf("Expect values=%v got %v", eValues, values)
+	}
+
+	if l := s.Length(); l != 0 {
+		t.Fatalf("Expected length=0, got %d", l)
+	}
+
+	// Create new copies of the set, and ensure they are unlinked to the
+	// original Set by making modifications
+	s.Add("foo")
+	s.Add("bar")
+	cp1 := s.Copy()
+	cp2 := s.Copy()
+	s.Remove("foo")
+	cp3 := s.Copy()
+	cp1.Add("baz")
+
+	for i, tt := range []struct {
+		want []string
+		got  []string
+	}{
+		{[]string{"bar"}, s.Values()},
+		{[]string{"foo", "bar", "baz"}, cp1.Values()},
+		{[]string{"foo", "bar"}, cp2.Values()},
+		{[]string{"bar"}, cp3.Values()},
+	} {
+		if !equal(tt.want, tt.got) {
+			t.Fatalf("case %d: expect values=%v got %v", i, tt.want, tt.got)
+		}
+	}
+
+	for i, tt := range []struct {
+		want bool
+		got  bool
+	}{
+		{true, s.Equals(cp3)},
+		{true, cp3.Equals(s)},
+		{false, s.Equals(cp2)},
+		{false, s.Equals(cp1)},
+		{false, cp1.Equals(s)},
+		{false, cp2.Equals(s)},
+		{false, cp2.Equals(cp1)},
+	} {
+		if tt.got != tt.want {
+			t.Fatalf("case %d: want %t, got %t", i, tt.want, tt.got)
+
+		}
+	}
+
+	// Subtract values from a Set, ensuring a new Set is created and
+	// the original Sets are unmodified
+	sub1 := cp1.Sub(s)
+	sub2 := cp2.Sub(cp1)
+
+	for i, tt := range []struct {
+		want []string
+		got  []string
+	}{
+		{[]string{"foo", "bar", "baz"}, cp1.Values()},
+		{[]string{"foo", "bar"}, cp2.Values()},
+		{[]string{"bar"}, s.Values()},
+		{[]string{"foo", "baz"}, sub1.Values()},
+		{[]string{}, sub2.Values()},
+	} {
+		if !equal(tt.want, tt.got) {
+			t.Fatalf("case %d: expect values=%v got %v", i, tt.want, tt.got)
+		}
+	}
+}

+ 33 - 0
wal/util.go

@@ -20,8 +20,41 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"path"
+
+	"github.com/coreos/etcd/pkg/types"
+)
+
+// WalVersion is an enum for versions of etcd logs.
+type WalVersion string
+
+const (
+	WALUnknown  WalVersion = "Unknown WAL"
+	WALNotExist WalVersion = "No WAL"
+	WALv0_4     WalVersion = "0.4.x"
+	WALv0_5     WalVersion = "0.5.x"
 )
 
+func DetectVersion(dirpath string) WalVersion {
+	names, err := readDir(dirpath)
+	if err != nil || len(names) == 0 {
+		return WALNotExist
+	}
+	nameSet := types.NewUnsafeSet(names...)
+	if nameSet.ContainsAll([]string{"snap", "wal"}) {
+		// .../wal cannot be empty to exist.
+		if Exist(path.Join(dirpath, "wal")) {
+			return WALv0_5
+		}
+		return WALNotExist
+	}
+	if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
+		return WALv0_4
+	}
+
+	return WALUnknown
+}
+
 func Exist(dirpath string) bool {
 	names, err := readDir(dirpath)
 	if err != nil {

Some files were not shown because too many files changed in this diff