Browse Source

update checkpoint file path style

hangzws 7 years ago
parent
commit
a7c64e9543
4 changed files with 46 additions and 34 deletions
  1. 10 8
      oss/download.go
  2. 8 8
      oss/multicopy.go
  3. 3 3
      oss/option.go
  4. 25 15
      oss/upload.go

+ 10 - 8
oss/download.go

@@ -5,11 +5,13 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"hash"
 	"hash/crc64"
 	"io"
 	"io/ioutil"
 	"os"
+	"path/filepath"
 	"strconv"
 )
 
@@ -27,20 +29,20 @@ func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, op
 		return errors.New("oss: part size smaller than 1")
 	}
 
-	cpConf, err := getCpConfig(options, filePath)
-	if err != nil {
-		return err
-	}
-
 	uRange, err := getRangeConfig(options)
 	if err != nil {
 		return err
 	}
 
+	cpConf := getCpConfig(options)
 	routines := getRoutines(options)
 
-	if cpConf.IsEnable {
-		return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines, uRange)
+	if cpConf != nil && cpConf.IsEnable && cpConf.cpDir != "" {
+		src := fmt.Sprintf("oss://%v/%v", bucket.BucketName, objectKey)
+		absPath, _ := filepath.Abs(filePath)
+		cpFileName := getCpFileName(src, absPath)
+		cpFilePath := cpConf.cpDir + string(os.PathSeparator) + cpFileName
+		return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
 	}
 
 	return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
@@ -76,7 +78,7 @@ func defaultDownloadPartHook(part downloadPart) error {
 	return nil
 }
 
-// defaultDownloadProgressListener defines default ProgressListener, shields the ProgressListener in options of GetObject. 
+// defaultDownloadProgressListener defines default ProgressListener, shields the ProgressListener in options of GetObject.
 type defaultDownloadProgressListener struct {
 }
 

+ 8 - 8
oss/multicopy.go

@@ -5,9 +5,9 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"io/ioutil"
 	"os"
-	"path/filepath"
 	"strconv"
 )
 
@@ -27,16 +27,16 @@ func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string,
 		return errors.New("oss: part size invalid range (1024KB, 5GB]")
 	}
 
-	cpConf, err := getCpConfig(options, filepath.Base(destObjectKey))
-	if err != nil {
-		return err
-	}
-
+	cpConf := getCpConfig(options)
 	routines := getRoutines(options)
 
-	if cpConf.IsEnable {
+	if cpConf != nil && cpConf.IsEnable && cpConf.cpDir != "" {
+		src := fmt.Sprintf("oss://%v/%v", srcBucketName, srcObjectKey)
+		dest := fmt.Sprintf("oss://%v/%v", bucket.BucketName, destObjectKey)
+		cpFileName := getCpFileName(src, dest)
+		cpFilePath := cpConf.cpDir + string(os.PathSeparator) + cpFileName
 		return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
-			partSize, options, cpConf.FilePath, routines)
+			partSize, options, cpFilePath, routines)
 	}
 
 	return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,

+ 3 - 3
oss/option.go

@@ -255,12 +255,12 @@ func StorageClass(value StorageClassType) Option {
 // Checkpoint configuration
 type cpConfig struct {
 	IsEnable bool
-	FilePath string
+	cpDir    string
 }
 
 // Checkpoint sets the isEnable flag and checkpoint file path for DownloadFile/UploadFile.
-func Checkpoint(isEnable bool, filePath string) Option {
-	return addArg(checkpointConfig, &cpConfig{isEnable, filePath})
+func Checkpoint(isEnable bool, cpDir string) Option {
+	return addArg(checkpointConfig, &cpConfig{isEnable, cpDir})
 }
 
 // Routines DownloadFile/UploadFile routine count

+ 25 - 15
oss/upload.go

@@ -3,10 +3,13 @@ package oss
 import (
 	"crypto/md5"
 	"encoding/base64"
+	"encoding/hex"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"io/ioutil"
 	"os"
+	"path/filepath"
 	"time"
 )
 
@@ -24,15 +27,15 @@ func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, opti
 		return errors.New("oss: part size invalid range (1024KB, 5GB]")
 	}
 
-	cpConf, err := getCpConfig(options, filePath)
-	if err != nil {
-		return err
-	}
-
+	cpConf := getCpConfig(options)
 	routines := getRoutines(options)
 
-	if cpConf.IsEnable {
-		return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines)
+	if cpConf != nil && cpConf.IsEnable && cpConf.cpDir != "" {
+		dest := fmt.Sprintf("oss://%v/%v", bucket.BucketName, objectKey)
+		absPath, _ := filepath.Abs(filePath)
+		cpFileName := getCpFileName(absPath, dest)
+		cpFilePath := cpConf.cpDir + string(os.PathSeparator) + cpFileName
+		return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines)
 	}
 
 	return bucket.uploadFile(objectKey, filePath, partSize, options, routines)
@@ -41,19 +44,26 @@ func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, opti
 // ----- concurrent upload without checkpoint  -----
 
 // getCpConfig gets checkpoint configuration
-func getCpConfig(options []Option, filePath string) (*cpConfig, error) {
-	cpc := &cpConfig{}
+func getCpConfig(options []Option) *cpConfig {
 	cpcOpt, err := findOption(options, checkpointConfig, nil)
 	if err != nil || cpcOpt == nil {
-		return cpc, err
+		return nil
 	}
 
-	cpc = cpcOpt.(*cpConfig)
-	if cpc.IsEnable && cpc.FilePath == "" {
-		cpc.FilePath = filePath + CheckpointFileSuffix
-	}
+	return cpcOpt.(*cpConfig)
+}
+
+// getCpFileName return the name of the checkpoint file
+func getCpFileName(src, dest string) string {
+	md5Ctx := md5.New()
+	md5Ctx.Write([]byte(src))
+	srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
+
+	md5Ctx.Reset()
+	md5Ctx.Write([]byte(dest))
+	destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
 
-	return cpc, nil
+	return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum)
 }
 
 // getRoutines gets the routine count. by default it's 1.