123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402 |
- // Copyright 2016 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package command
- import (
- "encoding/binary"
- "encoding/json"
- "fmt"
- "io"
- "os"
- "os/exec"
- "path/filepath"
- "time"
- "github.com/coreos/etcd/client"
- etcdErr "github.com/coreos/etcd/error"
- "github.com/coreos/etcd/etcdserver"
- "github.com/coreos/etcd/etcdserver/api"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- "github.com/coreos/etcd/etcdserver/membership"
- "github.com/coreos/etcd/mvcc"
- "github.com/coreos/etcd/mvcc/backend"
- "github.com/coreos/etcd/mvcc/mvccpb"
- "github.com/coreos/etcd/pkg/pbutil"
- "github.com/coreos/etcd/pkg/types"
- "github.com/coreos/etcd/raft/raftpb"
- "github.com/coreos/etcd/snap"
- "github.com/coreos/etcd/store"
- "github.com/coreos/etcd/wal"
- "github.com/coreos/etcd/wal/walpb"
- "github.com/gogo/protobuf/proto"
- "github.com/spf13/cobra"
- )
- var (
- migrateExcludeTTLKey bool
- migrateDatadir string
- migrateWALdir string
- migrateTransformer string
- )
- // NewMigrateCommand returns the cobra command for "migrate".
- func NewMigrateCommand() *cobra.Command {
- mc := &cobra.Command{
- Use: "migrate",
- Short: "Migrates keys in a v2 store to a mvcc store",
- Run: migrateCommandFunc,
- }
- mc.Flags().BoolVar(&migrateExcludeTTLKey, "no-ttl", false, "Do not convert TTL keys")
- mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory")
- mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory")
- mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program")
- return mc
- }
- func migrateCommandFunc(cmd *cobra.Command, args []string) {
- var (
- writer io.WriteCloser
- reader io.ReadCloser
- errc chan error
- )
- if migrateTransformer != "" {
- writer, reader, errc = startTransformer()
- } else {
- fmt.Println("using default transformer")
- writer, reader, errc = defaultTransformer()
- }
- st, index := rebuildStoreV2()
- be := prepareBackend()
- defer be.Close()
- go func() {
- writeStore(writer, st)
- writer.Close()
- }()
- readKeys(reader, be)
- mvcc.UpdateConsistentIndex(be, index)
- err := <-errc
- if err != nil {
- fmt.Println("failed to transform keys")
- ExitWithError(ExitError, err)
- }
- fmt.Println("finished transforming keys")
- }
- func prepareBackend() backend.Backend {
- var be backend.Backend
- bch := make(chan struct{})
- dbpath := filepath.Join(migrateDatadir, "member", "snap", "db")
- go func() {
- defer close(bch)
- be = backend.NewDefaultBackend(dbpath)
- }()
- select {
- case <-bch:
- case <-time.After(time.Second):
- fmt.Fprintf(os.Stderr, "waiting for etcd to close and release its lock on %q\n", dbpath)
- <-bch
- }
- tx := be.BatchTx()
- tx.Lock()
- tx.UnsafeCreateBucket([]byte("key"))
- tx.UnsafeCreateBucket([]byte("meta"))
- tx.Unlock()
- return be
- }
- func rebuildStoreV2() (store.Store, uint64) {
- var index uint64
- cl := membership.NewCluster("")
- waldir := migrateWALdir
- if len(waldir) == 0 {
- waldir = filepath.Join(migrateDatadir, "member", "wal")
- }
- snapdir := filepath.Join(migrateDatadir, "member", "snap")
- ss := snap.New(snapdir)
- snapshot, err := ss.Load()
- if err != nil && err != snap.ErrNoSnapshot {
- ExitWithError(ExitError, err)
- }
- var walsnap walpb.Snapshot
- if snapshot != nil {
- walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
- index = snapshot.Metadata.Index
- }
- w, err := wal.OpenForRead(waldir, walsnap)
- if err != nil {
- ExitWithError(ExitError, err)
- }
- defer w.Close()
- _, _, ents, err := w.ReadAll()
- if err != nil {
- ExitWithError(ExitError, err)
- }
- st := store.New()
- if snapshot != nil {
- err := st.Recovery(snapshot.Data)
- if err != nil {
- ExitWithError(ExitError, err)
- }
- }
- cl.SetStore(st)
- cl.Recover(api.UpdateCapability)
- applier := etcdserver.NewApplierV2(st, cl)
- for _, ent := range ents {
- if ent.Type == raftpb.EntryConfChange {
- var cc raftpb.ConfChange
- pbutil.MustUnmarshal(&cc, ent.Data)
- applyConf(cc, cl)
- continue
- }
- var raftReq pb.InternalRaftRequest
- if !pbutil.MaybeUnmarshal(&raftReq, ent.Data) { // backward compatible
- var r pb.Request
- pbutil.MustUnmarshal(&r, ent.Data)
- applyRequest(&r, applier)
- } else {
- if raftReq.V2 != nil {
- req := raftReq.V2
- applyRequest(req, applier)
- }
- }
- if ent.Index > index {
- index = ent.Index
- }
- }
- return st, index
- }
- func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
- if err := cl.ValidateConfigurationChange(cc); err != nil {
- return
- }
- switch cc.Type {
- case raftpb.ConfChangeAddNode:
- m := new(membership.Member)
- if err := json.Unmarshal(cc.Context, m); err != nil {
- panic(err)
- }
- cl.AddMember(m)
- case raftpb.ConfChangeRemoveNode:
- cl.RemoveMember(types.ID(cc.NodeID))
- case raftpb.ConfChangeUpdateNode:
- m := new(membership.Member)
- if err := json.Unmarshal(cc.Context, m); err != nil {
- panic(err)
- }
- cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
- }
- }
- func applyRequest(req *pb.Request, applyV2 etcdserver.ApplierV2) {
- r := (*etcdserver.RequestV2)(req)
- r.TTLOptions()
- switch r.Method {
- case "POST":
- applyV2.Post(r)
- case "PUT":
- applyV2.Put(r)
- case "DELETE":
- applyV2.Delete(r)
- case "QGET":
- applyV2.QGet(r)
- case "SYNC":
- applyV2.Sync(r)
- default:
- panic("unknown command")
- }
- }
- func writeStore(w io.Writer, st store.Store) uint64 {
- all, err := st.Get("/1", true, true)
- if err != nil {
- if eerr, ok := err.(*etcdErr.Error); ok && eerr.ErrorCode == etcdErr.EcodeKeyNotFound {
- fmt.Println("no v2 keys to migrate")
- os.Exit(0)
- }
- ExitWithError(ExitError, err)
- }
- return writeKeys(w, all.Node)
- }
- func writeKeys(w io.Writer, n *store.NodeExtern) uint64 {
- maxIndex := n.ModifiedIndex
- nodes := n.Nodes
- // remove store v2 bucket prefix
- n.Key = n.Key[2:]
- if n.Key == "" {
- n.Key = "/"
- }
- if n.Dir {
- n.Nodes = nil
- }
- if !migrateExcludeTTLKey || n.TTL == 0 {
- b, err := json.Marshal(n)
- if err != nil {
- ExitWithError(ExitError, err)
- }
- fmt.Fprint(w, string(b))
- }
- for _, nn := range nodes {
- max := writeKeys(w, nn)
- if max > maxIndex {
- maxIndex = max
- }
- }
- return maxIndex
- }
- func readKeys(r io.Reader, be backend.Backend) error {
- for {
- length64, err := readInt64(r)
- if err != nil {
- if err == io.EOF {
- return nil
- }
- return err
- }
- buf := make([]byte, int(length64))
- if _, err = io.ReadFull(r, buf); err != nil {
- return err
- }
- var kv mvccpb.KeyValue
- err = proto.Unmarshal(buf, &kv)
- if err != nil {
- return err
- }
- mvcc.WriteKV(be, kv)
- }
- }
- func readInt64(r io.Reader) (int64, error) {
- var n int64
- err := binary.Read(r, binary.LittleEndian, &n)
- return n, err
- }
- func startTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
- cmd := exec.Command(migrateTransformer)
- cmd.Stderr = os.Stderr
- writer, err := cmd.StdinPipe()
- if err != nil {
- ExitWithError(ExitError, err)
- }
- reader, rerr := cmd.StdoutPipe()
- if rerr != nil {
- ExitWithError(ExitError, rerr)
- }
- if err := cmd.Start(); err != nil {
- ExitWithError(ExitError, err)
- }
- errc := make(chan error, 1)
- go func() {
- errc <- cmd.Wait()
- }()
- return writer, reader, errc
- }
- func defaultTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
- // transformer decodes v2 keys from sr
- sr, sw := io.Pipe()
- // transformer encodes v3 keys into dw
- dr, dw := io.Pipe()
- decoder := json.NewDecoder(sr)
- errc := make(chan error, 1)
- go func() {
- defer func() {
- sr.Close()
- dw.Close()
- }()
- for decoder.More() {
- node := &client.Node{}
- if err := decoder.Decode(node); err != nil {
- errc <- err
- return
- }
- kv := transform(node)
- if kv == nil {
- continue
- }
- data, err := proto.Marshal(kv)
- if err != nil {
- errc <- err
- return
- }
- buf := make([]byte, 8)
- binary.LittleEndian.PutUint64(buf, uint64(len(data)))
- if _, err := dw.Write(buf); err != nil {
- errc <- err
- return
- }
- if _, err := dw.Write(data); err != nil {
- errc <- err
- return
- }
- }
- errc <- nil
- }()
- return sw, dr, errc
- }
- func transform(n *client.Node) *mvccpb.KeyValue {
- const unKnownVersion = 1
- if n.Dir {
- return nil
- }
- kv := &mvccpb.KeyValue{
- Key: []byte(n.Key),
- Value: []byte(n.Value),
- CreateRevision: int64(n.CreatedIndex),
- ModRevision: int64(n.ModifiedIndex),
- Version: unKnownVersion,
- }
- return kv
- }
|