@@ -5,9 +5,10 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"io/ioutil"
+	"net/http"
 	"os"
-	"path/filepath"
 	"strconv"
 )
 
@@ -27,22 +28,30 @@ func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string,
 		return errors.New("oss: part size invalid range (1024KB, 5GB]")
 	}
 
-	cpConf, err := getCpConfig(options, filepath.Base(destObjectKey))
-	if err != nil {
-		return err
-	}
-
+	cpConf := getCpConfig(options)
 	routines := getRoutines(options)
 
-	if cpConf.IsEnable {
-		return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
-			partSize, options, cpConf.FilePath, routines)
+	if cpConf != nil && cpConf.IsEnable {
+		cpFilePath := getCopyCpFilePath(cpConf, srcBucketName, srcObjectKey, destBucketName, destObjectKey)
+		if cpFilePath != "" {
+			return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey, partSize, options, cpFilePath, routines)
+		}
 	}
 
 	return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
 		partSize, options, routines)
 }
 
+func getCopyCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destBucket, destObject string) string {
+	if cpConf.FilePath == "" && cpConf.DirPath != "" {
+		dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
+		src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
+		cpFileName := getCpFileName(src, dest)
+		cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
+	}
+	return cpConf.FilePath
+}
+
 // ----- Concurrently copy without checkpoint ---------
 
 // copyWorkerArg defines the copy worker arguments
@@ -103,18 +112,8 @@ type copyPart struct {
 }
 
 // getCopyParts calculates copy parts
-func getCopyParts(bucket *Bucket, objectKey string, partSize int64) ([]copyPart, error) {
-	meta, err := bucket.GetObjectDetailedMeta(objectKey)
-	if err != nil {
-		return nil, err
-	}
-
+func getCopyParts(objectSize, partSize int64) []copyPart {
 	parts := []copyPart{}
-	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
-	if err != nil {
-		return nil, err
-	}
-
 	part := copyPart{}
 	i := 0
 	for offset := int64(0); offset < objectSize; offset += partSize {
@@ -124,7 +123,7 @@ func getCopyParts(bucket *Bucket, objectKey string, partSize int64) ([]copyPart,
 		parts = append(parts, part)
 		i++
 	}
-	return parts, nil
+	return parts
 }
 
 // getSrcObjectBytes gets the source file size
@@ -143,12 +142,24 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 	srcBucket, err := bucket.Client.Bucket(srcBucketName)
 	listener := getProgressListener(options)
 
-	// Get copy parts
-	parts, err := getCopyParts(srcBucket, srcObjectKey, partSize)
+	payerOptions := []Option{}
+	payer := getPayer(options)
+	if payer != "" {
+		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
+	}
+
+	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, payerOptions...)
 	if err != nil {
 		return err
 	}
 
+	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
+	if err != nil {
+		return err
+	}
+
+	// Get copy parts
+	parts := getCopyParts(objectSize, partSize)
 	// Initialize the multipart upload
 	imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
 	if err != nil {
@@ -166,7 +177,7 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 	publishProgress(listener, event)
 
 	// Start to copy workers
-	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
+	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, payerOptions, copyPartHooker}
 	for w := 1; w <= routines; w++ {
 		go copyWorker(w, arg, jobs, results, failed, die)
 	}
@@ -187,7 +198,7 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
-			descBucket.AbortMultipartUpload(imur)
+			descBucket.AbortMultipartUpload(imur, payerOptions...)
 			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
 			publishProgress(listener, event)
 			return err
@@ -202,9 +213,9 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 	publishProgress(listener, event)
 
 	// Complete the multipart upload
-	_, err = descBucket.CompleteMultipartUpload(imur, ups)
+	_, err = descBucket.CompleteMultipartUpload(imur, ups, payerOptions...)
 	if err != nil {
-		bucket.AbortMultipartUpload(imur)
+		bucket.AbortMultipartUpload(imur, payerOptions...)
 		return err
 	}
 	return nil
@@ -229,7 +240,7 @@ type copyCheckpoint struct {
 }
 
 // isValid checks if the data is valid which means CP is valid and object is not updated.
-func (cp copyCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
+func (cp copyCheckpoint) isValid(meta http.Header) (bool, error) {
 	// Compare CP's magic number and the MD5.
 	cpb := cp
 	cpb.MD5 = ""
@@ -241,12 +252,6 @@ func (cp copyCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error)
 		return false, nil
 	}
 
-	// Make sure the object is not updated.
-	meta, err := bucket.GetObjectDetailedMeta(objectKey)
-	if err != nil {
-		return false, err
-	}
-
 	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
 	if err != nil {
 		return false, err
@@ -326,7 +331,7 @@ func (cp copyCheckpoint) getCompletedBytes() int64 {
 }
 
 // prepare initializes the multipart upload
-func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
+func (cp *copyCheckpoint) prepare(meta http.Header, srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
 	partSize int64, options []Option) error {
 	// CP
 	cp.Magic = copyCpMagic
@@ -335,12 +340,6 @@ func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBu
 	cp.DestBucketName = destBucket.BucketName
 	cp.DestObjectKey = destObjectKey
 
-	// Object
-	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey)
-	if err != nil {
-		return err
-	}
-
 	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
 	if err != nil {
 		return err
@@ -351,10 +350,7 @@ func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBu
 	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
 
 	// Parts
-	cp.Parts, err = getCopyParts(srcBucket, srcObjectKey, partSize)
-	if err != nil {
-		return err
-	}
+	cp.Parts = getCopyParts(objectSize, partSize)
 	cp.PartStat = make([]bool, len(cp.Parts))
 	for i := range cp.PartStat {
 		cp.PartStat[i] = false
@@ -371,10 +367,10 @@ func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBu
 	return nil
 }
 
-func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string) error {
+func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
 	imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
 		Key: cp.DestObjectKey, UploadID: cp.CopyID}
-	_, err := bucket.CompleteMultipartUpload(imur, parts)
+	_, err := bucket.CompleteMultipartUpload(imur, parts, options...)
 	if err != nil {
 		return err
 	}
@@ -389,6 +385,12 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 	srcBucket, err := bucket.Client.Bucket(srcBucketName)
 	listener := getProgressListener(options)
 
+	payerOptions := []Option{}
+	payer := getPayer(options)
+	if payer != "" {
+		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
+	}
+
 	// Load CP data
 	ccp := copyCheckpoint{}
 	err = ccp.load(cpFilePath)
@@ -396,10 +398,16 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 		os.Remove(cpFilePath)
 	}
 
+	// Make sure the object is not updated.
+	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, payerOptions...)
+	if err != nil {
+		return err
+	}
+
 	// Load error or the CP data is invalid---reinitialize
-	valid, err := ccp.isValid(srcBucket, srcObjectKey)
+	valid, err := ccp.isValid(meta)
 	if err != nil || !valid {
-		if err = ccp.prepare(srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
+		if err = ccp.prepare(meta, srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
 			return err
 		}
 		os.Remove(cpFilePath)
@@ -422,7 +430,7 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 	publishProgress(listener, event)
 
 	// Start the worker coroutines
-	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
+	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, payerOptions, copyPartHooker}
 	for w := 1; w <= routines; w++ {
 		go copyWorker(w, arg, jobs, results, failed, die)
 	}
@@ -456,5 +464,5 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 	event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size)
 	publishProgress(listener, event)
 
-	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath)
+	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, payerOptions)
 }
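
Caller-side sketch: with this change the checkpoint file can be derived from a directory (via `getCopyCpFilePath`) instead of being a fixed path, and a request-payer option on the call is forwarded to the source-bucket requests. The example below is a minimal, hedged usage sketch, not part of this diff; the endpoint, credentials, and bucket/object names are placeholders, and it assumes the SDK's public `CheckpointDir`, `Routines`, and `RequestPayer` options.

```go
package main

import (
	"fmt"

	"github.com/aliyun/aliyun-oss-go-sdk/oss"
)

func main() {
	// Placeholder endpoint and credentials.
	client, err := oss.New("oss-cn-hangzhou.aliyuncs.com", "<accessKeyID>", "<accessKeySecret>")
	if err != nil {
		panic(err)
	}

	// Destination bucket (placeholder name).
	bucket, err := client.Bucket("dest-bucket")
	if err != nil {
		panic(err)
	}

	// Resumable copy with 100MB parts (part size must lie in (1024KB, 5GB]).
	// CheckpointDir lets the SDK derive the checkpoint file name from the
	// oss://src and oss://dest pair, as getCopyCpFilePath does above;
	// RequestPayer is forwarded to the reads against the source bucket.
	err = bucket.CopyFile("src-bucket", "src/object.bin", "dest/object.bin",
		100*1024*1024,
		oss.CheckpointDir(true, "/tmp/cp"),
		oss.Routines(3),
		oss.RequestPayer(oss.Requester))
	if err != nil {
		fmt.Println("CopyFile failed:", err)
	}
}
```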