|
@@ -44,7 +44,7 @@ func getUploadCpFilePath(cpConf *cpConfig, srcFile, destBucket, destObject strin
|
|
|
if cpConf.FilePath == "" && cpConf.DirPath != "" {
|
|
if cpConf.FilePath == "" && cpConf.DirPath != "" {
|
|
|
dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
|
|
dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
|
|
|
absPath, _ := filepath.Abs(srcFile)
|
|
absPath, _ := filepath.Abs(srcFile)
|
|
|
- cpFileName := getCpFileName(absPath, dest)
|
|
|
|
|
|
|
+ cpFileName := getCpFileName(absPath, dest, "")
|
|
|
cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
|
|
cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
|
|
|
}
|
|
}
|
|
|
return cpConf.FilePath
|
|
return cpConf.FilePath
|
|
@@ -63,7 +63,7 @@ func getCpConfig(options []Option) *cpConfig {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// getCpFileName return the name of the checkpoint file
|
|
// getCpFileName return the name of the checkpoint file
|
|
|
-func getCpFileName(src, dest string) string {
|
|
|
|
|
|
|
+func getCpFileName(src, dest, versionId string) string {
|
|
|
md5Ctx := md5.New()
|
|
md5Ctx := md5.New()
|
|
|
md5Ctx.Write([]byte(src))
|
|
md5Ctx.Write([]byte(src))
|
|
|
srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
|
|
srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
|
|
@@ -72,7 +72,14 @@ func getCpFileName(src, dest string) string {
|
|
|
md5Ctx.Write([]byte(dest))
|
|
md5Ctx.Write([]byte(dest))
|
|
|
destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
|
|
destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
|
|
|
|
|
|
|
|
- return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum)
|
|
|
|
|
|
|
+ if versionId == "" {
|
|
|
|
|
+ return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ md5Ctx.Reset()
|
|
|
|
|
+ md5Ctx.Write([]byte(versionId))
|
|
|
|
|
+ versionCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
|
|
|
|
|
+ return fmt.Sprintf("%v-%v-%v.cp", srcCheckSum, destCheckSum, versionCheckSum)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// getRoutines gets the routine count. by default it's 1.
|
|
// getRoutines gets the routine count. by default it's 1.
|
|
@@ -175,6 +182,10 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ partOptions := ChoiceTransferPartOption(options)
|
|
|
|
|
+ completeOptions := ChoiceCompletePartOption(options)
|
|
|
|
|
+ abortOptions := ChoiceAbortPartOption(options)
|
|
|
|
|
+
|
|
|
// Initialize the multipart upload
|
|
// Initialize the multipart upload
|
|
|
imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
|
|
imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -191,11 +202,8 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
|
|
|
event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
|
|
event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
|
|
|
|
|
- // oss server don't support x-oss-storage-class
|
|
|
|
|
- options = deleteOption(options, HTTPHeaderOssStorageClass)
|
|
|
|
|
-
|
|
|
|
|
// Start the worker coroutine
|
|
// Start the worker coroutine
|
|
|
- arg := workerArg{&bucket, filePath, imur, options, uploadPartHooker}
|
|
|
|
|
|
|
+ arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker}
|
|
|
for w := 1; w <= routines; w++ {
|
|
for w := 1; w <= routines; w++ {
|
|
|
go worker(w, arg, jobs, results, failed, die)
|
|
go worker(w, arg, jobs, results, failed, die)
|
|
|
}
|
|
}
|
|
@@ -221,7 +229,7 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
|
|
|
close(die)
|
|
close(die)
|
|
|
event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
|
|
event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
- bucket.AbortMultipartUpload(imur, options...)
|
|
|
|
|
|
|
+ bucket.AbortMultipartUpload(imur, abortOptions...)
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -234,9 +242,9 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
|
|
|
|
|
// Complete the multpart upload
|
|
// Complete the multpart upload
|
|
|
- _, err = bucket.CompleteMultipartUpload(imur, parts, options...)
|
|
|
|
|
|
|
+ _, err = bucket.CompleteMultipartUpload(imur, parts, completeOptions...)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- bucket.AbortMultipartUpload(imur, options...)
|
|
|
|
|
|
|
+ bucket.AbortMultipartUpload(imur, abortOptions...)
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
return nil
|
|
return nil
|
|
@@ -299,7 +307,7 @@ func (cp uploadCheckpoint) isValid(filePath string) (bool, error) {
|
|
|
|
|
|
|
|
// Compare the file size, file's last modified time and file's MD5
|
|
// Compare the file size, file's last modified time and file's MD5
|
|
|
if cp.FileStat.Size != st.Size() ||
|
|
if cp.FileStat.Size != st.Size() ||
|
|
|
- cp.FileStat.LastModified != st.ModTime() ||
|
|
|
|
|
|
|
+ !cp.FileStat.LastModified.Equal(st.ModTime()) ||
|
|
|
cp.FileStat.MD5 != md {
|
|
cp.FileStat.MD5 != md {
|
|
|
return false, nil
|
|
return false, nil
|
|
|
}
|
|
}
|
|
@@ -448,6 +456,9 @@ func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePa
|
|
|
func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
|
|
func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
|
|
|
listener := getProgressListener(options)
|
|
listener := getProgressListener(options)
|
|
|
|
|
|
|
|
|
|
+ partOptions := ChoiceTransferPartOption(options)
|
|
|
|
|
+ completeOptions := ChoiceCompletePartOption(options)
|
|
|
|
|
+
|
|
|
// Load CP data
|
|
// Load CP data
|
|
|
ucp := uploadCheckpoint{}
|
|
ucp := uploadCheckpoint{}
|
|
|
err := ucp.load(cpFilePath)
|
|
err := ucp.load(cpFilePath)
|
|
@@ -482,11 +493,8 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
|
|
|
event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size, 0)
|
|
event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size, 0)
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
|
|
|
|
|
- // oss server don't support x-oss-storage-class
|
|
|
|
|
- options = deleteOption(options, HTTPHeaderOssStorageClass)
|
|
|
|
|
-
|
|
|
|
|
// Start the workers
|
|
// Start the workers
|
|
|
- arg := workerArg{&bucket, filePath, imur, options, uploadPartHooker}
|
|
|
|
|
|
|
+ arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker}
|
|
|
for w := 1; w <= routines; w++ {
|
|
for w := 1; w <= routines; w++ {
|
|
|
go worker(w, arg, jobs, results, failed, die)
|
|
go worker(w, arg, jobs, results, failed, die)
|
|
|
}
|
|
}
|
|
@@ -521,6 +529,6 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
|
|
|
|
|
// Complete the multipart upload
|
|
// Complete the multipart upload
|
|
|
- err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, options)
|
|
|
|
|
|
|
+ err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, completeOptions)
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|