// download.go (13 KB)

package oss

// This file implements concurrent, part-by-part download of an object to a
// local file, both without and with checkpoint-based resume.

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"hash"
	"hash/crc64"
	"io"
	"io/ioutil"
	"os"
	"strconv"
)
  14. //
  15. // DownloadFile 分片下载文件
  16. //
  17. // objectKey object key。
  18. // filePath 本地文件。objectKey下载到文件。
  19. // partSize 本次上传文件片的大小,字节数。比如100 * 1024为每片100KB。
  20. // options Object的属性限制项。详见GetObject。
  21. //
  22. // error 操作成功error为nil,非nil为错误信息。
  23. //
  24. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  25. if partSize < 1 {
  26. return errors.New("oss: part size smaller than 1")
  27. }
  28. cpConf, err := getCpConfig(options, filePath)
  29. if err != nil {
  30. return err
  31. }
  32. uRange, err := getRangeConfig(options)
  33. if err != nil {
  34. return err
  35. }
  36. routines := getRoutines(options)
  37. if cpConf.IsEnable {
  38. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines, uRange)
  39. }
  40. return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
  41. }
  42. // 获取下载范围
  43. func getRangeConfig(options []Option) (*unpackedRange, error) {
  44. rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
  45. if err != nil || rangeOpt == nil {
  46. return nil, err
  47. }
  48. return parseRange(rangeOpt.(string))
  49. }
// ----- Concurrent download without checkpoint -----

// downloadWorkerArg holds the parameters shared by the download workers.
type downloadWorkerArg struct {
	bucket    *Bucket          // bucket whose GetObject is used to fetch each part
	key       string           // object key
	filePath  string           // local file the workers write the parts into
	options   []Option         // caller options, forwarded to GetObject
	hook      downloadPartHook // called with each part before it is downloaded
	enableCRC bool             // when true, workers compute a CRC64 for every part
}
// downloadPartHook is a hook called with a download part; it is used for testing.
type downloadPartHook func(part downloadPart) error

// downloadPartHooker is the hook handed to the download workers. It defaults
// to defaultDownloadPartHook.
var downloadPartHooker downloadPartHook = defaultDownloadPartHook

// defaultDownloadPartHook is the default hook; it always returns nil.
func defaultDownloadPartHook(part downloadPart) error {
	return nil
}
// defaultDownloadProgressListener is the default ProgressListener. It is used
// to mask the ProgressListener in the options passed to GetObject.
type defaultDownloadProgressListener struct {
}

// ProgressChanged handles the event silently: it does nothing.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}
  72. // 工作协程
  73. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
  74. for part := range jobs {
  75. if err := arg.hook(part); err != nil {
  76. failed <- err
  77. break
  78. }
  79. // resolve options
  80. r := Range(part.Start, part.End)
  81. p := Progress(&defaultDownloadProgressListener{})
  82. opts := make([]Option, len(arg.options)+2)
  83. // append orderly, can not be reversed!
  84. opts = append(opts, arg.options...)
  85. opts = append(opts, r, p)
  86. rd, err := arg.bucket.GetObject(arg.key, opts...)
  87. if err != nil {
  88. failed <- err
  89. break
  90. }
  91. defer rd.Close()
  92. var crcCalc hash.Hash64
  93. if arg.enableCRC {
  94. crcCalc = crc64.New(crcTable())
  95. contentLen := part.End - part.Start + 1
  96. rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
  97. }
  98. defer rd.Close()
  99. select {
  100. case <-die:
  101. return
  102. default:
  103. }
  104. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
  105. if err != nil {
  106. failed <- err
  107. break
  108. }
  109. _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
  110. if err != nil {
  111. fd.Close()
  112. failed <- err
  113. break
  114. }
  115. _, err = io.Copy(fd, rd)
  116. if err != nil {
  117. fd.Close()
  118. failed <- err
  119. break
  120. }
  121. if arg.enableCRC {
  122. part.CRC64 = crcCalc.Sum64()
  123. }
  124. fd.Close()
  125. results <- part
  126. }
  127. }
  128. // 调度协程
  129. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  130. for _, part := range parts {
  131. jobs <- part
  132. }
  133. close(jobs)
  134. }
// downloadPart describes one part of a download.
type downloadPart struct {
	Index  int    // part number, starting from 0
	Start  int64  // start position of the part
	End    int64  // end position of the part; a part holds End-Start+1 bytes
	Offset int64  // offset in the file; a part is written at file position Start-Offset
	CRC64  uint64 // checksum of the part
}
  143. // 文件分片
  144. func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *unpackedRange) ([]downloadPart, bool, uint64, error) {
  145. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  146. if err != nil {
  147. return nil, false, 0, err
  148. }
  149. parts := []downloadPart{}
  150. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  151. if err != nil {
  152. return nil, false, 0, err
  153. }
  154. enableCRC := false
  155. crcVal := (uint64)(0)
  156. if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  157. if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
  158. enableCRC = true
  159. crcVal, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
  160. }
  161. }
  162. part := downloadPart{}
  163. i := 0
  164. start, end := adjustRange(uRange, objectSize)
  165. for offset := start; offset < end; offset += partSize {
  166. part.Index = i
  167. part.Start = offset
  168. part.End = GetPartEnd(offset, end, partSize)
  169. part.Offset = start
  170. part.CRC64 = 0
  171. parts = append(parts, part)
  172. i++
  173. }
  174. return parts, enableCRC, crcVal, nil
  175. }
  176. // 文件大小
  177. func getObjectBytes(parts []downloadPart) int64 {
  178. var ob int64
  179. for _, part := range parts {
  180. ob += (part.End - part.Start + 1)
  181. }
  182. return ob
  183. }
  184. // 计算连续分片总的CRC
  185. func combineCRCInParts(dps []downloadPart) uint64 {
  186. if dps == nil || len(dps) == 0 {
  187. return 0
  188. }
  189. crc := dps[0].CRC64
  190. for i := 1; i < len(dps); i++ {
  191. crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
  192. }
  193. return crc
  194. }
  195. // 并发无断点续传的下载
  196. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
  197. tempFilePath := filePath + TempFileSuffix
  198. listener := getProgressListener(options)
  199. // 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
  200. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  201. if err != nil {
  202. return err
  203. }
  204. fd.Close()
  205. // 分割文件
  206. parts, enableCRC, expectedCRC, err := getDownloadParts(&bucket, objectKey, partSize, uRange)
  207. if err != nil {
  208. return err
  209. }
  210. jobs := make(chan downloadPart, len(parts))
  211. results := make(chan downloadPart, len(parts))
  212. failed := make(chan error)
  213. die := make(chan bool)
  214. var completedBytes int64
  215. totalBytes := getObjectBytes(parts)
  216. event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
  217. publishProgress(listener, event)
  218. // 启动工作协程
  219. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
  220. for w := 1; w <= routines; w++ {
  221. go downloadWorker(w, arg, jobs, results, failed, die)
  222. }
  223. // 并发上传分片
  224. go downloadScheduler(jobs, parts)
  225. // 等待分片下载完成
  226. completed := 0
  227. for completed < len(parts) {
  228. select {
  229. case part := <-results:
  230. completed++
  231. completedBytes += (part.End - part.Start + 1)
  232. parts[part.Index].CRC64 = part.CRC64
  233. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
  234. publishProgress(listener, event)
  235. case err := <-failed:
  236. close(die)
  237. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
  238. publishProgress(listener, event)
  239. return err
  240. }
  241. if completed >= len(parts) {
  242. break
  243. }
  244. }
  245. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
  246. publishProgress(listener, event)
  247. if enableCRC {
  248. actualCRC := combineCRCInParts(parts)
  249. err = checkDownloadCRC(actualCRC, expectedCRC)
  250. if err != nil {
  251. return err
  252. }
  253. }
  254. return os.Rename(tempFilePath, filePath)
  255. }
// ----- Concurrent download with checkpoint -----

// downloadCpMagic is the magic value stored in a download checkpoint; a
// checkpoint whose Magic differs from it is treated as invalid.
const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
// downloadCheckpoint is the resume state of a download. dump writes it to a
// file as JSON and load reads it back.
type downloadCheckpoint struct {
	Magic     string         // magic value, expected to equal downloadCpMagic
	MD5       string         // base64 MD5 of the checkpoint's JSON form, computed with this field empty
	FilePath  string         // local file
	Object    string         // object key
	ObjStat   objectStat     // object status
	Parts     []downloadPart // all parts
	PartStat  []bool         // whether each part has been downloaded
	Start     int64          // start point; compared by isValid with the start returned by adjustRange
	End       int64          // end point; compared by isValid with the end returned by adjustRange
	enableCRC bool           // whether CRC verification applies; NOTE(review): unexported, so encoding/json neither writes nor restores it
	CRC       uint64         // CRC checksum value
}
// objectStat records the object's size, last-modified time and etag; isValid
// compares them with the object's current metadata.
type objectStat struct {
	Size         int64  // size, taken from the Content-Length header
	LastModified string // last modified time
	Etag         string // etag
}
  276. // CP数据是否有效,CP有效且Object没有更新时有效
  277. func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string, uRange *unpackedRange) (bool, error) {
  278. // 比较CP的Magic及MD5
  279. cpb := cp
  280. cpb.MD5 = ""
  281. js, _ := json.Marshal(cpb)
  282. sum := md5.Sum(js)
  283. b64 := base64.StdEncoding.EncodeToString(sum[:])
  284. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  285. return false, nil
  286. }
  287. // 确认object没有更新
  288. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  289. if err != nil {
  290. return false, err
  291. }
  292. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  293. if err != nil {
  294. return false, err
  295. }
  296. // 比较Object的大小/最后修改时间/etag
  297. if cp.ObjStat.Size != objectSize ||
  298. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  299. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  300. return false, nil
  301. }
  302. // 确认下载范围是否变化
  303. if uRange != nil {
  304. start, end := adjustRange(uRange, objectSize)
  305. if start != cp.Start || end != cp.End {
  306. return false, nil
  307. }
  308. }
  309. return true, nil
  310. }
  311. // 从文件中load
  312. func (cp *downloadCheckpoint) load(filePath string) error {
  313. contents, err := ioutil.ReadFile(filePath)
  314. if err != nil {
  315. return err
  316. }
  317. err = json.Unmarshal(contents, cp)
  318. return err
  319. }
  320. // dump到文件
  321. func (cp *downloadCheckpoint) dump(filePath string) error {
  322. bcp := *cp
  323. // 计算MD5
  324. bcp.MD5 = ""
  325. js, err := json.Marshal(bcp)
  326. if err != nil {
  327. return err
  328. }
  329. sum := md5.Sum(js)
  330. b64 := base64.StdEncoding.EncodeToString(sum[:])
  331. bcp.MD5 = b64
  332. // 序列化
  333. js, err = json.Marshal(bcp)
  334. if err != nil {
  335. return err
  336. }
  337. // dump
  338. return ioutil.WriteFile(filePath, js, FilePermMode)
  339. }
  340. // 未完成的分片
  341. func (cp downloadCheckpoint) todoParts() []downloadPart {
  342. dps := []downloadPart{}
  343. for i, ps := range cp.PartStat {
  344. if !ps {
  345. dps = append(dps, cp.Parts[i])
  346. }
  347. }
  348. return dps
  349. }
  350. // 完成的字节数
  351. func (cp downloadCheckpoint) getCompletedBytes() int64 {
  352. var completedBytes int64
  353. for i, part := range cp.Parts {
  354. if cp.PartStat[i] {
  355. completedBytes += (part.End - part.Start + 1)
  356. }
  357. }
  358. return completedBytes
  359. }
  360. // 初始化下载任务
  361. func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
  362. // cp
  363. cp.Magic = downloadCpMagic
  364. cp.FilePath = filePath
  365. cp.Object = objectKey
  366. // object
  367. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  368. if err != nil {
  369. return err
  370. }
  371. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  372. if err != nil {
  373. return err
  374. }
  375. cp.ObjStat.Size = objectSize
  376. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  377. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  378. // parts
  379. cp.Parts, cp.enableCRC, cp.CRC, err = getDownloadParts(bucket, objectKey, partSize, uRange)
  380. if err != nil {
  381. return err
  382. }
  383. cp.PartStat = make([]bool, len(cp.Parts))
  384. for i := range cp.PartStat {
  385. cp.PartStat[i] = false
  386. }
  387. return nil
  388. }
  389. func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
  390. os.Remove(cpFilePath)
  391. return os.Rename(downFilepath, cp.FilePath)
  392. }
  393. // 并发带断点的下载
  394. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
  395. tempFilePath := filePath + TempFileSuffix
  396. listener := getProgressListener(options)
  397. // LOAD CP数据
  398. dcp := downloadCheckpoint{}
  399. err := dcp.load(cpFilePath)
  400. if err != nil {
  401. os.Remove(cpFilePath)
  402. }
  403. // LOAD出错或数据无效重新初始化下载
  404. valid, err := dcp.isValid(&bucket, objectKey, uRange)
  405. if err != nil || !valid {
  406. if err = dcp.prepare(&bucket, objectKey, filePath, partSize, uRange); err != nil {
  407. return err
  408. }
  409. os.Remove(cpFilePath)
  410. }
  411. // 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
  412. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  413. if err != nil {
  414. return err
  415. }
  416. fd.Close()
  417. // 未完成的分片
  418. parts := dcp.todoParts()
  419. jobs := make(chan downloadPart, len(parts))
  420. results := make(chan downloadPart, len(parts))
  421. failed := make(chan error)
  422. die := make(chan bool)
  423. completedBytes := dcp.getCompletedBytes()
  424. event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
  425. publishProgress(listener, event)
  426. // 启动工作协程
  427. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
  428. for w := 1; w <= routines; w++ {
  429. go downloadWorker(w, arg, jobs, results, failed, die)
  430. }
  431. // 并发下载分片
  432. go downloadScheduler(jobs, parts)
  433. // 等待分片下载完成
  434. completed := 0
  435. for completed < len(parts) {
  436. select {
  437. case part := <-results:
  438. completed++
  439. dcp.PartStat[part.Index] = true
  440. dcp.Parts[part.Index].CRC64 = part.CRC64
  441. dcp.dump(cpFilePath)
  442. completedBytes += (part.End - part.Start + 1)
  443. event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
  444. publishProgress(listener, event)
  445. case err := <-failed:
  446. close(die)
  447. event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
  448. publishProgress(listener, event)
  449. return err
  450. }
  451. if completed >= len(parts) {
  452. break
  453. }
  454. }
  455. event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
  456. publishProgress(listener, event)
  457. if dcp.enableCRC {
  458. actualCRC := combineCRCInParts(dcp.Parts)
  459. err = checkDownloadCRC(actualCRC, dcp.CRC)
  460. if err != nil {
  461. return err
  462. }
  463. }
  464. return dcp.complete(cpFilePath, tempFilePath)
  465. }