|
|
@@ -17,6 +17,7 @@ package command
|
|
|
import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+ "strings"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
|
|
|
|
@@ -29,11 +30,13 @@ import (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- mminsecureTr bool
|
|
|
- mmcert string
|
|
|
- mmkey string
|
|
|
- mmcacert string
|
|
|
- mmprefix string
|
|
|
+ mminsecureTr bool
|
|
|
+ mmcert string
|
|
|
+ mmkey string
|
|
|
+ mmcacert string
|
|
|
+ mmprefix string
|
|
|
+ mmdestprefix string
|
|
|
+ mmremdestprefix bool
|
|
|
)
|
|
|
|
|
|
// NewMakeMirrorCommand returns the cobra command for "makeMirror".
|
|
|
@@ -45,7 +48,8 @@ func NewMakeMirrorCommand() *cobra.Command {
|
|
|
}
|
|
|
|
|
|
c.Flags().StringVar(&mmprefix, "prefix", "", "Key-value prefix to mirror")
|
|
|
- // TODO: add dest-prefix to mirror a prefix to a different prefix in the destination cluster?
|
|
|
+ c.Flags().StringVar(&mmdestprefix, "dest-prefix", "", "destination prefix to mirror a prefix to a different prefix in the destination cluster")
|
|
|
+ c.Flags().BoolVar(&mmremdestprefix, "no-dest-prefix", false, "mirror key-values to the root of the destination cluster")
|
|
|
c.Flags().StringVar(&mmcert, "dest-cert", "", "Identify secure client using this TLS certificate file for the destination cluster")
|
|
|
c.Flags().StringVar(&mmkey, "dest-key", "", "Identify secure client using this TLS key file")
|
|
|
c.Flags().StringVar(&mmcacert, "dest-cacert", "", "Verify certificates of TLS enabled secure servers using this CA bundle")
|
|
|
@@ -85,14 +89,23 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- // TODO: remove the prefix of the destination cluster?
|
|
|
s := mirror.NewSyncer(c, mmprefix, 0)
|
|
|
|
|
|
rc, errc := s.SyncBase(ctx)
|
|
|
|
|
|
+ // if destination prefix is specified and remove destination prefix is true return error
|
|
|
+ if mmremdestprefix && len(mmdestprefix) > 0 {
|
|
|
+ ExitWithError(ExitBadArgs, fmt.Errorf("`--dest-prefix` and `--rem-dest-prefix` cannot be set at the same time, choose one."))
|
|
|
+ }
|
|
|
+
|
|
|
+ // if remove destination prefix is false and destination prefix is empty set the value of destination prefix same as prefix
|
|
|
+ if !mmremdestprefix && len(mmdestprefix) == 0 {
|
|
|
+ mmdestprefix = mmprefix
|
|
|
+ }
|
|
|
+
|
|
|
for r := range rc {
|
|
|
for _, kv := range r.Kvs {
|
|
|
- _, err := dc.Put(ctx, string(kv.Key), string(kv.Value))
|
|
|
+ _, err := dc.Put(ctx, modifyPrefix(string(kv.Key)), string(kv.Value))
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -126,10 +139,10 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
|
|
|
}
|
|
|
switch ev.Type {
|
|
|
case mvccpb.PUT:
|
|
|
- ops = append(ops, clientv3.OpPut(string(ev.Kv.Key), string(ev.Kv.Value)))
|
|
|
+ ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value)))
|
|
|
atomic.AddInt64(&total, 1)
|
|
|
case mvccpb.DELETE:
|
|
|
- ops = append(ops, clientv3.OpDelete(string(ev.Kv.Key)))
|
|
|
+ ops = append(ops, clientv3.OpDelete(modifyPrefix(string(ev.Kv.Key))))
|
|
|
atomic.AddInt64(&total, 1)
|
|
|
default:
|
|
|
panic("unexpected event type")
|
|
|
@@ -146,3 +159,7 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
+func modifyPrefix(key string) string {
|
|
|
+ return strings.Replace(key, mmprefix, mmdestprefix, 1)
|
|
|
+}
|