|
@@ -5,6 +5,8 @@ import (
|
|
|
"encoding/base64"
|
|
"encoding/base64"
|
|
|
"encoding/json"
|
|
"encoding/json"
|
|
|
"errors"
|
|
"errors"
|
|
|
|
|
+ "hash"
|
|
|
|
|
+ "hash/crc64"
|
|
|
"io"
|
|
"io"
|
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
|
"os"
|
|
"os"
|
|
@@ -23,7 +25,7 @@ import (
|
|
|
//
|
|
//
|
|
|
func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
|
|
func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
|
|
|
if partSize < 1 {
|
|
if partSize < 1 {
|
|
|
- return errors.New("oss: part size smaller than 1.")
|
|
|
|
|
|
|
+ return errors.New("oss: part size smaller than 1")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
cpConf, err := getCpConfig(options, filePath)
|
|
cpConf, err := getCpConfig(options, filePath)
|
|
@@ -58,11 +60,12 @@ func getRangeConfig(options []Option) (*unpackedRange, error) {
|
|
|
|
|
|
|
|
// 工作协程参数
|
|
// 工作协程参数
|
|
|
type downloadWorkerArg struct {
|
|
type downloadWorkerArg struct {
|
|
|
- bucket *Bucket
|
|
|
|
|
- key string
|
|
|
|
|
- filePath string
|
|
|
|
|
- options []Option
|
|
|
|
|
- hook downloadPartHook
|
|
|
|
|
|
|
+ bucket *Bucket
|
|
|
|
|
+ key string
|
|
|
|
|
+ filePath string
|
|
|
|
|
+ options []Option
|
|
|
|
|
+ hook downloadPartHook
|
|
|
|
|
+ enableCRC bool
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Hook用于测试
|
|
// Hook用于测试
|
|
@@ -105,6 +108,14 @@ func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, res
|
|
|
}
|
|
}
|
|
|
defer rd.Close()
|
|
defer rd.Close()
|
|
|
|
|
|
|
|
|
|
+ var crcCalc hash.Hash64
|
|
|
|
|
+ if arg.enableCRC {
|
|
|
|
|
+ crcCalc = crc64.New(crcTable())
|
|
|
|
|
+ contentLen := part.End - part.Start + 1
|
|
|
|
|
+ rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
|
|
|
|
|
+ }
|
|
|
|
|
+ defer rd.Close()
|
|
|
|
|
+
|
|
|
select {
|
|
select {
|
|
|
case <-die:
|
|
case <-die:
|
|
|
return
|
|
return
|
|
@@ -131,6 +142,10 @@ func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, res
|
|
|
break
|
|
break
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ if arg.enableCRC {
|
|
|
|
|
+ part.CRC64 = crcCalc.Sum64()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
fd.Close()
|
|
fd.Close()
|
|
|
results <- part
|
|
results <- part
|
|
|
}
|
|
}
|
|
@@ -146,23 +161,33 @@ func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
|
|
|
|
|
|
|
|
// 下载片
|
|
// 下载片
|
|
|
type downloadPart struct {
|
|
type downloadPart struct {
|
|
|
- Index int // 片序号,从0开始编号
|
|
|
|
|
- Start int64 // 片起始位置
|
|
|
|
|
- End int64 // 片结束位置
|
|
|
|
|
- Offset int64 // 偏移位置
|
|
|
|
|
|
|
+ Index int // 片序号,从0开始编号
|
|
|
|
|
+ Start int64 // 片起始位置
|
|
|
|
|
+ End int64 // 片结束位置
|
|
|
|
|
+ Offset int64 // 文件中的偏移位置
|
|
|
|
|
+ CRC64 uint64 // 片的校验值
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 文件分片
|
|
// 文件分片
|
|
|
-func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *unpackedRange) ([]downloadPart, error) {
|
|
|
|
|
|
|
+func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *unpackedRange) ([]downloadPart, bool, uint64, error) {
|
|
|
meta, err := bucket.GetObjectDetailedMeta(objectKey)
|
|
meta, err := bucket.GetObjectDetailedMeta(objectKey)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
|
|
|
|
+ return nil, false, 0, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
parts := []downloadPart{}
|
|
parts := []downloadPart{}
|
|
|
objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
|
|
objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
|
|
|
|
+ return nil, false, 0, err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ enableCRC := false
|
|
|
|
|
+ crcVal := (uint64)(0)
|
|
|
|
|
+ if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
|
|
|
|
|
+ if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
|
|
|
|
|
+ enableCRC = true
|
|
|
|
|
+ crcVal, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
part := downloadPart{}
|
|
part := downloadPart{}
|
|
@@ -173,10 +198,11 @@ func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *
|
|
|
part.Start = offset
|
|
part.Start = offset
|
|
|
part.End = GetPartEnd(offset, end, partSize)
|
|
part.End = GetPartEnd(offset, end, partSize)
|
|
|
part.Offset = start
|
|
part.Offset = start
|
|
|
|
|
+ part.CRC64 = 0
|
|
|
parts = append(parts, part)
|
|
parts = append(parts, part)
|
|
|
i++
|
|
i++
|
|
|
}
|
|
}
|
|
|
- return parts, nil
|
|
|
|
|
|
|
+ return parts, enableCRC, crcVal, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 文件大小
|
|
// 文件大小
|
|
@@ -188,6 +214,20 @@ func getObjectBytes(parts []downloadPart) int64 {
|
|
|
return ob
|
|
return ob
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// 计算连续分片总的CRC
|
|
|
|
|
+func combineCRCInParts(dps []downloadPart) uint64 {
|
|
|
|
|
+ if dps == nil || len(dps) == 0 {
|
|
|
|
|
+ return 0
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ crc := dps[0].CRC64
|
|
|
|
|
+ for i := 1; i < len(dps); i++ {
|
|
|
|
|
+ crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return crc
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// 并发无断点续传的下载
|
|
// 并发无断点续传的下载
|
|
|
func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
|
|
func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
|
|
|
tempFilePath := filePath + TempFileSuffix
|
|
tempFilePath := filePath + TempFileSuffix
|
|
@@ -201,7 +241,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
|
|
|
fd.Close()
|
|
fd.Close()
|
|
|
|
|
|
|
|
// 分割文件
|
|
// 分割文件
|
|
|
- parts, err := getDownloadParts(&bucket, objectKey, partSize, uRange)
|
|
|
|
|
|
|
+ parts, enableCRC, expectedCRC, err := getDownloadParts(&bucket, objectKey, partSize, uRange)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
@@ -217,7 +257,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
|
|
|
|
|
// 启动工作协程
|
|
// 启动工作协程
|
|
|
- arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
|
|
|
|
|
|
|
+ arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
|
|
|
for w := 1; w <= routines; w++ {
|
|
for w := 1; w <= routines; w++ {
|
|
|
go downloadWorker(w, arg, jobs, results, failed, die)
|
|
go downloadWorker(w, arg, jobs, results, failed, die)
|
|
|
}
|
|
}
|
|
@@ -227,13 +267,12 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
|
|
|
|
|
|
|
|
// 等待分片下载完成
|
|
// 等待分片下载完成
|
|
|
completed := 0
|
|
completed := 0
|
|
|
- ps := make([]downloadPart, len(parts))
|
|
|
|
|
for completed < len(parts) {
|
|
for completed < len(parts) {
|
|
|
select {
|
|
select {
|
|
|
case part := <-results:
|
|
case part := <-results:
|
|
|
completed++
|
|
completed++
|
|
|
- ps[part.Index] = part
|
|
|
|
|
completedBytes += (part.End - part.Start + 1)
|
|
completedBytes += (part.End - part.Start + 1)
|
|
|
|
|
+ parts[part.Index].CRC64 = part.CRC64
|
|
|
event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
|
|
event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
case err := <-failed:
|
|
case err := <-failed:
|
|
@@ -251,6 +290,14 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
|
|
|
event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
|
|
event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
|
|
|
|
|
|
|
+ if enableCRC {
|
|
|
|
|
+ actualCRC := combineCRCInParts(parts)
|
|
|
|
|
+ err = checkDownloadCRC(actualCRC, expectedCRC)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
return os.Rename(tempFilePath, filePath)
|
|
return os.Rename(tempFilePath, filePath)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -259,15 +306,17 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
|
|
|
const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
|
|
const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
|
|
|
|
|
|
|
|
type downloadCheckpoint struct {
|
|
type downloadCheckpoint struct {
|
|
|
- Magic string // magic
|
|
|
|
|
- MD5 string // cp内容的MD5
|
|
|
|
|
- FilePath string // 本地文件
|
|
|
|
|
- Object string // key
|
|
|
|
|
- ObjStat objectStat // 文件状态
|
|
|
|
|
- Parts []downloadPart // 全部分片
|
|
|
|
|
- PartStat []bool // 分片下载是否完成
|
|
|
|
|
- Start int64 // 起点
|
|
|
|
|
- End int64 // 终点
|
|
|
|
|
|
|
+ Magic string // magic
|
|
|
|
|
+ MD5 string // cp内容的MD5
|
|
|
|
|
+ FilePath string // 本地文件
|
|
|
|
|
+ Object string // key
|
|
|
|
|
+ ObjStat objectStat // 文件状态
|
|
|
|
|
+ Parts []downloadPart // 全部分片
|
|
|
|
|
+ PartStat []bool // 分片下载是否完成
|
|
|
|
|
+ Start int64 // 起点
|
|
|
|
|
+ End int64 // 终点
|
|
|
|
|
+ enableCRC bool // 是否有CRC校验
|
|
|
|
|
+ CRC uint64 // CRC校验值
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type objectStat struct {
|
|
type objectStat struct {
|
|
@@ -398,7 +447,7 @@ func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string
|
|
|
cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
|
|
cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
|
|
|
|
|
|
|
|
// parts
|
|
// parts
|
|
|
- cp.Parts, err = getDownloadParts(bucket, objectKey, partSize, uRange)
|
|
|
|
|
|
|
+ cp.Parts, cp.enableCRC, cp.CRC, err = getDownloadParts(bucket, objectKey, partSize, uRange)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
@@ -455,7 +504,7 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
|
|
|
|
|
// 启动工作协程
|
|
// 启动工作协程
|
|
|
- arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
|
|
|
|
|
|
|
+ arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
|
|
|
for w := 1; w <= routines; w++ {
|
|
for w := 1; w <= routines; w++ {
|
|
|
go downloadWorker(w, arg, jobs, results, failed, die)
|
|
go downloadWorker(w, arg, jobs, results, failed, die)
|
|
|
}
|
|
}
|
|
@@ -470,6 +519,7 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
|
|
|
case part := <-results:
|
|
case part := <-results:
|
|
|
completed++
|
|
completed++
|
|
|
dcp.PartStat[part.Index] = true
|
|
dcp.PartStat[part.Index] = true
|
|
|
|
|
+ dcp.Parts[part.Index].CRC64 = part.CRC64
|
|
|
dcp.dump(cpFilePath)
|
|
dcp.dump(cpFilePath)
|
|
|
completedBytes += (part.End - part.Start + 1)
|
|
completedBytes += (part.End - part.Start + 1)
|
|
|
event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
|
|
event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
|
|
@@ -489,5 +539,13 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
|
|
|
event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
|
|
event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
|
|
|
publishProgress(listener, event)
|
|
publishProgress(listener, event)
|
|
|
|
|
|
|
|
|
|
+ if dcp.enableCRC {
|
|
|
|
|
+ actualCRC := combineCRCInParts(dcp.Parts)
|
|
|
|
|
+ err = checkDownloadCRC(actualCRC, dcp.CRC)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
return dcp.complete(cpFilePath, tempFilePath)
|
|
return dcp.complete(cpFilePath, tempFilePath)
|
|
|
}
|
|
}
|