Parcourir la source

use correct options

hangzws il y a 7 ans
Parent
commit
af10b5fa05
4 fichiers modifiés avec 88 ajouts et 51 suppressions
  1. 24 17
      oss/download.go
  2. 30 23
      oss/multicopy.go
  3. 2 2
      oss/multipart.go
  4. 32 9
      oss/upload.go

+ 24 - 17
oss/download.go

@@ -10,6 +10,7 @@ import (
 	"hash/crc64"
 	"io"
 	"io/ioutil"
+	"net/http"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -215,6 +216,12 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 	tempFilePath := filePath + TempFileSuffix
 	listener := getProgressListener(options)
 
+	payerOptions := []Option{}
+	payer := getPayer(options)
+	if payer != "" {
+		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
+	}
+
 	// If the file does not exist, create one. If exists, the download will overwrite it.
 	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
 	if err != nil {
@@ -222,7 +229,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 	}
 	fd.Close()
 
-	meta, err := bucket.GetObjectDetailedMeta(objectKey, options...)
+	meta, err := bucket.GetObjectDetailedMeta(objectKey, payerOptions...)
 	if err != nil {
 		return err
 	}
@@ -323,7 +330,7 @@ type objectStat struct {
 }
 
 // isValid flags of checkpoint data is valid. It returns true when the data is valid and the checkpoint is valid and the object is not updated.
-func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string, uRange *unpackedRange, options []Option) (bool, error) {
+func (cp downloadCheckpoint) isValid(meta http.Header, uRange *unpackedRange) (bool, error) {
 	// Compare the CP's Magic and the MD5
 	cpb := cp
 	cpb.MD5 = ""
@@ -335,12 +342,6 @@ func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string, uRange *u
 		return false, nil
 	}
 
-	// Ensure the object is not updated.
-	meta, err := bucket.GetObjectDetailedMeta(objectKey, options...)
-	if err != nil {
-		return false, err
-	}
-
 	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
 	if err != nil {
 		return false, err
@@ -422,18 +423,12 @@ func (cp downloadCheckpoint) getCompletedBytes() int64 {
 }
 
 // prepare initiates download tasks
-func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange, options []Option) error {
+func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
 	// CP
 	cp.Magic = downloadCpMagic
 	cp.FilePath = filePath
 	cp.Object = objectKey
 
-	// Object
-	meta, err := bucket.GetObjectDetailedMeta(objectKey, options...)
-	if err != nil {
-		return err
-	}
-
 	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
 	if err != nil {
 		return err
@@ -470,6 +465,12 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 	tempFilePath := filePath + TempFileSuffix
 	listener := getProgressListener(options)
 
+	payerOptions := []Option{}
+	payer := getPayer(options)
+	if payer != "" {
+		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
+	}
+
 	// Load checkpoint data.
 	dcp := downloadCheckpoint{}
 	err := dcp.load(cpFilePath)
@@ -477,10 +478,16 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 		os.Remove(cpFilePath)
 	}
 
+	// Get the object detailed meta.
+	meta, err := bucket.GetObjectDetailedMeta(objectKey, payerOptions...)
+	if err != nil {
+		return err
+	}
+
 	// Load error or data invalid. Re-initialize the download.
-	valid, err := dcp.isValid(&bucket, objectKey, uRange, options)
+	valid, err := dcp.isValid(meta, uRange)
 	if err != nil || !valid {
-		if err = dcp.prepare(&bucket, objectKey, filePath, partSize, uRange, options); err != nil {
+		if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
 			return err
 		}
 		os.Remove(cpFilePath)

+ 30 - 23
oss/multicopy.go

@@ -7,6 +7,7 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
+	"net/http"
 	"os"
 	"strconv"
 )
@@ -133,7 +134,13 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 	srcBucket, err := bucket.Client.Bucket(srcBucketName)
 	listener := getProgressListener(options)
 
-	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, options...)
+	payerOptions := []Option{}
+	payer := getPayer(options)
+	if payer != "" {
+		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
+	}
+
+	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, payerOptions...)
 	if err != nil {
 		return err
 	}
@@ -162,7 +169,7 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 	publishProgress(listener, event)
 
 	// Start to copy workers
-	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
+	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, payerOptions, copyPartHooker}
 	for w := 1; w <= routines; w++ {
 		go copyWorker(w, arg, jobs, results, failed, die)
 	}
@@ -183,7 +190,7 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
-			descBucket.AbortMultipartUpload(imur, options...)
+			descBucket.AbortMultipartUpload(imur, payerOptions...)
 			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
 			publishProgress(listener, event)
 			return err
@@ -198,9 +205,9 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 	publishProgress(listener, event)
 
 	// Complete the multipart upload
-	_, err = descBucket.CompleteMultipartUpload(imur, ups, options...)
+	_, err = descBucket.CompleteMultipartUpload(imur, ups, payerOptions...)
 	if err != nil {
-		bucket.AbortMultipartUpload(imur, options...)
+		bucket.AbortMultipartUpload(imur, payerOptions...)
 		return err
 	}
 	return nil
@@ -225,7 +232,7 @@ type copyCheckpoint struct {
 }
 
 // isValid checks if the data is valid which means CP is valid and object is not updated.
-func (cp copyCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
+func (cp copyCheckpoint) isValid(meta http.Header) (bool, error) {
 	// Compare CP's magic number and the MD5.
 	cpb := cp
 	cpb.MD5 = ""
@@ -237,12 +244,6 @@ func (cp copyCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error)
 		return false, nil
 	}
 
-	// Make sure the object is not updated.
-	meta, err := bucket.GetObjectDetailedMeta(objectKey)
-	if err != nil {
-		return false, err
-	}
-
 	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
 	if err != nil {
 		return false, err
@@ -322,7 +323,7 @@ func (cp copyCheckpoint) getCompletedBytes() int64 {
 }
 
 // prepare initializes the multipart upload
-func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
+func (cp *copyCheckpoint) prepare(meta http.Header, srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
 	partSize int64, options []Option) error {
 	// CP
 	cp.Magic = copyCpMagic
@@ -331,12 +332,6 @@ func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBu
 	cp.DestBucketName = destBucket.BucketName
 	cp.DestObjectKey = destObjectKey
 
-	// Object
-	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, options...)
-	if err != nil {
-		return err
-	}
-
 	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
 	if err != nil {
 		return err
@@ -382,6 +377,12 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 	srcBucket, err := bucket.Client.Bucket(srcBucketName)
 	listener := getProgressListener(options)
 
+	payerOptions := []Option{}
+	payer := getPayer(options)
+	if payer != "" {
+		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
+	}
+
 	// Load CP data
 	ccp := copyCheckpoint{}
 	err = ccp.load(cpFilePath)
@@ -389,10 +390,16 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 		os.Remove(cpFilePath)
 	}
 
+	// Make sure the object is not updated.
+	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, payerOptions...)
+	if err != nil {
+		return err
+	}
+
 	// Load error or the CP data is invalid---reinitialize
-	valid, err := ccp.isValid(srcBucket, srcObjectKey)
+	valid, err := ccp.isValid(meta)
 	if err != nil || !valid {
-		if err = ccp.prepare(srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
+		if err = ccp.prepare(meta, srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
 			return err
 		}
 		os.Remove(cpFilePath)
@@ -415,7 +422,7 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 	publishProgress(listener, event)
 
 	// Start the worker coroutines
-	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
+	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, payerOptions, copyPartHooker}
 	for w := 1; w <= routines; w++ {
 		go copyWorker(w, arg, jobs, results, failed, die)
 	}
@@ -449,5 +456,5 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 	event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size)
 	publishProgress(listener, event)
 
-	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, options)
+	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, payerOptions)
 }

+ 2 - 2
oss/multipart.go

@@ -107,11 +107,11 @@ func (bucket Bucket) UploadPartFromFile(imur InitiateMultipartUploadResult, file
 //
 func (bucket Bucket) DoUploadPart(request *UploadPartRequest, options []Option) (*UploadPartResult, error) {
 	listener := getProgressListener(options)
-	opts := []Option{ContentLength(request.PartSize)}
+	options = append(options, ContentLength(request.PartSize))
 	params := map[string]interface{}{}
 	params["partNumber"] = strconv.Itoa(request.PartNumber)
 	params["uploadId"] = request.InitResult.UploadID
-	resp, err := bucket.do("PUT", request.InitResult.Key, params, opts,
+	resp, err := bucket.do("PUT", request.InitResult.Key, params, options,
 		&io.LimitedReader{R: request.Reader, N: request.PartSize}, listener)
 	if err != nil {
 		return &UploadPartResult{}, err

+ 32 - 9
oss/upload.go

@@ -83,6 +83,16 @@ func getRoutines(options []Option) int {
 	return rs
 }
 
+// getPayer returns the payer of the request
+func getPayer(options []Option) string {
+	payerOpt, err := findOption(options, HTTPHeaderOSSRequester, nil)
+	if err != nil || payerOpt == nil {
+		return ""
+	}
+
+	return payerOpt.(string)
+}
+
 // getProgressListener gets the progress callback
 func getProgressListener(options []Option) ProgressListener {
 	isSet, listener, _ := isOptionSet(options, progressListener)
@@ -106,6 +116,7 @@ type workerArg struct {
 	bucket   *Bucket
 	filePath string
 	imur     InitiateMultipartUploadResult
+	options  []Option
 	hook     uploadPartHook
 }
 
@@ -116,7 +127,7 @@ func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadP
 			failed <- err
 			break
 		}
-		part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number)
+		part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number, arg.options...)
 		if err != nil {
 			failed <- err
 			break
@@ -155,6 +166,12 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 		return err
 	}
 
+	payerOptions := []Option{}
+	payer := getPayer(options)
+	if payer != "" {
+		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
+	}
+
 	// Initialize the multipart upload
 	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
 	if err != nil {
@@ -172,7 +189,7 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 	publishProgress(listener, event)
 
 	// Start the worker coroutine
-	arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
+	arg := workerArg{&bucket, filePath, imur, payerOptions, uploadPartHooker}
 	for w := 1; w <= routines; w++ {
 		go worker(w, arg, jobs, results, failed, die)
 	}
@@ -195,7 +212,7 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 			close(die)
 			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
 			publishProgress(listener, event)
-			bucket.AbortMultipartUpload(imur)
+			bucket.AbortMultipartUpload(imur, payerOptions...)
 			return err
 		}
 
@@ -208,9 +225,9 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 	publishProgress(listener, event)
 
 	// Complete the multipart upload
-	_, err = bucket.CompleteMultipartUpload(imur, parts)
+	_, err = bucket.CompleteMultipartUpload(imur, parts, payerOptions...)
 	if err != nil {
-		bucket.AbortMultipartUpload(imur)
+		bucket.AbortMultipartUpload(imur, payerOptions...)
 		return err
 	}
 	return nil
@@ -407,10 +424,10 @@ func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, b
 }
 
 // complete completes the multipart upload and deletes the local CP files
-func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string) error {
+func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
 	imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName,
 		Key: cp.ObjectKey, UploadID: cp.UploadID}
-	_, err := bucket.CompleteMultipartUpload(imur, parts)
+	_, err := bucket.CompleteMultipartUpload(imur, parts, options...)
 	if err != nil {
 		return err
 	}
@@ -422,6 +439,12 @@ func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePa
 func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
 	listener := getProgressListener(options)
 
+	payerOptions := []Option{}
+	payer := getPayer(options)
+	if payer != "" {
+		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
+	}
+
 	// Load CP data
 	ucp := uploadCheckpoint{}
 	err := ucp.load(cpFilePath)
@@ -454,7 +477,7 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
 	publishProgress(listener, event)
 
 	// Start the workers
-	arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
+	arg := workerArg{&bucket, filePath, imur, payerOptions, uploadPartHooker}
 	for w := 1; w <= routines; w++ {
 		go worker(w, arg, jobs, results, failed, die)
 	}
@@ -489,6 +512,6 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
 	publishProgress(listener, event)
 
 	// Complete the multipart upload
-	err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath)
+	err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, payerOptions)
 	return err
 }