|
|
@@ -31,13 +31,27 @@ func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, op
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ uRange, err := getRangeConfig(options)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
routines := getRoutines(options)
|
|
|
|
|
|
if cpConf.IsEnable {
|
|
|
- return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines)
|
|
|
+ return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines, uRange)
|
|
|
}
|
|
|
|
|
|
- return bucket.downloadFile(objectKey, filePath, partSize, options, routines)
|
|
|
+ return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
|
|
|
+}
|
|
|
+
|
|
|
+// 获取下载范围
|
|
|
+func getRangeConfig(options []Option) (*unpackedRange, error) {
|
|
|
+ rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
|
|
|
+ if err != nil || rangeOpt == nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return parseRange(rangeOpt.(string))
|
|
|
}
|
|
|
|
|
|
// ----- 并发无断点的下载 -----
|
|
|
@@ -104,7 +118,7 @@ func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, res
|
|
|
}
|
|
|
defer fd.Close()
|
|
|
|
|
|
- _, err = fd.Seek(part.Start, os.SEEK_SET)
|
|
|
+ _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
|
|
|
if err != nil {
|
|
|
failed <- err
|
|
|
break
|
|
|
@@ -130,13 +144,14 @@ func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
|
|
|
|
|
|
// 下载片
|
|
|
type downloadPart struct {
|
|
|
- Index int // 片序号,从0开始编号
|
|
|
- Start int64 // 片起始位置
|
|
|
- End int64 // 片结束位置
|
|
|
+ Index int // 片序号,从0开始编号
|
|
|
+ Start int64 // 片起始位置
|
|
|
+ End int64 // 片结束位置
|
|
|
+ Offset int64 // 偏移位置
|
|
|
}
|
|
|
|
|
|
// 文件分片
|
|
|
-func getDownloadParts(bucket *Bucket, objectKey string, partSize int64) ([]downloadPart, error) {
|
|
|
+func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *unpackedRange) ([]downloadPart, error) {
|
|
|
meta, err := bucket.GetObjectDetailedMeta(objectKey)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -150,10 +165,12 @@ func getDownloadParts(bucket *Bucket, objectKey string, partSize int64) ([]downl
|
|
|
|
|
|
part := downloadPart{}
|
|
|
i := 0
|
|
|
- for offset := int64(0); offset < objectSize; offset += partSize {
|
|
|
+ start, end := adjustRange(uRange, objectSize)
|
|
|
+ for offset := start; offset < end; offset += partSize {
|
|
|
part.Index = i
|
|
|
part.Start = offset
|
|
|
- part.End = GetPartEnd(offset, objectSize, partSize)
|
|
|
+ part.End = GetPartEnd(offset, end, partSize)
|
|
|
+ part.Offset = start
|
|
|
parts = append(parts, part)
|
|
|
i++
|
|
|
}
|
|
|
@@ -170,7 +187,7 @@ func getObjectBytes(parts []downloadPart) int64 {
|
|
|
}
|
|
|
|
|
|
// 并发无断点续传的下载
|
|
|
-func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
|
|
|
+func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
|
|
|
tempFilePath := filePath + TempFileSuffix
|
|
|
listener := getProgressListener(options)
|
|
|
|
|
|
@@ -182,7 +199,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
|
|
|
fd.Close()
|
|
|
|
|
|
// 分割文件
|
|
|
- parts, err := getDownloadParts(&bucket, objectKey, partSize)
|
|
|
+ parts, err := getDownloadParts(&bucket, objectKey, partSize, uRange)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -247,6 +264,8 @@ type downloadCheckpoint struct {
|
|
|
ObjStat objectStat // 文件状态
|
|
|
Parts []downloadPart // 全部分片
|
|
|
PartStat []bool // 分片下载是否完成
|
|
|
+ Start int64 // 起点
|
|
|
+ End int64 // 终点
|
|
|
}
|
|
|
|
|
|
type objectStat struct {
|
|
|
@@ -256,7 +275,7 @@ type objectStat struct {
|
|
|
}
|
|
|
|
|
|
// CP数据是否有效,CP有效且Object没有更新时有效
|
|
|
-func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
|
|
|
+func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string, uRange *unpackedRange) (bool, error) {
|
|
|
// 比较CP的Magic及MD5
|
|
|
cpb := cp
|
|
|
cpb.MD5 = ""
|
|
|
@@ -286,6 +305,14 @@ func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, er
|
|
|
return false, nil
|
|
|
}
|
|
|
|
|
|
+ // 确认下载范围是否变化
|
|
|
+ if uRange != nil {
|
|
|
+ start, end := adjustRange(uRange, objectSize)
|
|
|
+ if start != cp.Start || end != cp.End {
|
|
|
+ return false, nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
return true, nil
|
|
|
}
|
|
|
|
|
|
@@ -347,7 +374,7 @@ func (cp downloadCheckpoint) getCompletedBytes() int64 {
|
|
|
}
|
|
|
|
|
|
// 初始化下载任务
|
|
|
-func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64) error {
|
|
|
+func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
|
|
|
// cp
|
|
|
cp.Magic = downloadCpMagic
|
|
|
cp.FilePath = filePath
|
|
|
@@ -369,7 +396,7 @@ func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string
|
|
|
cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
|
|
|
|
|
|
// parts
|
|
|
- cp.Parts, err = getDownloadParts(bucket, objectKey, partSize)
|
|
|
+ cp.Parts, err = getDownloadParts(bucket, objectKey, partSize, uRange)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -387,7 +414,7 @@ func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
|
|
|
}
|
|
|
|
|
|
// 并发带断点的下载
|
|
|
-func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
|
|
|
+func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
|
|
|
tempFilePath := filePath + TempFileSuffix
|
|
|
listener := getProgressListener(options)
|
|
|
|
|
|
@@ -399,9 +426,9 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
|
|
|
}
|
|
|
|
|
|
// LOAD出错或数据无效重新初始化下载
|
|
|
- valid, err := dcp.isValid(&bucket, objectKey)
|
|
|
+ valid, err := dcp.isValid(&bucket, objectKey, uRange)
|
|
|
if err != nil || !valid {
|
|
|
- if err = dcp.prepare(&bucket, objectKey, filePath, partSize); err != nil {
|
|
|
+ if err = dcp.prepare(&bucket, objectKey, filePath, partSize, uRange); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
os.Remove(cpFilePath)
|