|
|
@@ -1,16 +1,16 @@
|
|
|
package command
|
|
|
|
|
|
import (
|
|
|
- "errors"
|
|
|
"fmt"
|
|
|
"io/ioutil"
|
|
|
"log"
|
|
|
"os"
|
|
|
- "strings"
|
|
|
"sync"
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
|
|
- "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
|
|
|
+ "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
|
|
+ "github.com/coreos/etcd/client"
|
|
|
"github.com/coreos/etcd/store"
|
|
|
)
|
|
|
|
|
|
@@ -46,33 +46,13 @@ func handleImportSnap(c *cli.Context) {
|
|
|
|
|
|
st := store.New()
|
|
|
err = st.Recovery(d)
|
|
|
- if err != nil {
|
|
|
- fmt.Printf("cannot recover the snapshot file: %v\n", err)
|
|
|
- os.Exit(1)
|
|
|
- }
|
|
|
-
|
|
|
- endpoints, err := getEndpoints(c)
|
|
|
- if err != nil {
|
|
|
- handleError(ExitServerError, err)
|
|
|
- }
|
|
|
- tr, err := getTransport(c)
|
|
|
- if err != nil {
|
|
|
- handleError(ExitServerError, err)
|
|
|
- }
|
|
|
|
|
|
wg := &sync.WaitGroup{}
|
|
|
setc := make(chan set)
|
|
|
concurrent := c.Int("c")
|
|
|
fmt.Printf("starting to import snapshot %s with %d clients\n", c.String("snap"), concurrent)
|
|
|
for i := 0; i < concurrent; i++ {
|
|
|
- client := etcd.NewClient(endpoints)
|
|
|
- client.SetTransport(tr)
|
|
|
-
|
|
|
- if ok := client.SyncCluster(); !ok {
|
|
|
- handleError(ExitBadConnection, errors.New("cannot sync with the cluster using endpoints "+strings.Join(endpoints, ", ")))
|
|
|
- }
|
|
|
- wg.Add(1)
|
|
|
- go runSet(client, setc, wg)
|
|
|
+ go runSet(mustNewKeyAPI(c), setc, wg)
|
|
|
}
|
|
|
|
|
|
all, err := st.Get("/", true, true)
|
|
|
@@ -108,14 +88,14 @@ func copyKeys(n *store.NodeExtern, setc chan set) int {
|
|
|
return num
|
|
|
}
|
|
|
|
|
|
-func runSet(c *etcd.Client, setc chan set, wg *sync.WaitGroup) {
|
|
|
+func runSet(ki client.KeysAPI, setc chan set, wg *sync.WaitGroup) {
|
|
|
for s := range setc {
|
|
|
log.Println("copying key:", s.key)
|
|
|
if s.ttl != 0 && s.ttl < 300 {
|
|
|
log.Printf("extending key %s's ttl to 300 seconds", s.key)
|
|
|
s.ttl = 5 * 60
|
|
|
}
|
|
|
- _, err := c.Set(s.key, s.value, uint64(s.ttl))
|
|
|
+ _, err := ki.Set(context.TODO(), s.key, s.value, &client.SetOptions{TTL: time.Duration(s.ttl) * time.Second})
|
|
|
if err != nil {
|
|
|
log.Fatalf("failed to copy key: %v\n", err)
|
|
|
}
|