download.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "io"
  8. "io/ioutil"
  9. "os"
  10. "strconv"
  11. )
  12. //
  13. // DownloadFile 分片下载文件
  14. //
  15. // objectKey object key。
  16. // filePath 本地文件。objectKey下载到文件。
  17. // partSize 本次上传文件片的大小,字节数。比如100 * 1024为每片100KB。
  18. // options Object的属性限制项。详见GetObject。
  19. //
  20. // error 操作成功error为nil,非nil为错误信息。
  21. //
  22. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  23. if partSize < 1 {
  24. return errors.New("oss: part size smaller than 1.")
  25. }
  26. cpConf, err := getCpConfig(options, filePath)
  27. if err != nil {
  28. return err
  29. }
  30. uRange, err := getRangeConfig(options)
  31. if err != nil {
  32. return err
  33. }
  34. routines := getRoutines(options)
  35. if cpConf.IsEnable {
  36. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines, uRange)
  37. }
  38. return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
  39. }
  40. // 获取下载范围
  41. func getRangeConfig(options []Option) (*unpackedRange, error) {
  42. rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
  43. if err != nil || rangeOpt == nil {
  44. return nil, err
  45. }
  46. return parseRange(rangeOpt.(string))
  47. }
  48. // ----- 并发无断点的下载 -----
  49. // 工作协程参数
  50. type downloadWorkerArg struct {
  51. bucket *Bucket
  52. key string
  53. filePath string
  54. options []Option
  55. hook downloadPartHook
  56. }
  57. // Hook用于测试
  58. type downloadPartHook func(part downloadPart) error
  59. var downloadPartHooker downloadPartHook = defaultDownloadPartHook
  60. func defaultDownloadPartHook(part downloadPart) error {
  61. return nil
  62. }
  63. // 默认ProgressListener,屏蔽GetObject的Options中ProgressListener
  64. type defaultDownloadProgressListener struct {
  65. }
  66. // ProgressChanged 静默处理
  67. func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
  68. }
  69. // 工作协程
  70. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
  71. for part := range jobs {
  72. if err := arg.hook(part); err != nil {
  73. failed <- err
  74. break
  75. }
  76. // resolve options
  77. r := Range(part.Start, part.End)
  78. p := Progress(&defaultDownloadProgressListener{})
  79. opts := make([]Option, len(arg.options)+2)
  80. // append orderly, can not be reversed!
  81. opts = append(opts, arg.options...)
  82. opts = append(opts, r, p)
  83. rd, err := arg.bucket.GetObject(arg.key, opts...)
  84. if err != nil {
  85. failed <- err
  86. break
  87. }
  88. defer rd.Close()
  89. select {
  90. case <-die:
  91. return
  92. default:
  93. }
  94. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
  95. if err != nil {
  96. failed <- err
  97. break
  98. }
  99. _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
  100. if err != nil {
  101. fd.Close()
  102. failed <- err
  103. break
  104. }
  105. _, err = io.Copy(fd, rd)
  106. if err != nil {
  107. fd.Close()
  108. failed <- err
  109. break
  110. }
  111. fd.Close()
  112. results <- part
  113. }
  114. }
  115. // 调度协程
  116. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  117. for _, part := range parts {
  118. jobs <- part
  119. }
  120. close(jobs)
  121. }
  // downloadPart describes one byte range of the object to download.
// The field names and order define the checkpoint JSON and must stay stable.
type downloadPart struct {
	Index  int   // part number, starting at 0
	Start  int64 // first byte of the part within the object
	End    int64 // last byte of the part within the object (inclusive)
	Offset int64 // start of the whole download range; Start-Offset is the part's position in the local file
}
  129. // 文件分片
  130. func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *unpackedRange) ([]downloadPart, error) {
  131. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  132. if err != nil {
  133. return nil, err
  134. }
  135. parts := []downloadPart{}
  136. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  137. if err != nil {
  138. return nil, err
  139. }
  140. part := downloadPart{}
  141. i := 0
  142. start, end := adjustRange(uRange, objectSize)
  143. for offset := start; offset < end; offset += partSize {
  144. part.Index = i
  145. part.Start = offset
  146. part.End = GetPartEnd(offset, end, partSize)
  147. part.Offset = start
  148. parts = append(parts, part)
  149. i++
  150. }
  151. return parts, nil
  152. }
  153. // 文件大小
  154. func getObjectBytes(parts []downloadPart) int64 {
  155. var ob int64
  156. for _, part := range parts {
  157. ob += (part.End - part.Start + 1)
  158. }
  159. return ob
  160. }
  161. // 并发无断点续传的下载
  162. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
  163. tempFilePath := filePath + TempFileSuffix
  164. listener := getProgressListener(options)
  165. // 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
  166. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  167. if err != nil {
  168. return err
  169. }
  170. fd.Close()
  171. // 分割文件
  172. parts, err := getDownloadParts(&bucket, objectKey, partSize, uRange)
  173. if err != nil {
  174. return err
  175. }
  176. jobs := make(chan downloadPart, len(parts))
  177. results := make(chan downloadPart, len(parts))
  178. failed := make(chan error)
  179. die := make(chan bool)
  180. var completedBytes int64
  181. totalBytes := getObjectBytes(parts)
  182. event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
  183. publishProgress(listener, event)
  184. // 启动工作协程
  185. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
  186. for w := 1; w <= routines; w++ {
  187. go downloadWorker(w, arg, jobs, results, failed, die)
  188. }
  189. // 并发上传分片
  190. go downloadScheduler(jobs, parts)
  191. // 等待分片下载完成
  192. completed := 0
  193. ps := make([]downloadPart, len(parts))
  194. for completed < len(parts) {
  195. select {
  196. case part := <-results:
  197. completed++
  198. ps[part.Index] = part
  199. completedBytes += (part.End - part.Start + 1)
  200. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
  201. publishProgress(listener, event)
  202. case err := <-failed:
  203. close(die)
  204. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
  205. publishProgress(listener, event)
  206. return err
  207. }
  208. if completed >= len(parts) {
  209. break
  210. }
  211. }
  212. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
  213. publishProgress(listener, event)
  214. return os.Rename(tempFilePath, filePath)
  215. }
  216. // ----- 并发有断点的下载 -----
  217. const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
  218. type downloadCheckpoint struct {
  219. Magic string // magic
  220. MD5 string // cp内容的MD5
  221. FilePath string // 本地文件
  222. Object string // key
  223. ObjStat objectStat // 文件状态
  224. Parts []downloadPart // 全部分片
  225. PartStat []bool // 分片下载是否完成
  226. Start int64 // 起点
  227. End int64 // 终点
  228. }
  229. type objectStat struct {
  230. Size int64 // 大小
  231. LastModified string // 最后修改时间
  232. Etag string // etag
  233. }
  234. // CP数据是否有效,CP有效且Object没有更新时有效
  235. func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string, uRange *unpackedRange) (bool, error) {
  236. // 比较CP的Magic及MD5
  237. cpb := cp
  238. cpb.MD5 = ""
  239. js, _ := json.Marshal(cpb)
  240. sum := md5.Sum(js)
  241. b64 := base64.StdEncoding.EncodeToString(sum[:])
  242. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  243. return false, nil
  244. }
  245. // 确认object没有更新
  246. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  247. if err != nil {
  248. return false, err
  249. }
  250. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  251. if err != nil {
  252. return false, err
  253. }
  254. // 比较Object的大小/最后修改时间/etag
  255. if cp.ObjStat.Size != objectSize ||
  256. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  257. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  258. return false, nil
  259. }
  260. // 确认下载范围是否变化
  261. if uRange != nil {
  262. start, end := adjustRange(uRange, objectSize)
  263. if start != cp.Start || end != cp.End {
  264. return false, nil
  265. }
  266. }
  267. return true, nil
  268. }
  269. // 从文件中load
  270. func (cp *downloadCheckpoint) load(filePath string) error {
  271. contents, err := ioutil.ReadFile(filePath)
  272. if err != nil {
  273. return err
  274. }
  275. err = json.Unmarshal(contents, cp)
  276. return err
  277. }
  278. // dump到文件
  279. func (cp *downloadCheckpoint) dump(filePath string) error {
  280. bcp := *cp
  281. // 计算MD5
  282. bcp.MD5 = ""
  283. js, err := json.Marshal(bcp)
  284. if err != nil {
  285. return err
  286. }
  287. sum := md5.Sum(js)
  288. b64 := base64.StdEncoding.EncodeToString(sum[:])
  289. bcp.MD5 = b64
  290. // 序列化
  291. js, err = json.Marshal(bcp)
  292. if err != nil {
  293. return err
  294. }
  295. // dump
  296. return ioutil.WriteFile(filePath, js, FilePermMode)
  297. }
  298. // 未完成的分片
  299. func (cp downloadCheckpoint) todoParts() []downloadPart {
  300. dps := []downloadPart{}
  301. for i, ps := range cp.PartStat {
  302. if !ps {
  303. dps = append(dps, cp.Parts[i])
  304. }
  305. }
  306. return dps
  307. }
  308. // 完成的字节数
  309. func (cp downloadCheckpoint) getCompletedBytes() int64 {
  310. var completedBytes int64
  311. for i, part := range cp.Parts {
  312. if cp.PartStat[i] {
  313. completedBytes += (part.End - part.Start + 1)
  314. }
  315. }
  316. return completedBytes
  317. }
  318. // 初始化下载任务
  319. func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
  320. // cp
  321. cp.Magic = downloadCpMagic
  322. cp.FilePath = filePath
  323. cp.Object = objectKey
  324. // object
  325. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  326. if err != nil {
  327. return err
  328. }
  329. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  330. if err != nil {
  331. return err
  332. }
  333. cp.ObjStat.Size = objectSize
  334. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  335. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  336. // parts
  337. cp.Parts, err = getDownloadParts(bucket, objectKey, partSize, uRange)
  338. if err != nil {
  339. return err
  340. }
  341. cp.PartStat = make([]bool, len(cp.Parts))
  342. for i := range cp.PartStat {
  343. cp.PartStat[i] = false
  344. }
  345. return nil
  346. }
  347. func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
  348. os.Remove(cpFilePath)
  349. return os.Rename(downFilepath, cp.FilePath)
  350. }
  351. // 并发带断点的下载
  352. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
  353. tempFilePath := filePath + TempFileSuffix
  354. listener := getProgressListener(options)
  355. // LOAD CP数据
  356. dcp := downloadCheckpoint{}
  357. err := dcp.load(cpFilePath)
  358. if err != nil {
  359. os.Remove(cpFilePath)
  360. }
  361. // LOAD出错或数据无效重新初始化下载
  362. valid, err := dcp.isValid(&bucket, objectKey, uRange)
  363. if err != nil || !valid {
  364. if err = dcp.prepare(&bucket, objectKey, filePath, partSize, uRange); err != nil {
  365. return err
  366. }
  367. os.Remove(cpFilePath)
  368. }
  369. // 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
  370. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  371. if err != nil {
  372. return err
  373. }
  374. fd.Close()
  375. // 未完成的分片
  376. parts := dcp.todoParts()
  377. jobs := make(chan downloadPart, len(parts))
  378. results := make(chan downloadPart, len(parts))
  379. failed := make(chan error)
  380. die := make(chan bool)
  381. completedBytes := dcp.getCompletedBytes()
  382. event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
  383. publishProgress(listener, event)
  384. // 启动工作协程
  385. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
  386. for w := 1; w <= routines; w++ {
  387. go downloadWorker(w, arg, jobs, results, failed, die)
  388. }
  389. // 并发下载分片
  390. go downloadScheduler(jobs, parts)
  391. // 等待分片下载完成
  392. completed := 0
  393. for completed < len(parts) {
  394. select {
  395. case part := <-results:
  396. completed++
  397. dcp.PartStat[part.Index] = true
  398. dcp.dump(cpFilePath)
  399. completedBytes += (part.End - part.Start + 1)
  400. event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
  401. publishProgress(listener, event)
  402. case err := <-failed:
  403. close(die)
  404. event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
  405. publishProgress(listener, event)
  406. return err
  407. }
  408. if completed >= len(parts) {
  409. break
  410. }
  411. }
  412. event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
  413. publishProgress(listener, event)
  414. return dcp.complete(cpFilePath, tempFilePath)
  415. }