Browse Source

etcd4: migration from v0.4 -> v0.5

Brian Waldon 11 years ago
parent
commit
5ea1f2d96f

+ 1 - 0
etcdserver/cluster_store.go

@@ -59,6 +59,7 @@ func (s *clusterStore) Get() Cluster {
 		if err := json.Unmarshal([]byte(*n.Value), &m); err != nil {
 			log.Panicf("unmarshal peer error: %v", err)
 		}
+		log.Printf("Found member in cluster: %#v", m)
 		err := c.Add(m)
 		if err != nil {
 			log.Panicf("add member to cluster should never fail: %v", err)

+ 1 - 1
etcdserver/member.go

@@ -47,7 +47,7 @@ type Member struct {
 	Attributes
 }
 
-// newMember creates a Member without an ID and generates one based on the
+// NewMember creates a Member without an ID and generates one based on the
 // name, peer URLs. This is used for bootstrapping/adding new member.
 func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
 	m := &Member{

+ 90 - 0
migrate/cmd/etcd-dump-logs/main.go

@@ -0,0 +1,90 @@
+package main
+
+import (
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"path"
+
+	etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/migrate"
+	raftpb "github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/wal"
+)
+
+func walDir5(dataDir string) string {
+	return path.Join(dataDir, "wal")
+}
+
+func logFile4(dataDir string) string {
+	return path.Join(dataDir, "log")
+}
+
+func main() {
+	version := flag.Int("version", 5, "4 or 5")
+	from := flag.String("data-dir", "", "")
+	flag.Parse()
+
+	if *from == "" {
+		log.Fatal("Must provide -from flag")
+	}
+
+	var ents []raftpb.Entry
+	var err error
+	switch *version {
+	case 4:
+		ents, err = dump4(*from)
+	case 5:
+		ents, err = dump5(*from)
+	default:
+		err = errors.New("value of -version flag must be 4 or 5")
+	}
+
+	if err != nil {
+		log.Fatalf("Failed decoding log: %v", err)
+	}
+
+	for _, e := range ents {
+		msg := fmt.Sprintf("%2d %5d: ", e.Term, e.Index)
+		switch e.Type {
+		case raftpb.EntryNormal:
+			msg = fmt.Sprintf("%s norm", msg)
+			var r etcdserverpb.Request
+			if err := r.Unmarshal(e.Data); err != nil {
+				msg = fmt.Sprintf("%s ???", msg)
+			} else {
+				msg = fmt.Sprintf("%s %s %s %s", msg, r.Method, r.Path, r.Val)
+			}
+		case raftpb.EntryConfChange:
+			msg = fmt.Sprintf("%s conf", msg)
+		}
+		fmt.Println(msg)
+	}
+}
+
+func dump4(dataDir string) ([]raftpb.Entry, error) {
+	lf4 := logFile4(dataDir)
+	ents, err := migrate.DecodeLog4FromFile(lf4)
+	if err != nil {
+		return nil, err
+	}
+
+	return migrate.Entries4To5(0, ents)
+}
+
+func dump5(dataDir string) ([]raftpb.Entry, error) {
+	wd5 := walDir5(dataDir)
+	if !wal.Exist(wd5) {
+		return nil, fmt.Errorf("No wal exists at %s", wd5)
+	}
+
+	w, err := wal.OpenAtIndex(wd5, 0)
+	if err != nil {
+		return nil, err
+	}
+	defer w.Close()
+
+	_, _, ents, err := w.ReadAll()
+	return ents, err
+}

+ 22 - 0
migrate/cmd/etcd-migrate/main.go

@@ -0,0 +1,22 @@
+package main
+
+import (
+	"flag"
+	"log"
+
+	"github.com/coreos/etcd/migrate"
+)
+
+func main() {
+	from := flag.String("data-dir", "", "etcd v0.4 data-dir")
+	flag.Parse()
+
+	if *from == "" {
+		log.Fatal("Must provide -from flag")
+	}
+
+	err := migrate.Migrate4To5(*from)
+	if err != nil {
+		log.Fatalf("Failed migrating data-dir: %v", err)
+	}
+}

+ 40 - 0
migrate/config.go

@@ -0,0 +1,40 @@
+package migrate
+
+import (
+	"encoding/json"
+	"io/ioutil"
+
+	raftpb "github.com/coreos/etcd/raft/raftpb"
+)
+
+type Config4 struct {
+	CommitIndex uint64 `json:"commitIndex"`
+
+	//TODO(bcwaldon): is this needed?
+	//Peers []struct{
+	//	Name             string `json:"name"`
+	//	ConnectionString string `json:"connectionString"`
+	//}	`json:"peers"`
+}
+
+func (c *Config4) HardState5() raftpb.HardState {
+	return raftpb.HardState{
+		Commit: int64(c.CommitIndex),
+		Term:   0,
+		Vote:   0,
+	}
+}
+
+func DecodeConfig4FromFile(cfgPath string) (*Config4, error) {
+	b, err := ioutil.ReadFile(cfgPath)
+	if err != nil {
+		return nil, err
+	}
+
+	conf := &Config4{}
+	if err = json.Unmarshal(b, conf); err != nil {
+		return nil, err
+	}
+
+	return conf, nil
+}

+ 98 - 0
migrate/etcd4.go

@@ -0,0 +1,98 @@
+package migrate
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"path"
+
+	raftpb "github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
+	"github.com/coreos/etcd/wal"
+)
+
+func snapDir4(dataDir string) string {
+	return path.Join(dataDir, "snapshot")
+}
+
+func logFile4(dataDir string) string {
+	return path.Join(dataDir, "log")
+}
+
+func cfgFile4(dataDir string) string {
+	return path.Join(dataDir, "conf")
+}
+
+func snapDir5(dataDir string) string {
+	return path.Join(dataDir, "snap")
+}
+
+func walDir5(dataDir string) string {
+	return path.Join(dataDir, "wal")
+}
+
+func Migrate4To5(dataDir string) error {
+	// prep new directories
+	sd5 := snapDir5(dataDir)
+	if err := os.MkdirAll(sd5, 0700); err != nil {
+		return fmt.Errorf("failed creating snapshot directory %s: %v", sd5, err)
+	}
+
+	wd5 := walDir5(dataDir)
+	w, err := wal.Create(wd5)
+	if err != nil {
+		return fmt.Errorf("failed initializing wal at %s: %v", wd5, err)
+	}
+	defer w.Close()
+
+	// read v0.4 data
+	snap4, err := DecodeLatestSnapshot4FromDir(snapDir4(dataDir))
+	if err != nil {
+		return err
+	}
+
+	cfg4, err := DecodeConfig4FromFile(cfgFile4(dataDir))
+	if err != nil {
+		return err
+	}
+
+	ents4, err := DecodeLog4FromFile(logFile4(dataDir))
+	if err != nil {
+		return err
+	}
+
+	// transform v0.4 data
+	var snap5 *raftpb.Snapshot
+	if snap4 == nil {
+		log.Printf("No snapshot found")
+	} else {
+		log.Printf("Found snapshot: lastIndex=%d", snap4.LastIndex)
+
+		snap5 = snap4.Snapshot5()
+	}
+
+	st5 := cfg4.HardState5()
+
+	ents5, err := Entries4To5(uint64(st5.Commit), ents4)
+	if err != nil {
+		return err
+	}
+
+	ents5Len := len(ents5)
+	log.Printf("Found %d log entries: firstIndex=%d lastIndex=%d", ents5Len, ents5[0].Index, ents5[ents5Len-1].Index)
+
+	// migrate snapshot (if necessary) and logs
+	if snap5 != nil {
+		ss := snap.New(sd5)
+		ss.SaveSnap(*snap5)
+		log.Printf("Snapshot migration successful")
+	}
+
+	// explicitly prepend an empty entry as the WAL code expects it
+	ents5 = append(make([]raftpb.Entry, 1), ents5...)
+
+	w.Save(st5, ents5)
+	log.Printf("Log migration successful")
+
+	return nil
+}

+ 552 - 0
migrate/etcd4pb/log_entry.pb.go

@@ -0,0 +1,552 @@
+// Code generated by protoc-gen-gogo.
+// source: log_entry.proto
+// DO NOT EDIT!
+
+package protobuf
+
+import proto "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
+import json "encoding/json"
+import math "math"
+
+// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
+
+import io "io"
+import code_google_com_p_gogoprotobuf_proto "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
+
+import fmt "fmt"
+import strings "strings"
+import reflect "reflect"
+
+import fmt1 "fmt"
+import strings1 "strings"
+import code_google_com_p_gogoprotobuf_proto1 "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
+import sort "sort"
+import strconv "strconv"
+import reflect1 "reflect"
+
+import fmt2 "fmt"
+import bytes "bytes"
+
+// Reference proto, json, and math imports to suppress error if they are not otherwise used.
+var _ = proto.Marshal
+var _ = &json.SyntaxError{}
+var _ = math.Inf
+
+type LogEntry struct {
+	Index            *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`
+	Term             *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
+	CommandName      *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"`
+	Command          []byte  `protobuf:"bytes,4,opt" json:"Command,omitempty"`
+	XXX_unrecognized []byte  `json:"-"`
+}
+
+func (m *LogEntry) Reset()      { *m = LogEntry{} }
+func (*LogEntry) ProtoMessage() {}
+
+func (m *LogEntry) GetIndex() uint64 {
+	if m != nil && m.Index != nil {
+		return *m.Index
+	}
+	return 0
+}
+
+func (m *LogEntry) GetTerm() uint64 {
+	if m != nil && m.Term != nil {
+		return *m.Term
+	}
+	return 0
+}
+
+func (m *LogEntry) GetCommandName() string {
+	if m != nil && m.CommandName != nil {
+		return *m.CommandName
+	}
+	return ""
+}
+
+func (m *LogEntry) GetCommand() []byte {
+	if m != nil {
+		return m.Command
+	}
+	return nil
+}
+
+func init() {
+}
+func (m *LogEntry) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var v uint64
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				v |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Index = &v
+		case 2:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var v uint64
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				v |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Term = &v
+		case 3:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + int(stringLen)
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			s := string(data[index:postIndex])
+			m.CommandName = &s
+			index = postIndex
+		case 4:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Command = append(m.Command, data[index:postIndex]...)
+			index = postIndex
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
+func (this *LogEntry) String() string {
+	if this == nil {
+		return "nil"
+	}
+	s := strings.Join([]string{`&LogEntry{`,
+		`Index:` + valueToStringLogEntry(this.Index) + `,`,
+		`Term:` + valueToStringLogEntry(this.Term) + `,`,
+		`CommandName:` + valueToStringLogEntry(this.CommandName) + `,`,
+		`Command:` + valueToStringLogEntry(this.Command) + `,`,
+		`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
+		`}`,
+	}, "")
+	return s
+}
+func valueToStringLogEntry(v interface{}) string {
+	rv := reflect.ValueOf(v)
+	if rv.IsNil() {
+		return "nil"
+	}
+	pv := reflect.Indirect(rv).Interface()
+	return fmt.Sprintf("*%v", pv)
+}
+func (m *LogEntry) Size() (n int) {
+	var l int
+	_ = l
+	if m.Index != nil {
+		n += 1 + sovLogEntry(uint64(*m.Index))
+	}
+	if m.Term != nil {
+		n += 1 + sovLogEntry(uint64(*m.Term))
+	}
+	if m.CommandName != nil {
+		l = len(*m.CommandName)
+		n += 1 + l + sovLogEntry(uint64(l))
+	}
+	if m.Command != nil {
+		l = len(m.Command)
+		n += 1 + l + sovLogEntry(uint64(l))
+	}
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+
+func sovLogEntry(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozLogEntry(x uint64) (n int) {
+	return sovLogEntry(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func NewPopulatedLogEntry(r randyLogEntry, easy bool) *LogEntry {
+	this := &LogEntry{}
+	v1 := uint64(r.Uint32())
+	this.Index = &v1
+	v2 := uint64(r.Uint32())
+	this.Term = &v2
+	v3 := randStringLogEntry(r)
+	this.CommandName = &v3
+	if r.Intn(10) != 0 {
+		v4 := r.Intn(100)
+		this.Command = make([]byte, v4)
+		for i := 0; i < v4; i++ {
+			this.Command[i] = byte(r.Intn(256))
+		}
+	}
+	if !easy && r.Intn(10) != 0 {
+		this.XXX_unrecognized = randUnrecognizedLogEntry(r, 5)
+	}
+	return this
+}
+
+type randyLogEntry interface {
+	Float32() float32
+	Float64() float64
+	Int63() int64
+	Int31() int32
+	Uint32() uint32
+	Intn(n int) int
+}
+
+func randUTF8RuneLogEntry(r randyLogEntry) rune {
+	res := rune(r.Uint32() % 1112064)
+	if 55296 <= res {
+		res += 2047
+	}
+	return res
+}
+func randStringLogEntry(r randyLogEntry) string {
+	v5 := r.Intn(100)
+	tmps := make([]rune, v5)
+	for i := 0; i < v5; i++ {
+		tmps[i] = randUTF8RuneLogEntry(r)
+	}
+	return string(tmps)
+}
+func randUnrecognizedLogEntry(r randyLogEntry, maxFieldNumber int) (data []byte) {
+	l := r.Intn(5)
+	for i := 0; i < l; i++ {
+		wire := r.Intn(4)
+		if wire == 3 {
+			wire = 5
+		}
+		fieldNumber := maxFieldNumber + r.Intn(100)
+		data = randFieldLogEntry(data, r, fieldNumber, wire)
+	}
+	return data
+}
+func randFieldLogEntry(data []byte, r randyLogEntry, fieldNumber int, wire int) []byte {
+	key := uint32(fieldNumber)<<3 | uint32(wire)
+	switch wire {
+	case 0:
+		data = encodeVarintPopulateLogEntry(data, uint64(key))
+		data = encodeVarintPopulateLogEntry(data, uint64(r.Int63()))
+	case 1:
+		data = encodeVarintPopulateLogEntry(data, uint64(key))
+		data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
+	case 2:
+		data = encodeVarintPopulateLogEntry(data, uint64(key))
+		ll := r.Intn(100)
+		data = encodeVarintPopulateLogEntry(data, uint64(ll))
+		for j := 0; j < ll; j++ {
+			data = append(data, byte(r.Intn(256)))
+		}
+	default:
+		data = encodeVarintPopulateLogEntry(data, uint64(key))
+		data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
+	}
+	return data
+}
+func encodeVarintPopulateLogEntry(data []byte, v uint64) []byte {
+	for v >= 1<<7 {
+		data = append(data, uint8(uint64(v)&0x7f|0x80))
+		v >>= 7
+	}
+	data = append(data, uint8(v))
+	return data
+}
+func (m *LogEntry) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *LogEntry) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.Index != nil {
+		data[i] = 0x8
+		i++
+		i = encodeVarintLogEntry(data, i, uint64(*m.Index))
+	}
+	if m.Term != nil {
+		data[i] = 0x10
+		i++
+		i = encodeVarintLogEntry(data, i, uint64(*m.Term))
+	}
+	if m.CommandName != nil {
+		data[i] = 0x1a
+		i++
+		i = encodeVarintLogEntry(data, i, uint64(len(*m.CommandName)))
+		i += copy(data[i:], *m.CommandName)
+	}
+	if m.Command != nil {
+		data[i] = 0x22
+		i++
+		i = encodeVarintLogEntry(data, i, uint64(len(m.Command)))
+		i += copy(data[i:], m.Command)
+	}
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
+func encodeFixed64LogEntry(data []byte, offset int, v uint64) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	data[offset+4] = uint8(v >> 32)
+	data[offset+5] = uint8(v >> 40)
+	data[offset+6] = uint8(v >> 48)
+	data[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32LogEntry(data []byte, offset int, v uint32) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintLogEntry(data []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		data[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	data[offset] = uint8(v)
+	return offset + 1
+}
+func (this *LogEntry) GoString() string {
+	if this == nil {
+		return "nil"
+	}
+	s := strings1.Join([]string{`&protobuf.LogEntry{` + `Index:` + valueToGoStringLogEntry(this.Index, "uint64"), `Term:` + valueToGoStringLogEntry(this.Term, "uint64"), `CommandName:` + valueToGoStringLogEntry(this.CommandName, "string"), `Command:` + valueToGoStringLogEntry(this.Command, "byte"), `XXX_unrecognized:` + fmt1.Sprintf("%#v", this.XXX_unrecognized) + `}`}, ", ")
+	return s
+}
+func valueToGoStringLogEntry(v interface{}, typ string) string {
+	rv := reflect1.ValueOf(v)
+	if rv.IsNil() {
+		return "nil"
+	}
+	pv := reflect1.Indirect(rv).Interface()
+	return fmt1.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv)
+}
+func extensionToGoStringLogEntry(e map[int32]code_google_com_p_gogoprotobuf_proto1.Extension) string {
+	if e == nil {
+		return "nil"
+	}
+	s := "map[int32]proto.Extension{"
+	keys := make([]int, 0, len(e))
+	for k := range e {
+		keys = append(keys, int(k))
+	}
+	sort.Ints(keys)
+	ss := []string{}
+	for _, k := range keys {
+		ss = append(ss, strconv.Itoa(k)+": "+e[int32(k)].GoString())
+	}
+	s += strings1.Join(ss, ",") + "}"
+	return s
+}
+func (this *LogEntry) VerboseEqual(that interface{}) error {
+	if that == nil {
+		if this == nil {
+			return nil
+		}
+		return fmt2.Errorf("that == nil && this != nil")
+	}
+
+	that1, ok := that.(*LogEntry)
+	if !ok {
+		return fmt2.Errorf("that is not of type *LogEntry")
+	}
+	if that1 == nil {
+		if this == nil {
+			return nil
+		}
+		return fmt2.Errorf("that is type *LogEntry but is nil && this != nil")
+	} else if this == nil {
+		return fmt2.Errorf("that is type *LogEntrybut is not nil && this == nil")
+	}
+	if this.Index != nil && that1.Index != nil {
+		if *this.Index != *that1.Index {
+			return fmt2.Errorf("Index this(%v) Not Equal that(%v)", *this.Index, *that1.Index)
+		}
+	} else if this.Index != nil {
+		return fmt2.Errorf("this.Index == nil && that.Index != nil")
+	} else if that1.Index != nil {
+		return fmt2.Errorf("Index this(%v) Not Equal that(%v)", this.Index, that1.Index)
+	}
+	if this.Term != nil && that1.Term != nil {
+		if *this.Term != *that1.Term {
+			return fmt2.Errorf("Term this(%v) Not Equal that(%v)", *this.Term, *that1.Term)
+		}
+	} else if this.Term != nil {
+		return fmt2.Errorf("this.Term == nil && that.Term != nil")
+	} else if that1.Term != nil {
+		return fmt2.Errorf("Term this(%v) Not Equal that(%v)", this.Term, that1.Term)
+	}
+	if this.CommandName != nil && that1.CommandName != nil {
+		if *this.CommandName != *that1.CommandName {
+			return fmt2.Errorf("CommandName this(%v) Not Equal that(%v)", *this.CommandName, *that1.CommandName)
+		}
+	} else if this.CommandName != nil {
+		return fmt2.Errorf("this.CommandName == nil && that.CommandName != nil")
+	} else if that1.CommandName != nil {
+		return fmt2.Errorf("CommandName this(%v) Not Equal that(%v)", this.CommandName, that1.CommandName)
+	}
+	if !bytes.Equal(this.Command, that1.Command) {
+		return fmt2.Errorf("Command this(%v) Not Equal that(%v)", this.Command, that1.Command)
+	}
+	if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) {
+		return fmt2.Errorf("XXX_unrecognized this(%v) Not Equal that(%v)", this.XXX_unrecognized, that1.XXX_unrecognized)
+	}
+	return nil
+}
+func (this *LogEntry) Equal(that interface{}) bool {
+	if that == nil {
+		if this == nil {
+			return true
+		}
+		return false
+	}
+
+	that1, ok := that.(*LogEntry)
+	if !ok {
+		return false
+	}
+	if that1 == nil {
+		if this == nil {
+			return true
+		}
+		return false
+	} else if this == nil {
+		return false
+	}
+	if this.Index != nil && that1.Index != nil {
+		if *this.Index != *that1.Index {
+			return false
+		}
+	} else if this.Index != nil {
+		return false
+	} else if that1.Index != nil {
+		return false
+	}
+	if this.Term != nil && that1.Term != nil {
+		if *this.Term != *that1.Term {
+			return false
+		}
+	} else if this.Term != nil {
+		return false
+	} else if that1.Term != nil {
+		return false
+	}
+	if this.CommandName != nil && that1.CommandName != nil {
+		if *this.CommandName != *that1.CommandName {
+			return false
+		}
+	} else if this.CommandName != nil {
+		return false
+	} else if that1.CommandName != nil {
+		return false
+	}
+	if !bytes.Equal(this.Command, that1.Command) {
+		return false
+	}
+	if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) {
+		return false
+	}
+	return true
+}

+ 22 - 0
migrate/etcd4pb/log_entry.proto

@@ -0,0 +1,22 @@
+package protobuf;
+
+import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
+
+option (gogoproto.gostring_all) = true;
+option (gogoproto.equal_all) = true;
+option (gogoproto.verbose_equal_all) = true;
+option (gogoproto.goproto_stringer_all) = false;
+option (gogoproto.stringer_all) =  true;
+option (gogoproto.populate_all) = true;
+option (gogoproto.testgen_all) = true;
+option (gogoproto.benchgen_all) = true;
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+
+message LogEntry {
+	required uint64 Index=1;
+	required uint64 Term=2;
+	required string CommandName=3;
+	optional bytes Command=4; // for nop-command
+}

BIN
migrate/fixtures/cmdlog


+ 475 - 0
migrate/log.go

@@ -0,0 +1,475 @@
+package migrate
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver"
+	etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	etcd4pb "github.com/coreos/etcd/migrate/etcd4pb"
+	"github.com/coreos/etcd/pkg/types"
+	raftpb "github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/store"
+)
+
+func DecodeLog4FromFile(logpath string) ([]*etcd4pb.LogEntry, error) {
+	file, err := os.OpenFile(logpath, os.O_RDWR, 0600)
+	if err != nil {
+		return nil, err
+	}
+	defer file.Close()
+
+	return DecodeLog4(file)
+}
+
+func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) {
+	var readBytes int64
+	entries := make([]*etcd4pb.LogEntry, 0)
+
+	for {
+		entry, n, err := DecodeNextEntry4(file)
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return nil, fmt.Errorf("failed decoding next log entry: ", err)
+		}
+
+		if entry != nil {
+			entries = append(entries, entry)
+		}
+
+		readBytes += int64(n)
+	}
+
+	return entries, nil
+}
+
+// DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. Returns the
+// number of bytes read and any error that occurs.
+func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) {
+	var length int
+	_, err := fmt.Fscanf(r, "%8x\n", &length)
+	if err != nil {
+		return nil, -1, err
+	}
+
+	data := make([]byte, length)
+	if _, err = io.ReadFull(r, data); err != nil {
+		return nil, -1, err
+	}
+
+	ent4 := new(etcd4pb.LogEntry)
+	if err = ent4.Unmarshal(data); err != nil {
+		return nil, -1, err
+	}
+
+	// add width of scanner token to length
+	length = length + 8 + 1
+
+	return ent4, length, nil
+}
+
+func hashName(name string) int64 {
+	var sum int64
+	for _, ch := range name {
+		sum = 131*sum + int64(ch)
+	}
+	return sum
+}
+
+type Command4 interface {
+	Type5() raftpb.EntryType
+	Data5() ([]byte, error)
+}
+
+func NewCommand4(name string, data []byte) (Command4, error) {
+	var cmd Command4
+
+	switch name {
+	case "etcd:remove":
+		cmd = &RemoveCommand{}
+	case "etcd:join":
+		cmd = &JoinCommand{}
+	case "etcd:setClusterConfig":
+		//TODO(bcwaldon): can this safely be discarded?
+		cmd = &NOPCommand{}
+	case "etcd:compareAndDelete":
+		cmd = &CompareAndDeleteCommand{}
+	case "etcd:compareAndSwap":
+		cmd = &CompareAndSwapCommand{}
+	case "etcd:create":
+		cmd = &CreateCommand{}
+	case "etcd:delete":
+		cmd = &DeleteCommand{}
+	case "etcd:set":
+		cmd = &SetCommand{}
+	case "etcd:sync":
+		cmd = &SyncCommand{}
+	case "etcd:update":
+		cmd = &UpdateCommand{}
+	case "raft:join":
+		cmd = &DefaultJoinCommand{}
+	case "raft:leave":
+		cmd = &DefaultLeaveCommand{}
+	case "raft:nop":
+		cmd = &NOPCommand{}
+	default:
+		return nil, fmt.Errorf("unregistered command type %s", name)
+	}
+
+	// If data for the command was passed in the decode it.
+	if data != nil {
+		if err := json.NewDecoder(bytes.NewReader(data)).Decode(cmd); err != nil {
+			return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err)
+		}
+	}
+
+	return cmd, nil
+}
+
+type RemoveCommand struct {
+	Name string `json:"name"`
+}
+
+func (c *RemoveCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *RemoveCommand) Data5() ([]byte, error) {
+	m := etcdserver.Member{
+		ID: hashName(c.Name),
+	}
+
+	req5 := &etcdserverpb.Request{
+		Method: "DELETE",
+		Path:   m.StoreKey(),
+	}
+
+	return req5.Marshal()
+}
+
+type JoinCommand struct {
+	Name    string `json:"name"`
+	RaftURL string `json:"raftURL"`
+	EtcdURL string `json:"etcdURL"`
+
+	//TODO(bcwaldon): Should these be converted?
+	//MinVersion int `json:"minVersion"`
+	//MaxVersion int `json:"maxVersion"`
+}
+
+func (c *JoinCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *JoinCommand) Data5() ([]byte, error) {
+	pURLs, err := types.NewURLs([]string{c.RaftURL})
+	if err != nil {
+		return nil, err
+	}
+
+	m := etcdserver.GenerateMember(c.Name, pURLs, nil)
+
+	//TODO(bcwaldon): why doesn't this go through GenerateMember?
+	m.ClientURLs = []string{c.EtcdURL}
+
+	b, err := json.Marshal(*m)
+	if err != nil {
+		return nil, err
+	}
+
+	req5 := &etcdserverpb.Request{
+		Method: "PUT",
+		Path:   m.StoreKey(),
+		Val:    string(b),
+
+		// TODO(bcwaldon): Is this correct?
+		Time: store.Permanent.Unix(),
+
+		//TODO(bcwaldon): What is the new equivalent of Unique?
+		//Unique: c.Unique,
+	}
+
+	return req5.Marshal()
+
+}
+
+type SetClusterConfigCommand struct {
+	Config *struct {
+		ActiveSize   int     `json:"activeSize"`
+		RemoveDelay  float64 `json:"removeDelay"`
+		SyncInterval float64 `json:"syncInterval"`
+	} `json:"config"`
+}
+
+func (c *SetClusterConfigCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *SetClusterConfigCommand) Data5() ([]byte, error) {
+	b, err := json.Marshal(c.Config)
+	if err != nil {
+		return nil, err
+	}
+
+	req5 := &etcdserverpb.Request{
+		Method: "PUT",
+		Path:   "/v2/admin/config",
+		Dir:    false,
+		Val:    string(b),
+
+		// TODO(bcwaldon): Is this correct?
+		Time: store.Permanent.Unix(),
+	}
+
+	return req5.Marshal()
+}
+
+type CompareAndDeleteCommand struct {
+	Key       string `json:"key"`
+	PrevValue string `json:"prevValue"`
+	PrevIndex uint64 `json:"prevIndex"`
+}
+
+func (c *CompareAndDeleteCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *CompareAndDeleteCommand) Data5() ([]byte, error) {
+	req5 := &etcdserverpb.Request{
+		Method:    "DELETE",
+		Path:      c.Key,
+		PrevValue: c.PrevValue,
+		PrevIndex: c.PrevIndex,
+	}
+	return req5.Marshal()
+}
+
+type CompareAndSwapCommand struct {
+	Key        string    `json:"key"`
+	Value      string    `json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+	PrevValue  string    `json:"prevValue"`
+	PrevIndex  uint64    `json:"prevIndex"`
+}
+
+func (c *CompareAndSwapCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *CompareAndSwapCommand) Data5() ([]byte, error) {
+	req5 := &etcdserverpb.Request{
+		Method:    "PUT",
+		Path:      c.Key,
+		Val:       c.Value,
+		PrevValue: c.PrevValue,
+		PrevIndex: c.PrevIndex,
+		Time:      c.ExpireTime.Unix(),
+	}
+	return req5.Marshal()
+}
+
+type CreateCommand struct {
+	Key        string    `json:"key"`
+	Value      string    `json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+	Unique     bool      `json:"unique"`
+	Dir        bool      `json:"dir"`
+}
+
+func (c *CreateCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *CreateCommand) Data5() ([]byte, error) {
+	req5 := &etcdserverpb.Request{
+		Method: "PUT",
+		Path:   c.Key,
+		Dir:    c.Dir,
+		Val:    c.Value,
+
+		// TODO(bcwaldon): Is this correct?
+		Time: c.ExpireTime.Unix(),
+
+		//TODO(bcwaldon): What is the new equivalent of Unique?
+		//Unique: c.Unique,
+	}
+	return req5.Marshal()
+}
+
+type DeleteCommand struct {
+	Key       string `json:"key"`
+	Recursive bool   `json:"recursive"`
+	Dir       bool   `json:"dir"`
+}
+
+func (c *DeleteCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *DeleteCommand) Data5() ([]byte, error) {
+	req5 := &etcdserverpb.Request{
+		Method:    "DELETE",
+		Path:      c.Key,
+		Dir:       c.Dir,
+		Recursive: c.Recursive,
+	}
+	return req5.Marshal()
+}
+
+type SetCommand struct {
+	Key        string    `json:"key"`
+	Value      string    `json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+	Dir        bool      `json:"dir"`
+}
+
+func (c *SetCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *SetCommand) Data5() ([]byte, error) {
+	req5 := &etcdserverpb.Request{
+		Method: "PUT",
+		Path:   c.Key,
+		Dir:    c.Dir,
+		Val:    c.Value,
+
+		//TODO(bcwaldon): Is this correct?
+		Time: c.ExpireTime.Unix(),
+	}
+	return req5.Marshal()
+}
+
+type UpdateCommand struct {
+	Key        string    `json:"key"`
+	Value      string    `json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+}
+
+func (c *UpdateCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *UpdateCommand) Data5() ([]byte, error) {
+	req5 := &etcdserverpb.Request{
+		Method: "PUT",
+		Path:   c.Key,
+		Val:    c.Value,
+
+		//TODO(bcwaldon): Is this correct?
+		Time: c.ExpireTime.Unix(),
+	}
+	return req5.Marshal()
+}
+
+type SyncCommand struct {
+	Time time.Time `json:"time"`
+}
+
+func (c *SyncCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *SyncCommand) Data5() ([]byte, error) {
+	req5 := &etcdserverpb.Request{
+		Method: "SYNC",
+		//TODO(bcwaldon): Is this correct?
+		Time: c.Time.UnixNano(),
+	}
+	return req5.Marshal()
+}
+
+type DefaultJoinCommand struct {
+	//TODO(bcwaldon): implement Type5, Data5
+	Command4
+
+	Name             string `json:"name"`
+	ConnectionString string `json:"connectionString"`
+}
+
+type DefaultLeaveCommand struct {
+	//TODO(bcwaldon): implement Type5, Data5
+	Command4
+
+	Name string `json:"name"`
+}
+
+//TODO(bcwaldon): Why is CommandName here?
+func (c *DefaultLeaveCommand) CommandName() string {
+	return "raft:leave"
+}
+
+type NOPCommand struct{}
+
+//TODO(bcwaldon): Why is CommandName here?
+func (c NOPCommand) CommandName() string {
+	return "raft:nop"
+}
+
+func (c *NOPCommand) Type5() raftpb.EntryType {
+	return raftpb.EntryNormal
+}
+
+func (c *NOPCommand) Data5() ([]byte, error) {
+	return nil, nil
+}
+
+func Entries4To5(commitIndex uint64, ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
+	ents4Len := len(ents4)
+
+	if ents4Len == 0 {
+		return nil, nil
+	}
+
+	startIndex := ents4[0].GetIndex()
+	for i, e := range ents4[1:] {
+		eIndex := e.GetIndex()
+		// ensure indexes are monotonically increasing
+		wantIndex := startIndex + uint64(i+1)
+		if wantIndex != eIndex {
+			return nil, fmt.Errorf("skipped log index %d", wantIndex)
+		}
+	}
+
+	ents5 := make([]raftpb.Entry, 0)
+	for i, e := range ents4 {
+		ent, err := toEntry5(e)
+		if err != nil {
+			log.Printf("Ignoring invalid log data in entry %d: %v", i, err)
+		} else {
+			ents5 = append(ents5, *ent)
+		}
+	}
+
+	return ents5, nil
+}
+
+func toEntry5(ent4 *etcd4pb.LogEntry) (*raftpb.Entry, error) {
+	cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand())
+	if err != nil {
+		return nil, err
+	}
+
+	data, err := cmd4.Data5()
+	if err != nil {
+		return nil, err
+	}
+
+	ent5 := raftpb.Entry{
+		Term:  int64(ent4.GetTerm()),
+		Index: int64(ent4.GetIndex()),
+		Type:  cmd4.Type5(),
+		Data:  data,
+	}
+
+	log.Printf("%d: %s -> %s", ent5.Index, ent4.GetCommandName(), ent5.Type)
+
+	return &ent5, nil
+}

+ 42 - 0
migrate/log_test.go

@@ -0,0 +1,42 @@
+package migrate
+
+import (
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestNewCommand(t *testing.T) {
+	entries, err := ReadLogFile("fixtures/cmdlog")
+	if err != nil {
+		t.Errorf("read log file error: %v", err)
+	}
+
+	tests := []interface{}{
+		&JoinCommand{2, 2, "1.local", "http://127.0.0.1:7001", "http://127.0.0.1:4001"},
+		&SetClusterConfigCommand{&ClusterConfig{9, 1800.0, 5.0}},
+		&NOPCommand{},
+		&RemoveCommand{"alice"},
+		&CompareAndDeleteCommand{"foo", "baz", 9},
+		&CompareAndSwapCommand{"foo", "bar", time.Unix(0, 0), "baz", 9},
+		&CreateCommand{"foo", "bar", time.Unix(0, 0), true, true},
+		&DeleteCommand{"foo", true, true},
+		&SetCommand{"foo", "bar", time.Unix(0, 0), true},
+		&SyncCommand{time.Unix(0, 0)},
+		&UpdateCommand{"foo", "bar", time.Unix(0, 0)},
+		&DefaultLeaveCommand{"alice"},
+		&DefaultJoinCommand{"alice", ""},
+	}
+
+	for i, e := range entries {
+		cmd, err := NewCommand(e.GetCommandName(), e.GetCommand())
+		if err != nil {
+			t.Errorf("#%d: %v", i, err)
+			continue
+		}
+
+		if !reflect.DeepEqual(cmd, tests[i]) {
+			t.Errorf("#%d: cmd = %+v, want %+v", i, cmd, tests[i])
+		}
+	}
+}

+ 187 - 0
migrate/snapshot.go

@@ -0,0 +1,187 @@
+package migrate
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"hash/crc32"
+	"io/ioutil"
+	"log"
+	"os"
+	"path"
+	"sort"
+	"strconv"
+	"strings"
+
+	raftpb "github.com/coreos/etcd/raft/raftpb"
+)
+
+type Snapshot4 struct {
+	State     []byte `json:"state"`
+	LastIndex uint64 `json:"lastIndex"`
+	LastTerm  uint64 `json:"lastTerm"`
+
+	Peers []struct {
+		Name             string `json:"name"`
+		ConnectionString string `json:"connectionString"`
+	} `json:"peers"`
+
+	//TODO(bcwaldon): is this needed?
+	//Path  string `json:"path"`
+}
+
+func (s *Snapshot4) Snapshot5() *raftpb.Snapshot {
+	snap5 := raftpb.Snapshot{
+		Data:  s.State,
+		Index: int64(s.LastIndex),
+		Term:  int64(s.LastTerm),
+		Nodes: make([]int64, len(s.Peers)),
+	}
+
+	for i, p := range s.Peers {
+		snap5.Nodes[i] = hashName(p.Name)
+	}
+
+	return &snap5
+}
+
+func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) {
+	fname, err := FindLatestFile(snapdir)
+	if err != nil {
+		return nil, err
+	}
+
+	if fname == "" {
+		return nil, nil
+	}
+
+	snappath := path.Join(snapdir, fname)
+	log.Printf("Decoding snapshot from %s", snappath)
+
+	return DecodeSnapshot4FromFile(snappath)
+}
+
+// FindLatestFile identifies the "latest" filename in a given directory
+// by sorting all the files and choosing the highest value.
+func FindLatestFile(dirpath string) (string, error) {
+	dir, err := os.OpenFile(dirpath, os.O_RDONLY, 0)
+	if err != nil {
+		if os.IsNotExist(err) {
+			err = nil
+		}
+		return "", err
+	}
+	defer dir.Close()
+
+	fnames, err := dir.Readdirnames(-1)
+	if err != nil {
+		return "", err
+	}
+
+	if len(fnames) == 0 {
+		return "", nil
+	}
+
+	names, err := NewSnapshotFileNames(fnames)
+	if err != nil {
+		return "", err
+	}
+
+	return names[len(names)-1].FileName, nil
+}
+
+func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) {
+	// Read snapshot data.
+	f, err := os.OpenFile(path, os.O_RDONLY, 0)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	return DecodeSnapshot4(f)
+}
+
+func DecodeSnapshot4(f *os.File) (*Snapshot4, error) {
+	// Verify checksum
+	var checksum uint32
+	n, err := fmt.Fscanf(f, "%08x\n", &checksum)
+	if err != nil {
+		return nil, err
+	} else if n != 1 {
+		return nil, errors.New("miss heading checksum")
+	}
+
+	// Load remaining snapshot contents.
+	b, err := ioutil.ReadAll(f)
+	if err != nil {
+		return nil, err
+	}
+
+	// Generate checksum.
+	byteChecksum := crc32.ChecksumIEEE(b)
+	if uint32(checksum) != byteChecksum {
+		return nil, errors.New("bad checksum")
+	}
+
+	// Decode snapshot.
+	snapshot := new(Snapshot4)
+	if err = json.Unmarshal(b, snapshot); err != nil {
+		return nil, err
+	}
+	return snapshot, nil
+}
+
+func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) {
+	s := make([]SnapshotFileName, 0)
+	for _, n := range names {
+		trimmed := strings.TrimSuffix(n, ".ss")
+		if trimmed == n {
+			return nil, fmt.Errorf("file %q does not have .ss extension", n)
+		}
+
+		parts := strings.SplitN(trimmed, "_", 2)
+		if len(parts) != 2 {
+			return nil, fmt.Errorf("unrecognized file name format %q", n)
+		}
+
+		fn := SnapshotFileName{FileName: n}
+
+		var err error
+		fn.Term, err = strconv.ParseUint(parts[0], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("unable to parse term from filename %q: %v", err)
+		}
+
+		fn.Index, err = strconv.ParseUint(parts[1], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("unable to parse index from filename %q: %v", err)
+		}
+
+		s = append(s, fn)
+	}
+
+	sortable := SnapshotFileNames(s)
+	sort.Sort(&sortable)
+	return s, nil
+}
+
+type SnapshotFileNames []SnapshotFileName
+type SnapshotFileName struct {
+	FileName string
+	Term     uint64
+	Index    uint64
+}
+
+func (n *SnapshotFileNames) Less(i, j int) bool {
+	iTerm, iIndex := (*n)[i].Term, (*n)[i].Index
+	jTerm, jIndex := (*n)[j].Term, (*n)[j].Index
+	return iTerm < jTerm || (iTerm == jTerm && iIndex < jIndex)
+}
+
+func (n *SnapshotFileNames) Swap(i, j int) {
+	(*n)[i], (*n)[j] = (*n)[j], (*n)[i]
+}
+
+func (n *SnapshotFileNames) Len() int {
+	return len([]SnapshotFileName(*n))
+}