download.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "io"
  8. "io/ioutil"
  9. "os"
  10. "strconv"
  11. )
  12. //
  13. // DownloadFile 分片下载文件
  14. //
  15. // objectKey object key。
  16. // filePath 本地文件。objectKey下载到文件。
  17. // partSize 本次上传文件片的大小,字节数。比如100 * 1024为每片100KB。
  18. // options Object的属性限制项。详见GetObject。
  19. //
  20. // error 操作成功error为nil,非nil为错误信息。
  21. //
  22. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  23. if partSize < 1 || partSize > MaxPartSize {
  24. return errors.New("oss: part size invalid range (1, 5GB]")
  25. }
  26. cpConf, err := getCpConfig(options, filePath)
  27. if err != nil {
  28. return err
  29. }
  30. routines := getRoutines(options)
  31. if cpConf.IsEnable {
  32. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines)
  33. }
  34. return bucket.downloadFile(objectKey, filePath, partSize, options, routines)
  35. }
  36. // ----- 并发无断点的下载 -----
  37. // 工作协程参数
  38. type downloadWorkerArg struct {
  39. bucket *Bucket
  40. key string
  41. filePath string
  42. options []Option
  43. hook downloadPartHook
  44. }
  45. // Hook用于测试
  46. type downloadPartHook func(part downloadPart) error
  47. var downloadPartHooker downloadPartHook = defaultDownloadPartHook
  48. func defaultDownloadPartHook(part downloadPart) error {
  49. return nil
  50. }
  51. // 默认ProgressListener,屏蔽GetObject的Options中ProgressListener
  52. type defaultDownloadProgressListener struct {
  53. }
  54. // ProgressChanged 静默处理
  55. func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
  56. }
  57. // 工作协程
  58. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
  59. for part := range jobs {
  60. if err := arg.hook(part); err != nil {
  61. failed <- err
  62. break
  63. }
  64. // resolve options
  65. r := Range(part.Start, part.End)
  66. p := Progress(&defaultDownloadProgressListener{})
  67. opts := make([]Option, len(arg.options)+2)
  68. // append orderly, can not be reversed!
  69. opts = append(opts, arg.options...)
  70. opts = append(opts, r, p)
  71. rd, err := arg.bucket.GetObject(arg.key, opts...)
  72. if err != nil {
  73. failed <- err
  74. break
  75. }
  76. defer rd.Close()
  77. select {
  78. case <-die:
  79. return
  80. default:
  81. }
  82. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
  83. if err != nil {
  84. failed <- err
  85. break
  86. }
  87. defer fd.Close()
  88. _, err = fd.Seek(part.Start, os.SEEK_SET)
  89. if err != nil {
  90. failed <- err
  91. break
  92. }
  93. _, err = io.Copy(fd, rd)
  94. if err != nil {
  95. failed <- err
  96. break
  97. }
  98. results <- part
  99. }
  100. }
  101. // 调度协程
  102. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  103. for _, part := range parts {
  104. jobs <- part
  105. }
  106. close(jobs)
  107. }
  // downloadPart is one byte range of the object to download. It is stored
// as JSON inside the download checkpoint, whose MD5 covers that encoding,
// so field names and their order must stay stable.
type downloadPart struct {
	Index      int   // part number, starting at 0
	Start, End int64 // inclusive byte range [Start, End]
}
  114. // 文件分片
  115. func getDownloadParts(bucket *Bucket, objectKey string, partSize int64) ([]downloadPart, error) {
  116. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  117. if err != nil {
  118. return nil, err
  119. }
  120. parts := []downloadPart{}
  121. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  122. if err != nil {
  123. return nil, err
  124. }
  125. part := downloadPart{}
  126. i := 0
  127. for offset := int64(0); offset < objectSize; offset += partSize {
  128. part.Index = i
  129. part.Start = offset
  130. part.End = GetPartEnd(offset, objectSize, partSize)
  131. parts = append(parts, part)
  132. i++
  133. }
  134. return parts, nil
  135. }
  136. // 文件大小
  137. func getObjectBytes(parts []downloadPart) int64 {
  138. var ob int64
  139. for _, part := range parts {
  140. ob += (part.End - part.Start + 1)
  141. }
  142. return ob
  143. }
  144. // 并发无断点续传的下载
  145. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
  146. tempFilePath := filePath + TempFileSuffix
  147. listener := getProgressListener(options)
  148. // 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
  149. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  150. if err != nil {
  151. return err
  152. }
  153. fd.Close()
  154. // 分割文件
  155. parts, err := getDownloadParts(&bucket, objectKey, partSize)
  156. if err != nil {
  157. return err
  158. }
  159. jobs := make(chan downloadPart, len(parts))
  160. results := make(chan downloadPart, len(parts))
  161. failed := make(chan error)
  162. die := make(chan bool)
  163. var completedBytes int64
  164. totalBytes := getObjectBytes(parts)
  165. event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
  166. publishProgress(listener, event)
  167. // 启动工作协程
  168. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
  169. for w := 1; w <= routines; w++ {
  170. go downloadWorker(w, arg, jobs, results, failed, die)
  171. }
  172. // 并发上传分片
  173. go downloadScheduler(jobs, parts)
  174. // 等待分片下载完成
  175. completed := 0
  176. ps := make([]downloadPart, len(parts))
  177. for completed < len(parts) {
  178. select {
  179. case part := <-results:
  180. completed++
  181. ps[part.Index] = part
  182. completedBytes += (part.End - part.Start + 1)
  183. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
  184. publishProgress(listener, event)
  185. case err := <-failed:
  186. close(die)
  187. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
  188. publishProgress(listener, event)
  189. return err
  190. }
  191. if completed >= len(parts) {
  192. break
  193. }
  194. }
  195. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
  196. publishProgress(listener, event)
  197. return os.Rename(tempFilePath, filePath)
  198. }
  199. // ----- 并发有断点的下载 -----
  200. const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
  201. type downloadCheckpoint struct {
  202. Magic string // magic
  203. MD5 string // cp内容的MD5
  204. FilePath string // 本地文件
  205. Object string // key
  206. ObjStat objectStat // 文件状态
  207. Parts []downloadPart // 全部分片
  208. PartStat []bool // 分片下载是否完成
  209. }
  210. type objectStat struct {
  211. Size int64 // 大小
  212. LastModified string // 最后修改时间
  213. Etag string // etag
  214. }
  215. // CP数据是否有效,CP有效且Object没有更新时有效
  216. func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
  217. // 比较CP的Magic及MD5
  218. cpb := cp
  219. cpb.MD5 = ""
  220. js, _ := json.Marshal(cpb)
  221. sum := md5.Sum(js)
  222. b64 := base64.StdEncoding.EncodeToString(sum[:])
  223. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  224. return false, nil
  225. }
  226. // 确认object没有更新
  227. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  228. if err != nil {
  229. return false, err
  230. }
  231. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  232. if err != nil {
  233. return false, err
  234. }
  235. // 比较Object的大小/最后修改时间/etag
  236. if cp.ObjStat.Size != objectSize ||
  237. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  238. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  239. return false, nil
  240. }
  241. return true, nil
  242. }
  243. // 从文件中load
  244. func (cp *downloadCheckpoint) load(filePath string) error {
  245. contents, err := ioutil.ReadFile(filePath)
  246. if err != nil {
  247. return err
  248. }
  249. err = json.Unmarshal(contents, cp)
  250. return err
  251. }
  252. // dump到文件
  253. func (cp *downloadCheckpoint) dump(filePath string) error {
  254. bcp := *cp
  255. // 计算MD5
  256. bcp.MD5 = ""
  257. js, err := json.Marshal(bcp)
  258. if err != nil {
  259. return err
  260. }
  261. sum := md5.Sum(js)
  262. b64 := base64.StdEncoding.EncodeToString(sum[:])
  263. bcp.MD5 = b64
  264. // 序列化
  265. js, err = json.Marshal(bcp)
  266. if err != nil {
  267. return err
  268. }
  269. // dump
  270. return ioutil.WriteFile(filePath, js, FilePermMode)
  271. }
  272. // 未完成的分片
  273. func (cp downloadCheckpoint) todoParts() []downloadPart {
  274. dps := []downloadPart{}
  275. for i, ps := range cp.PartStat {
  276. if !ps {
  277. dps = append(dps, cp.Parts[i])
  278. }
  279. }
  280. return dps
  281. }
  282. // 完成的字节数
  283. func (cp downloadCheckpoint) getCompletedBytes() int64 {
  284. var completedBytes int64
  285. for i, part := range cp.Parts {
  286. if cp.PartStat[i] {
  287. completedBytes += (part.End - part.Start + 1)
  288. }
  289. }
  290. return completedBytes
  291. }
  292. // 初始化下载任务
  293. func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64) error {
  294. // cp
  295. cp.Magic = downloadCpMagic
  296. cp.FilePath = filePath
  297. cp.Object = objectKey
  298. // object
  299. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  300. if err != nil {
  301. return err
  302. }
  303. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  304. if err != nil {
  305. return err
  306. }
  307. cp.ObjStat.Size = objectSize
  308. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  309. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  310. // parts
  311. cp.Parts, err = getDownloadParts(bucket, objectKey, partSize)
  312. if err != nil {
  313. return err
  314. }
  315. cp.PartStat = make([]bool, len(cp.Parts))
  316. for i := range cp.PartStat {
  317. cp.PartStat[i] = false
  318. }
  319. return nil
  320. }
  321. func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
  322. os.Remove(cpFilePath)
  323. return os.Rename(downFilepath, cp.FilePath)
  324. }
  325. // 并发带断点的下载
  326. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
  327. tempFilePath := filePath + TempFileSuffix
  328. listener := getProgressListener(options)
  329. // LOAD CP数据
  330. dcp := downloadCheckpoint{}
  331. err := dcp.load(cpFilePath)
  332. if err != nil {
  333. os.Remove(cpFilePath)
  334. }
  335. // LOAD出错或数据无效重新初始化下载
  336. valid, err := dcp.isValid(&bucket, objectKey)
  337. if err != nil || !valid {
  338. if err = dcp.prepare(&bucket, objectKey, filePath, partSize); err != nil {
  339. return err
  340. }
  341. os.Remove(cpFilePath)
  342. }
  343. // 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
  344. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  345. if err != nil {
  346. return err
  347. }
  348. fd.Close()
  349. // 未完成的分片
  350. parts := dcp.todoParts()
  351. jobs := make(chan downloadPart, len(parts))
  352. results := make(chan downloadPart, len(parts))
  353. failed := make(chan error)
  354. die := make(chan bool)
  355. completedBytes := dcp.getCompletedBytes()
  356. event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
  357. publishProgress(listener, event)
  358. // 启动工作协程
  359. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
  360. for w := 1; w <= routines; w++ {
  361. go downloadWorker(w, arg, jobs, results, failed, die)
  362. }
  363. // 并发下载分片
  364. go downloadScheduler(jobs, parts)
  365. // 等待分片下载完成
  366. completed := 0
  367. for completed < len(parts) {
  368. select {
  369. case part := <-results:
  370. completed++
  371. dcp.PartStat[part.Index] = true
  372. dcp.dump(cpFilePath)
  373. completedBytes += (part.End - part.Start + 1)
  374. event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
  375. publishProgress(listener, event)
  376. case err := <-failed:
  377. close(die)
  378. event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
  379. publishProgress(listener, event)
  380. return err
  381. }
  382. if completed >= len(parts) {
  383. break
  384. }
  385. }
  386. event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
  387. publishProgress(listener, event)
  388. return dcp.complete(cpFilePath, tempFilePath)
  389. }