download.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "io"
  8. "io/ioutil"
  9. "os"
  10. "strconv"
  11. )
  12. //
  13. // DownloadFile 分片下载文件
  14. //
  15. // objectKey object key。
  16. // filePath 本地文件。objectKey下载到文件。
  17. // partSize 本次上传文件片的大小,字节数。比如100 * 1024为每片100KB。
  18. // options Object的属性限制项。详见GetObject。
  19. //
  20. // error 操作成功error为nil,非nil为错误信息。
  21. //
  22. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  23. if partSize < 1 || partSize > MaxPartSize {
  24. return errors.New("oss: part size invalid range (1, 5GB]")
  25. }
  26. cpConf, err := getCpConfig(options, filePath)
  27. if err != nil {
  28. return err
  29. }
  30. routines := getRoutines(options)
  31. if cpConf.IsEnable {
  32. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines)
  33. }
  34. return bucket.downloadFile(objectKey, filePath, partSize, options, routines)
  35. }
// ----- Concurrent download without checkpoint -----

// downloadWorkerArg holds the inputs shared by every download worker goroutine.
type downloadWorkerArg struct {
	bucket   *Bucket          // bucket the object is read from
	key      string           // object key
	filePath string           // local file the parts are written into
	options  []Option         // options passed to GetObject for every part; the worker adds a Range option per part
	hook     downloadPartHook // called before each part is fetched; a non-nil error stops the worker
}
  45. // Hook用于测试
  46. type downloadPartHook func(part downloadPart) error
  47. var downloadPartHooker downloadPartHook = defaultDownloadPartHook
  48. func defaultDownloadPartHook(part downloadPart) error {
  49. return nil
  50. }
  51. // 工作协程
  52. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <- chan bool) {
  53. for part := range jobs {
  54. if err := arg.hook(part); err != nil {
  55. failed <- err
  56. break
  57. }
  58. opt := Range(part.Start, part.End)
  59. opts := append(arg.options, opt)
  60. rd, err := arg.bucket.GetObject(arg.key, opts...)
  61. if err != nil {
  62. failed <- err
  63. break
  64. }
  65. defer rd.Close()
  66. select {
  67. case <-die:
  68. return
  69. default:
  70. }
  71. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, 0660)
  72. if err != nil {
  73. failed <- err
  74. break
  75. }
  76. defer fd.Close()
  77. _, err = fd.Seek(part.Start, os.SEEK_SET)
  78. if err != nil {
  79. failed <- err
  80. break
  81. }
  82. _, err = io.Copy(fd, rd)
  83. if err != nil {
  84. failed <- err
  85. break
  86. }
  87. results <- part
  88. }
  89. }
  90. // 调度协程
  91. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  92. for _, part := range parts {
  93. jobs <- part
  94. }
  95. close(jobs)
  96. }
// downloadPart describes one byte range of the object. Its exported fields
// are encoded into the checkpoint file via downloadCheckpoint.Parts.
type downloadPart struct {
	Index int   // part number, starting from 0
	Start int64 // offset of the part's first byte
	End   int64 // end position of the part as returned by GetPartEnd (passed to Range with Start)
}
  103. // 文件分片
  104. func getDownloadParts(bucket *Bucket, objectKey string, partSize int64) ([]downloadPart, error) {
  105. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  106. if err != nil {
  107. return nil, err
  108. }
  109. parts := []downloadPart{}
  110. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  111. if err != nil {
  112. return nil, err
  113. }
  114. part := downloadPart{}
  115. i := 0
  116. for offset := int64(0); offset < objectSize; offset += partSize {
  117. part.Index = i
  118. part.Start = offset
  119. part.End = GetPartEnd(offset, objectSize, partSize)
  120. parts = append(parts, part)
  121. i++
  122. }
  123. return parts, nil
  124. }
  125. // 并发无断点续传的下载
  126. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
  127. // 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
  128. fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0660)
  129. if err != nil {
  130. return err
  131. }
  132. fd.Close()
  133. // 分割文件
  134. parts, err := getDownloadParts(&bucket, objectKey, partSize)
  135. if err != nil {
  136. return err
  137. }
  138. jobs := make(chan downloadPart, len(parts))
  139. results := make(chan downloadPart, len(parts))
  140. failed := make(chan error)
  141. die := make(chan bool)
  142. // 启动工作协程
  143. arg := downloadWorkerArg{&bucket, objectKey, filePath, options, downloadPartHooker}
  144. for w := 1; w <= routines; w++ {
  145. go downloadWorker(w, arg, jobs, results, failed, die)
  146. }
  147. // 并发上传分片
  148. go downloadScheduler(jobs, parts)
  149. // 等待分片下载完成
  150. completed := 0
  151. ps := make([]downloadPart, len(parts))
  152. for completed < len(parts) {
  153. select {
  154. case part := <-results:
  155. completed++
  156. ps[part.Index] = part
  157. case err := <-failed:
  158. close(die)
  159. return err
  160. }
  161. if completed >= len(parts) {
  162. break
  163. }
  164. }
  165. return nil
  166. }
// ----- Concurrent download with checkpoint -----

// downloadCpMagic marks a download checkpoint; isValid rejects checkpoints
// whose Magic differs.
const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"

// downloadCheckpoint is the resume state of a checkpointed download. dump
// writes it as JSON. encoding/json emits fields in declaration order, and the
// MD5 is computed over that encoding. Renaming or reordering fields therefore
// invalidates existing checkpoint files.
type downloadCheckpoint struct {
	Magic    string         // magic string; equals downloadCpMagic in a valid checkpoint
	MD5      string         // base64 MD5 of the JSON encoding computed with this field empty
	FilePath string         // local file being written
	Object   string         // object key
	ObjStat  objectStat     // object metadata recorded when the checkpoint was prepared
	Parts    []downloadPart // all parts of the object
	PartStat []bool         // PartStat[i] is true once Parts[i] has been downloaded
}
// objectStat records the object metadata a checkpoint was created against.
// isValid compares it with the current metadata to detect a changed object.
type objectStat struct {
	Size         int64  // object size in bytes (Content-Length)
	LastModified string // Last-Modified header value
	Etag         string // ETag header value
}
  183. // CP数据是否有效,CP有效且Object没有更新时有效
  184. func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
  185. // 比较CP的Magic及MD5
  186. cpb := cp
  187. cpb.MD5 = ""
  188. js, _ := json.Marshal(cpb)
  189. sum := md5.Sum(js)
  190. b64 := base64.StdEncoding.EncodeToString(sum[:])
  191. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  192. return false, nil
  193. }
  194. // 确认object没有更新
  195. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  196. if err != nil {
  197. return false, err
  198. }
  199. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  200. if err != nil {
  201. return false, err
  202. }
  203. // 比较Object的大小/最后修改时间/etag
  204. if cp.ObjStat.Size != objectSize ||
  205. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  206. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  207. return false, nil
  208. }
  209. return true, nil
  210. }
  211. // 从文件中load
  212. func (cp *downloadCheckpoint) load(filePath string) error {
  213. contents, err := ioutil.ReadFile(filePath)
  214. if err != nil {
  215. return err
  216. }
  217. err = json.Unmarshal(contents, cp)
  218. return err
  219. }
  220. // dump到文件
  221. func (cp *downloadCheckpoint) dump(filePath string) error {
  222. bcp := *cp
  223. // 计算MD5
  224. bcp.MD5 = ""
  225. js, err := json.Marshal(bcp)
  226. if err != nil {
  227. return err
  228. }
  229. sum := md5.Sum(js)
  230. b64 := base64.StdEncoding.EncodeToString(sum[:])
  231. bcp.MD5 = b64
  232. // 序列化
  233. js, err = json.Marshal(bcp)
  234. if err != nil {
  235. return err
  236. }
  237. // dump
  238. return ioutil.WriteFile(filePath, js, 0644)
  239. }
  240. // 未完成的分片
  241. func (cp downloadCheckpoint) todoParts() []downloadPart {
  242. dps := []downloadPart{}
  243. for i, ps := range cp.PartStat {
  244. if !ps {
  245. dps = append(dps, cp.Parts[i])
  246. }
  247. }
  248. return dps
  249. }
  250. // 初始化下载任务
  251. func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64) error {
  252. // cp
  253. cp.Magic = downloadCpMagic
  254. cp.FilePath = filePath
  255. cp.Object = objectKey
  256. // object
  257. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  258. if err != nil {
  259. return err
  260. }
  261. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  262. if err != nil {
  263. return err
  264. }
  265. cp.ObjStat.Size = objectSize
  266. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  267. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  268. // parts
  269. cp.Parts, err = getDownloadParts(bucket, objectKey, partSize)
  270. if err != nil {
  271. return err
  272. }
  273. cp.PartStat = make([]bool, len(cp.Parts))
  274. for i := range cp.PartStat {
  275. cp.PartStat[i] = false
  276. }
  277. return nil
  278. }
  279. func (cp *downloadCheckpoint) complete(cpFilePath string) error {
  280. os.Remove(cpFilePath)
  281. return nil
  282. }
  283. // 并发带断点的下载
  284. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
  285. // LOAD CP数据
  286. dcp := downloadCheckpoint{}
  287. err := dcp.load(cpFilePath)
  288. if err != nil {
  289. os.Remove(cpFilePath)
  290. }
  291. // LOAD出错或数据无效重新初始化下载
  292. valid, err := dcp.isValid(&bucket, objectKey)
  293. if err != nil || !valid {
  294. if err = dcp.prepare(&bucket, objectKey, filePath, partSize); err != nil {
  295. return err
  296. }
  297. os.Remove(cpFilePath)
  298. }
  299. // 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
  300. fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0660)
  301. if err != nil {
  302. return err
  303. }
  304. fd.Close()
  305. // 未完成的分片
  306. parts := dcp.todoParts()
  307. jobs := make(chan downloadPart, len(parts))
  308. results := make(chan downloadPart, len(parts))
  309. failed := make(chan error)
  310. die := make(chan bool)
  311. // 启动工作协程
  312. arg := downloadWorkerArg{&bucket, objectKey, filePath, options, downloadPartHooker}
  313. for w := 1; w <= routines; w++ {
  314. go downloadWorker(w, arg, jobs, results, failed, die)
  315. }
  316. // 并发下载分片
  317. go downloadScheduler(jobs, parts)
  318. // 等待分片下载完成
  319. completed := 0
  320. for completed < len(parts) {
  321. select {
  322. case part := <-results:
  323. completed++
  324. dcp.PartStat[part.Index] = true
  325. dcp.dump(cpFilePath)
  326. case err := <-failed:
  327. close(die)
  328. return err
  329. }
  330. if completed >= len(parts) {
  331. break
  332. }
  333. }
  334. return dcp.complete(cpFilePath)
  335. }