Explorar o código

fix get object to file

baiyubin %!s(int64=9) %!d(string=hai) anos
pai
achega
4acd85796e
Modificáronse 8 ficheiros con 89 adicións e 66 borrados
  1. 6 3
      oss/bucket.go
  2. 3 0
      oss/bucket_test.go
  3. 9 2
      oss/const.go
  4. 18 14
      oss/download.go
  5. 3 0
      oss/download_test.go
  6. 40 40
      oss/multicopy.go
  7. 4 1
      oss/multipart_test.go
  8. 6 6
      oss/upload.go

+ 6 - 3
oss/bucket.go

@@ -139,6 +139,8 @@ func (bucket Bucket) GetObject(objectKey string, options ...Option) (io.ReadClos
 // error  操作无错误时返回error为nil,非nil为错误说明。
 //
 func (bucket Bucket) GetObjectToFile(objectKey, filePath string, options ...Option) error {
+	tempFilePath := filePath + TempFileSuffix
+
 	// 读取Object内容
 	result, err := bucket.DoGetObject(&GetObjectRequest{objectKey}, options)
 	if err != nil {
@@ -147,14 +149,14 @@ func (bucket Bucket) GetObjectToFile(objectKey, filePath string, options ...Opti
 	defer result.Response.Body.Close()
 
 	// 如果文件不存在则创建,存在则清空
-	fd, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0660)
+	fd, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, FilePermMode)
 	if err != nil {
 		return err
 	}
-	defer fd.Close()
 
 	// 存储数据到文件
 	_, err = io.Copy(fd, result.Response.Body)
+	fd.Close()
 	if err != nil {
 		return err
 	}
@@ -165,11 +167,12 @@ func (bucket Bucket) GetObjectToFile(objectKey, filePath string, options ...Opti
 		result.Response.ClientCRC = result.ClientCRC.Sum64()
 		err = checkCRC(result.Response, "GetObjectToFile")
 		if err != nil {
+			os.Remove(tempFilePath)
 			return err
 		}
 	}
 
-	return nil
+	return os.Rename(tempFilePath, filePath)
 }
 
 //

+ 3 - 0
oss/bucket_test.go

@@ -86,6 +86,9 @@ func (s *OssBucketSuite) TearDownTest(c *C) {
 	err = removeTempFiles("../oss", ".txt")
 	c.Assert(err, IsNil)
 
+	err = removeTempFiles("../oss", ".temp")
+	c.Assert(err, IsNil)
+
 	err = removeTempFiles("../oss", ".txt1")
 	c.Assert(err, IsNil)
 

+ 9 - 2
oss/const.go

@@ -1,5 +1,7 @@
 package oss
 
+import "os"
+
 // ACLType Bucket/Object的访问控制
 type ACLType string
 
@@ -74,9 +76,14 @@ const (
 // 其它常量
 const (
 	MaxPartSize = 5 * 1024 * 1024 * 1024 // 文件片最大值,5GB
-	MinPartSize = 100 * 1024             // 文件片最小值,100KB
+	MinPartSize = 100 * 1024             // 文件片最小值,100KBß
+
+	FilePermMode = os.FileMode(0664) // 新建文件默认权限
 
 	TempFilePrefix = "oss-go-temp-" // 临时文件前缀
+	TempFileSuffix = ".temp"        // 临时文件后缀
+
+	CheckpointFileSuffix = ".cp" // Checkpoint文件后缀
 
-	Version = "1.2.1" // Go sdk版本
+	Version = "1.2.2" // Go sdk版本
 )

+ 18 - 14
oss/download.go

@@ -61,7 +61,7 @@ func defaultDownloadPartHook(part downloadPart) error {
 }
 
 // 工作协程
-func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <- chan bool) {
+func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
 	for part := range jobs {
 		if err := arg.hook(part); err != nil {
 			failed <- err
@@ -78,12 +78,12 @@ func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, res
 		defer rd.Close()
 
 		select {
-			case <-die:
-				return
-			default:
+		case <-die:
+			return
+		default:
 		}
 
-		fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, 0660)
+		fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
 		if err != nil {
 			failed <- err
 			break
@@ -148,8 +148,10 @@ func getDownloadParts(bucket *Bucket, objectKey string, partSize int64) ([]downl
 
 // 并发无断点续传的下载
 func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
+	tempFilePath := filePath + TempFileSuffix
+
 	// 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
-	fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0660)
+	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
 	if err != nil {
 		return err
 	}
@@ -167,7 +169,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 	die := make(chan bool)
 
 	// 启动工作协程
-	arg := downloadWorkerArg{&bucket, objectKey, filePath, options, downloadPartHooker}
+	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
 	for w := 1; w <= routines; w++ {
 		go downloadWorker(w, arg, jobs, results, failed, die)
 	}
@@ -193,7 +195,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 		}
 	}
 
-	return nil
+	return os.Rename(tempFilePath, filePath)
 }
 
 // ----- 并发有断点的下载  -----
@@ -282,7 +284,7 @@ func (cp *downloadCheckpoint) dump(filePath string) error {
 	}
 
 	// dump
-	return ioutil.WriteFile(filePath, js, 0644)
+	return ioutil.WriteFile(filePath, js, FilePermMode)
 }
 
 // 未完成的分片
@@ -331,13 +333,15 @@ func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string
 	return nil
 }
 
-func (cp *downloadCheckpoint) complete(cpFilePath string) error {
+func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
 	os.Remove(cpFilePath)
-    return nil
+	return os.Rename(downFilepath, cp.FilePath)
 }
 
 // 并发带断点的下载
 func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
+	tempFilePath := filePath + TempFileSuffix
+
 	// LOAD CP数据
 	dcp := downloadCheckpoint{}
 	err := dcp.load(cpFilePath)
@@ -355,7 +359,7 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 	}
 
 	// 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
-	fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0660)
+	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
 	if err != nil {
 		return err
 	}
@@ -369,7 +373,7 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 	die := make(chan bool)
 
 	// 启动工作协程
-	arg := downloadWorkerArg{&bucket, objectKey, filePath, options, downloadPartHooker}
+	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
 	for w := 1; w <= routines; w++ {
 		go downloadWorker(w, arg, jobs, results, failed, die)
 	}
@@ -395,5 +399,5 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 		}
 	}
 
-	return dcp.complete(cpFilePath)
+	return dcp.complete(cpFilePath, tempFilePath)
 }

+ 3 - 0
oss/download_test.go

@@ -66,6 +66,9 @@ func (s *OssDownloadSuite) SetUpTest(c *C) {
 func (s *OssDownloadSuite) TearDownTest(c *C) {
 	err := removeTempFiles("../oss", ".jpg")
 	c.Assert(err, IsNil)
+
+	err = removeTempFiles("../oss", ".temp")
+	c.Assert(err, IsNil)
 }
 
 // TestUploadRoutineWithoutRecovery 多线程无断点恢复的下载

+ 40 - 40
oss/multicopy.go

@@ -36,11 +36,11 @@ func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string,
 	routines := getRoutines(options)
 
 	if cpConf.IsEnable {
-		return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey, 
+		return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
 			partSize, options, cpConf.FilePath, routines)
 	}
 
-	return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey, 
+	return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
 		partSize, options, routines)
 }
 
@@ -48,12 +48,12 @@ func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string,
 
 // 工作协程参数
 type copyWorkerArg struct {
-	bucket         *Bucket
-	imur           InitiateMultipartUploadResult
-	srcBucketName  string  
-	srcObjectKey   string  
-	options        []Option
-	hook           copyPartHook
+	bucket        *Bucket
+	imur          InitiateMultipartUploadResult
+	srcBucketName string
+	srcObjectKey  string
+	options       []Option
+	hook          copyPartHook
 }
 
 // Hook用于测试
@@ -66,23 +66,23 @@ func defaultCopyPartHook(part copyPart) error {
 }
 
 // 工作协程
-func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <- chan bool) {	
+func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
 	for chunk := range jobs {
 		if err := arg.hook(chunk); err != nil {
 			failed <- err
 			break
 		}
 		chunkSize := chunk.End - chunk.Start + 1
-		part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey, 
+		part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
 			chunk.Start, chunkSize, chunk.Number, arg.options...)
 		if err != nil {
 			failed <- err
 			break
 		}
 		select {
-			case <-die:
-				return
-			default:
+		case <-die:
+			return
+		default:
 		}
 		results <- part
 	}
@@ -98,9 +98,9 @@ func copyScheduler(jobs chan copyPart, parts []copyPart) {
 
 // 分片
 type copyPart struct {
-	Number int  // 片序号[1, 10000]
-	Start int64 // 片起始位置
-	End   int64 // 片结束位置
+	Number int   // 片序号[1, 10000]
+	Start  int64 // 片起始位置
+	End    int64 // 片结束位置
 }
 
 // 文件分片
@@ -129,17 +129,17 @@ func getCopyParts(bucket *Bucket, objectKey string, partSize int64) ([]copyPart,
 }
 
 // 并发无断点续传的下载
-func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string, 
+func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
 	partSize int64, options []Option, routines int) error {
 	descBucket, err := bucket.Client.Bucket(destBucketName)
 	srcBucket, err := bucket.Client.Bucket(srcBucketName)
-	
+
 	// 分割文件
 	parts, err := getCopyParts(srcBucket, srcObjectKey, partSize)
 	if err != nil {
 		return err
 	}
-	
+
 	// 初始化上传任务
 	imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
 	if err != nil {
@@ -193,17 +193,17 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"
 
 type copyCheckpoint struct {
-	Magic    string          // magic
-	MD5      string          // cp内容的MD5
-	SrcBucketName  string    // 源Bucket
-	SrcObjectKey   string    // 源Object
-	DestBucketName string    // 目标Bucket
-	DestObjectKey  string    // 目标Bucket
-	CopyID         string    // copy id
-	ObjStat   objectStat     // 文件状态
-	Parts     []copyPart     // 全部分片
-	CopyParts []UploadPart   // 分片上传成功后的返回值
-	PartStat  []bool         // 分片下载是否完成
+	Magic          string       // magic
+	MD5            string       // cp内容的MD5
+	SrcBucketName  string       // 源Bucket
+	SrcObjectKey   string       // 源Object
+	DestBucketName string       // 目标Bucket
+	DestObjectKey  string       // 目标Bucket
+	CopyID         string       // copy id
+	ObjStat        objectStat   // 文件状态
+	Parts          []copyPart   // 全部分片
+	CopyParts      []UploadPart // 分片上传成功后的返回值
+	PartStat       []bool       // 分片下载是否完成
 }
 
 // CP数据是否有效,CP有效且Object没有更新时有效
@@ -253,8 +253,8 @@ func (cp *copyCheckpoint) load(filePath string) error {
 
 // 更新分片状态
 func (cp *copyCheckpoint) update(part UploadPart) {
-	cp.CopyParts[part.PartNumber - 1] = part
-	cp.PartStat[part.PartNumber - 1] = true	
+	cp.CopyParts[part.PartNumber-1] = part
+	cp.PartStat[part.PartNumber-1] = true
 }
 
 // dump到文件
@@ -278,7 +278,7 @@ func (cp *copyCheckpoint) dump(filePath string) error {
 	}
 
 	// dump
-	return ioutil.WriteFile(filePath, js, 0644)
+	return ioutil.WriteFile(filePath, js, FilePermMode)
 }
 
 // 未完成的分片
@@ -293,7 +293,7 @@ func (cp copyCheckpoint) todoParts() []copyPart {
 }
 
 // 初始化下载任务
-func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string, 
+func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
 	partSize int64, options []Option) error {
 	// cp
 	cp.Magic = copyCpMagic
@@ -327,7 +327,7 @@ func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBu
 		cp.PartStat[i] = false
 	}
 	cp.CopyParts = make([]UploadPart, len(cp.Parts))
-	
+
 	// init copy
 	imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
 	if err != nil {
@@ -350,11 +350,11 @@ func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePat
 }
 
 // 并发带断点的下载
-func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string, 
+func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
 	partSize int64, options []Option, cpFilePath string, routines int) error {
 	descBucket, err := bucket.Client.Bucket(destBucketName)
 	srcBucket, err := bucket.Client.Bucket(srcBucketName)
-	
+
 	// LOAD CP数据
 	ccp := copyCheckpoint{}
 	err = ccp.load(cpFilePath)
@@ -377,7 +377,7 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 		Bucket:   destBucketName,
 		Key:      destObjectKey,
 		UploadID: ccp.CopyID}
-	
+
 	jobs := make(chan copyPart, len(parts))
 	results := make(chan UploadPart, len(parts))
 	failed := make(chan error)
@@ -398,7 +398,7 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 		select {
 		case part := <-results:
 			completed++
-			ccp.update(part);
+			ccp.update(part)
 			ccp.dump(cpFilePath)
 		case err := <-failed:
 			close(die)
@@ -409,6 +409,6 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 			break
 		}
 	}
-	
+
 	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath)
 }

+ 4 - 1
oss/multipart_test.go

@@ -91,6 +91,9 @@ func (s *OssBucketMultipartSuite) TearDownTest(c *C) {
 	err := removeTempFiles("../oss", ".jpg")
 	c.Assert(err, IsNil)
 
+	err = removeTempFiles("../oss", ".temp")
+	c.Assert(err, IsNil)
+
 	err = removeTempFiles("../oss", ".txt1")
 	c.Assert(err, IsNil)
 
@@ -928,7 +931,7 @@ func (s *OssBucketMultipartSuite) TestDownloadFileNegative(c *C) {
 	c.Assert(err, NotNil)
 
 	// 文件不存在
-	err = s.bucket.DownloadFile(objectName, "D:\\work\\oss\\", 1024*1024*1024+1)
+	err = s.bucket.DownloadFile(objectName, "/OSS/TEMP/ZIBI/QUQU/BALA", 1024*1024*1024+1)
 	c.Assert(err, NotNil)
 
 	// Key不存在

+ 6 - 6
oss/upload.go

@@ -57,7 +57,7 @@ func getCpConfig(options []Option, filePath string) (*cpConfig, error) {
 	}
 
 	if cpc.IsEnable && cpc.FilePath == "" {
-		cpc.FilePath = filePath + ".cp"
+		cpc.FilePath = filePath + CheckpointFileSuffix
 	}
 
 	return &cpc, nil
@@ -102,7 +102,7 @@ type workerArg struct {
 }
 
 // 工作协程
-func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <- chan bool) {
+func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
 	for chunk := range jobs {
 		if err := arg.hook(id, chunk); err != nil {
 			failed <- err
@@ -114,9 +114,9 @@ func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadP
 			break
 		}
 		select {
-			case <-die:
-				return
-			default:
+		case <-die:
+			return
+		default:
 		}
 		results <- part
 	}
@@ -282,7 +282,7 @@ func (cp *uploadCheckpoint) dump(filePath string) error {
 	}
 
 	// dump
-	return ioutil.WriteFile(filePath, js, 0644)
+	return ioutil.WriteFile(filePath, js, FilePermMode)
 }
 
 // 更新分片状态