Browse Source

Merge pull request #5391 from xiang90/migrate

etcdctl: add migrate command
Xiang Li 9 years ago
parent
commit
edb11881f8
6 changed files with 447 additions and 28 deletions
  1. 350 0
      etcdctl/ctlv3/command/migrate_command.go
  2. 1 0
      etcdctl/ctlv3/ctl.go
  3. 26 15
      etcdserver/apply_v2.go
  4. 2 2
      etcdserver/server.go
  5. 11 11
      etcdserver/server_test.go
  6. 57 0
      mvcc/util.go

+ 350 - 0
etcdctl/ctlv3/command/migrate_command.go

@@ -0,0 +1,350 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package command
+
+import (
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"io"
+	"os"
+	"os/exec"
+	"path"
+	"time"
+
+	"github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/etcdserver"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/mvcc"
+	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/etcd/wal"
+	"github.com/coreos/etcd/wal/walpb"
+	"github.com/gogo/protobuf/proto"
+	"github.com/spf13/cobra"
+)
+
+var (
+	migrateDatadir     string
+	migrateWALdir      string
+	migrateTransformer string
+)
+
+// NewMigrateCommand returns the cobra command for "migrate".
+func NewMigrateCommand() *cobra.Command {
+	mc := &cobra.Command{
+		Use:   "migrate",
+		Short: "migrate",
+		Run:   migrateCommandFunc,
+	}
+
+	mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory.")
+	mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory.")
+	mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program.")
+	return mc
+}
+
+func migrateCommandFunc(cmd *cobra.Command, args []string) {
+	var (
+		writer io.WriteCloser
+		reader io.ReadCloser
+		errc   chan error
+	)
+	if migrateTransformer != "" {
+		writer, reader, errc = startTransformer()
+	} else {
+		fmt.Println("using default transformer")
+		writer, reader, errc = defaultTransformer()
+	}
+
+	st := rebuildStoreV2()
+	be := prepareBackend()
+	defer be.Close()
+
+	maxIndexc := make(chan uint64, 1)
+	go func() {
+		maxIndexc <- writeStore(writer, st)
+		writer.Close()
+	}()
+
+	readKeys(reader, be)
+	mvcc.UpdateConsistentIndex(be, <-maxIndexc)
+	err := <-errc
+	if err != nil {
+		fmt.Println("failed to transform keys")
+		ExitWithError(ExitError, err)
+	}
+
+	fmt.Println("finished transforming keys")
+}
+
+func prepareBackend() backend.Backend {
+	dbpath := path.Join(migrateDatadir, "member", "snap", "db")
+	be := backend.New(dbpath, time.Second, 10000)
+	tx := be.BatchTx()
+	tx.Lock()
+	tx.UnsafeCreateBucket([]byte("key"))
+	tx.UnsafeCreateBucket([]byte("meta"))
+	tx.Unlock()
+	return be
+}
+
+func rebuildStoreV2() store.Store {
+	waldir := migrateWALdir
+	if len(waldir) == 0 {
+		waldir = path.Join(migrateDatadir, "member", "wal")
+	}
+	snapdir := path.Join(migrateDatadir, "member", "snap")
+
+	ss := snap.New(snapdir)
+	snapshot, err := ss.Load()
+	if err != nil && err != snap.ErrNoSnapshot {
+		ExitWithError(ExitError, err)
+	}
+
+	var walsnap walpb.Snapshot
+	if snapshot != nil {
+		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
+	}
+
+	w, err := wal.OpenForRead(waldir, walsnap)
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	defer w.Close()
+
+	_, _, ents, err := w.ReadAll()
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+
+	st := store.New()
+	if snapshot != nil {
+		err := st.Recovery(snapshot.Data)
+		if err != nil {
+			ExitWithError(ExitError, err)
+		}
+	}
+
+	applier := etcdserver.NewApplierV2(st, nil)
+	for _, ent := range ents {
+		if ent.Type != raftpb.EntryNormal {
+			continue
+		}
+
+		var raftReq pb.InternalRaftRequest
+		if !pbutil.MaybeUnmarshal(&raftReq, ent.Data) { // backward compatible
+			var r pb.Request
+			pbutil.MustUnmarshal(&r, ent.Data)
+			applyRequest(&r, applier)
+		} else {
+			if raftReq.V2 != nil {
+				req := raftReq.V2
+				applyRequest(req, applier)
+			}
+		}
+	}
+
+	return st
+}
+
+func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
+	toTTLOptions(r)
+	switch r.Method {
+	case "POST":
+		applyV2.Post(r)
+	case "PUT":
+		applyV2.Put(r)
+	case "DELETE":
+		applyV2.Delete(r)
+	case "QGET":
+		applyV2.QGet(r)
+	case "SYNC":
+		applyV2.Sync(r)
+	default:
+		panic("unknown command")
+	}
+}
+
+func toTTLOptions(r *pb.Request) store.TTLOptionSet {
+	refresh, _ := pbutil.GetBool(r.Refresh)
+	ttlOptions := store.TTLOptionSet{Refresh: refresh}
+	if r.Expiration != 0 {
+		ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
+	}
+	return ttlOptions
+}
+
+func writeStore(w io.Writer, st store.Store) uint64 {
+	all, err := st.Get("/1", true, true)
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	return writeKeys(w, all.Node)
+}
+
+func writeKeys(w io.Writer, n *store.NodeExtern) uint64 {
+	maxIndex := n.ModifiedIndex
+
+	nodes := n.Nodes
+	// remove store v2 bucket prefix
+	n.Key = n.Key[2:]
+	if n.Key == "" {
+		n.Key = "/"
+	}
+	if n.Dir {
+		n.Nodes = nil
+	}
+	b, err := json.Marshal(n)
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	fmt.Fprintf(w, string(b))
+	for _, nn := range nodes {
+		max := writeKeys(w, nn)
+		if max > maxIndex {
+			maxIndex = max
+		}
+	}
+	return maxIndex
+}
+
+func readKeys(r io.Reader, be backend.Backend) error {
+	for {
+		length64, err := readInt64(r)
+		if err != nil {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+
+		buf := make([]byte, int(length64))
+		if _, err = io.ReadFull(r, buf); err != nil {
+			return err
+		}
+
+		var kv mvccpb.KeyValue
+		err = proto.Unmarshal(buf, &kv)
+		if err != nil {
+			return err
+		}
+
+		mvcc.WriteKV(be, kv)
+	}
+}
+
+func readInt64(r io.Reader) (int64, error) {
+	var n int64
+	err := binary.Read(r, binary.LittleEndian, &n)
+	return n, err
+}
+
+func startTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
+	cmd := exec.Command(migrateTransformer)
+	cmd.Stderr = os.Stderr
+
+	writer, err := cmd.StdinPipe()
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+
+	reader, rerr := cmd.StdoutPipe()
+	if rerr != nil {
+		ExitWithError(ExitError, rerr)
+	}
+
+	if err := cmd.Start(); err != nil {
+		ExitWithError(ExitError, err)
+	}
+
+	errc := make(chan error, 1)
+
+	go func() {
+		errc <- cmd.Wait()
+	}()
+
+	return writer, reader, errc
+}
+
+func defaultTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
+	// transformer decodes v2 keys from sr
+	sr, sw := io.Pipe()
+	// transformer encodes v3 keys into dw
+	dr, dw := io.Pipe()
+
+	decoder := json.NewDecoder(sr)
+
+	errc := make(chan error, 1)
+
+	go func() {
+		defer func() {
+			sr.Close()
+			dw.Close()
+		}()
+
+		for decoder.More() {
+			node := &client.Node{}
+			if err := decoder.Decode(node); err != nil {
+				errc <- err
+				return
+			}
+
+			kv := transform(node)
+			if kv == nil {
+				continue
+			}
+
+			data, err := proto.Marshal(kv)
+			if err != nil {
+				errc <- err
+				return
+			}
+			buf := make([]byte, 8)
+			binary.LittleEndian.PutUint64(buf, uint64(len(data)))
+			if _, err := dw.Write(buf); err != nil {
+				errc <- err
+				return
+			}
+			if _, err := dw.Write(data); err != nil {
+				errc <- err
+				return
+			}
+		}
+
+		errc <- nil
+	}()
+
+	return sw, dr, errc
+}
+
+func transform(n *client.Node) *mvccpb.KeyValue {
+	const unKnownVersion = 1
+	if n.Dir {
+		return nil
+	}
+	kv := &mvccpb.KeyValue{
+		Key:            []byte(n.Key),
+		Value:          []byte(n.Value),
+		CreateRevision: int64(n.CreatedIndex),
+		ModRevision:    int64(n.ModifiedIndex),
+		Version:        unKnownVersion,
+	}
+	return kv
+}

+ 1 - 0
etcdctl/ctlv3/ctl.go

@@ -74,6 +74,7 @@ func init() {
 		command.NewMemberCommand(),
 		command.NewSnapshotCommand(),
 		command.NewMakeMirrorCommand(),
+		command.NewMigrateCommand(),
 		command.NewLockCommand(),
 		command.NewAuthCommand(),
 		command.NewElectCommand(),

+ 26 - 15
etcdserver/apply_v2.go

@@ -26,8 +26,8 @@ import (
 	"github.com/coreos/go-semver/semver"
 )
 
-// applierV2 is the interface for processing V2 raft messages
-type applierV2 interface {
+// ApplierV2 is the interface for processing V2 raft messages
+type ApplierV2 interface {
 	Delete(r *pb.Request) Response
 	Post(r *pb.Request) Response
 	Put(r *pb.Request) Response
@@ -35,19 +35,26 @@ type applierV2 interface {
 	Sync(r *pb.Request) Response
 }
 
-type applierV2store struct{ s *EtcdServer }
+func NewApplierV2(s store.Store, c *membership.RaftCluster) ApplierV2 {
+	return &applierV2store{store: s, cluster: c}
+}
+
+type applierV2store struct {
+	store   store.Store
+	cluster *membership.RaftCluster
+}
 
 func (a *applierV2store) Delete(r *pb.Request) Response {
 	switch {
 	case r.PrevIndex > 0 || r.PrevValue != "":
-		return toResponse(a.s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
+		return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
 	default:
-		return toResponse(a.s.store.Delete(r.Path, r.Dir, r.Recursive))
+		return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
 	}
 }
 
 func (a *applierV2store) Post(r *pb.Request) Response {
-	return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, true, toTTLOptions(r)))
+	return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, toTTLOptions(r)))
 }
 
 func (a *applierV2store) Put(r *pb.Request) Response {
@@ -57,14 +64,14 @@ func (a *applierV2store) Put(r *pb.Request) Response {
 	case existsSet:
 		if exists {
 			if r.PrevIndex == 0 && r.PrevValue == "" {
-				return toResponse(a.s.store.Update(r.Path, r.Val, ttlOptions))
+				return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
 			} else {
-				return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
+				return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
 			}
 		}
-		return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
+		return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
 	case r.PrevIndex > 0 || r.PrevValue != "":
-		return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
+		return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
 	default:
 		if storeMemberAttributeRegexp.MatchString(r.Path) {
 			id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
@@ -72,25 +79,29 @@ func (a *applierV2store) Put(r *pb.Request) Response {
 			if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
 				plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
 			}
-			a.s.cluster.UpdateAttributes(id, attr)
+			if a.cluster != nil {
+				a.cluster.UpdateAttributes(id, attr)
+			}
 			// return an empty response since there is no consumer.
 			return Response{}
 		}
 		if r.Path == membership.StoreClusterVersionKey() {
-			a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
+			if a.cluster != nil {
+				a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
+			}
 			// return an empty response since there is no consumer.
 			return Response{}
 		}
-		return toResponse(a.s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
+		return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
 	}
 }
 
 func (a *applierV2store) QGet(r *pb.Request) Response {
-	return toResponse(a.s.store.Get(r.Path, r.Recursive, r.Sorted))
+	return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
 }
 
 func (a *applierV2store) Sync(r *pb.Request) Response {
-	a.s.store.DeleteExpiredKeys(time.Unix(0, r.Time))
+	a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
 	return Response{}
 }
 

+ 2 - 2
etcdserver/server.go

@@ -180,7 +180,7 @@ type EtcdServer struct {
 
 	store store.Store
 
-	applyV2 applierV2
+	applyV2 ApplierV2
 
 	applyV3    applierV3
 	kv         mvcc.ConsistentWatchableKV
@@ -391,7 +391,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		msgSnapC:      make(chan raftpb.Message, maxInFlightMsgSnap),
 	}
 
-	srv.applyV2 = &applierV2store{srv}
+	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	srv.be = be
 	srv.lessor = lease.NewLessor(srv.be)

+ 11 - 11
etcdserver/server_test.go

@@ -179,7 +179,7 @@ func TestApplyRepeat(t *testing.T) {
 		cluster:  cl,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
-	s.applyV2 = &applierV2store{s}
+	s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
 	s.start()
 	req := &pb.Request{Method: "QGET", ID: uint64(1)}
 	ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}
@@ -445,7 +445,7 @@ func TestApplyRequest(t *testing.T) {
 	for i, tt := range tests {
 		st := mockstore.NewRecorder()
 		srv := &EtcdServer{store: st}
-		srv.applyV2 = &applierV2store{srv}
+		srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 		resp := srv.applyV2Request(&tt.req)
 
 		if !reflect.DeepEqual(resp, tt.wresp) {
@@ -464,7 +464,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
 		store:   mockstore.NewRecorder(),
 		cluster: cl,
 	}
-	srv.applyV2 = &applierV2store{srv}
+	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	req := pb.Request{
 		Method: "PUT",
@@ -639,7 +639,7 @@ func TestDoProposal(t *testing.T) {
 			store:    st,
 			reqIDGen: idutil.NewGenerator(0, time.Time{}),
 		}
-		srv.applyV2 = &applierV2store{srv}
+		srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 		srv.start()
 		resp, err := srv.Do(context.Background(), tt)
 		srv.Stop()
@@ -666,7 +666,7 @@ func TestDoProposalCancelled(t *testing.T) {
 		w:        wt,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
-	srv.applyV2 = &applierV2store{srv}
+	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	cancel()
@@ -688,7 +688,7 @@ func TestDoProposalTimeout(t *testing.T) {
 		w:        mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
-	srv.applyV2 = &applierV2store{srv}
+	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	ctx, _ := context.WithTimeout(context.Background(), 0)
 	_, err := srv.Do(ctx, pb.Request{Method: "PUT"})
@@ -704,7 +704,7 @@ func TestDoProposalStopped(t *testing.T) {
 		w:        mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
-	srv.applyV2 = &applierV2store{srv}
+	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	srv.done = make(chan struct{})
 	close(srv.done)
@@ -721,7 +721,7 @@ func TestSync(t *testing.T) {
 		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
-	srv.applyV2 = &applierV2store{srv}
+	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	// check that sync is non-blocking
 	done := make(chan struct{})
@@ -761,7 +761,7 @@ func TestSyncTimeout(t *testing.T) {
 		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
-	srv.applyV2 = &applierV2store{srv}
+	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	// check that sync is non-blocking
 	done := make(chan struct{})
@@ -900,7 +900,7 @@ func TestTriggerSnap(t *testing.T) {
 		store:    st,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
-	srv.applyV2 = &applierV2store{srv}
+	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
 	srv.be = be
@@ -968,7 +968,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 		cluster:  cl,
 		msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
 	}
-	s.applyV2 = &applierV2store{s}
+	s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
 
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	defer func() {

+ 57 - 0
mvcc/util.go

@@ -0,0 +1,57 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"encoding/binary"
+	"log"
+
+	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+)
+
+func UpdateConsistentIndex(be backend.Backend, index uint64) {
+	tx := be.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+
+	var oldi uint64
+	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
+	if len(vs) != 0 {
+		oldi = binary.BigEndian.Uint64(vs[0])
+	}
+
+	if index <= oldi {
+		return
+	}
+
+	bs := make([]byte, 8)
+	binary.BigEndian.PutUint64(bs, index)
+	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
+}
+
+func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
+	ibytes := newRevBytes()
+	revToBytes(revision{main: kv.ModRevision}, ibytes)
+
+	d, err := kv.Marshal()
+	if err != nil {
+		log.Fatalf("mvcc: cannot marshal event: %v", err)
+	}
+
+	be.BatchTx().Lock()
+	be.BatchTx().UnsafePut(keyBucketName, ibytes, d)
+	be.BatchTx().Unlock()
+}