download.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "io"
  8. "io/ioutil"
  9. "os"
  10. "strconv"
  11. "sync"
  12. )
  13. //
  14. // DownloadFile 分块下载文件,适合加大Object
  15. //
  16. // objectKey object key。
  17. // filePath 本地文件。objectKey下载到文件。
  18. // partSize 本次上传文件片的大小,字节数。比如100 * 1024为每片100KB。
  19. // options Object的属性限制项。详见GetObject。
  20. //
  21. // error 操作成功error为nil,非nil为错误信息。
  22. //
  23. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  24. if partSize < 1 || partSize > MaxPartSize {
  25. return errors.New("oss: part size invalid range (1, 5GB]")
  26. }
  27. cpConf, err := getCpConfig(options, filePath)
  28. if err != nil {
  29. return err
  30. }
  31. routines := getRoutines(options)
  32. if cpConf.IsEnable {
  33. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines)
  34. }
  35. return bucket.downloadFile(objectKey, filePath, partSize, options, routines)
  36. }
  37. // ----- 并发无断点的下载 -----
  38. // 工作协程参数
  39. type downloadWorkerArg struct {
  40. bucket *Bucket
  41. key string
  42. filePath string
  43. options []Option
  44. hook downloadPartHook
  45. }
  46. // Hook用于测试
  47. type downloadPartHook func(part downloadPart) error
  48. var downloadPartHooker downloadPartHook = defaultDownloadPartHook
  49. func defaultDownloadPartHook(part downloadPart) error {
  50. return nil
  51. }
  52. // 工作协程
  53. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error) {
  54. for part := range jobs {
  55. if err := arg.hook(part); err != nil {
  56. failed <- err
  57. break
  58. }
  59. opt := Range(part.Start, part.End)
  60. opts := append(arg.options, opt)
  61. rd, err := arg.bucket.GetObject(arg.key, opts...)
  62. if err != nil {
  63. failed <- err
  64. break
  65. }
  66. defer rd.Close()
  67. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, 0660)
  68. if err != nil {
  69. failed <- err
  70. break
  71. }
  72. defer fd.Close()
  73. _, err = fd.Seek(part.Start, os.SEEK_SET)
  74. if err != nil {
  75. failed <- err
  76. break
  77. }
  78. _, err = io.Copy(fd, rd)
  79. if err != nil {
  80. failed <- err
  81. break
  82. }
  83. results <- part
  84. }
  85. }
  86. // 调度协程
  87. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  88. for _, part := range parts {
  89. jobs <- part
  90. }
  91. close(jobs)
  92. }
  // downloadPart is one byte range of the object to download. Parts are
// stored in the JSON checkpoint file, so the exported field names are part of
// that on-disk format.
type downloadPart struct {
	Index      int   // part number, counting from 0
	Start, End int64 // start and end offsets of the part (passed to Range)
}
  99. // 文件分片
  100. func getDownloadPart(bucket *Bucket, objectKey string, partSize int64) ([]downloadPart, error) {
  101. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  102. if err != nil {
  103. return nil, err
  104. }
  105. parts := []downloadPart{}
  106. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  107. if err != nil {
  108. return nil, err
  109. }
  110. part := downloadPart{}
  111. i := 0
  112. for offset := int64(0); offset < objectSize; offset += partSize {
  113. part.Index = i
  114. part.Start = offset
  115. part.End = GetPartEnd(offset, objectSize, partSize)
  116. parts = append(parts, part)
  117. i++
  118. }
  119. return parts, nil
  120. }
  121. // 并发无断点续传的下载
  122. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
  123. // 如果文件不存在则创建
  124. fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0660)
  125. if err != nil {
  126. return err
  127. }
  128. fd.Close()
  129. // 分割文件
  130. parts, err := getDownloadPart(&bucket, objectKey, partSize)
  131. if err != nil {
  132. return err
  133. }
  134. jobs := make(chan downloadPart, len(parts))
  135. results := make(chan downloadPart, len(parts))
  136. failed := make(chan error)
  137. // 启动工作协程
  138. arg := downloadWorkerArg{&bucket, objectKey, filePath, options, downloadPartHooker}
  139. for w := 1; w <= routines; w++ {
  140. go downloadWorker(w, arg, jobs, results, failed)
  141. }
  142. // 并发上传分片
  143. go downloadScheduler(jobs, parts)
  144. // 等待分片下载完成
  145. completed := 0
  146. ps := make([]downloadPart, len(parts))
  147. for {
  148. select {
  149. case part := <-results:
  150. completed++
  151. ps[part.Index] = part
  152. case err := <-failed:
  153. return err
  154. }
  155. if completed >= len(parts) {
  156. break
  157. }
  158. }
  159. return nil
  160. }
  161. // ----- 并发有断点的下载 -----
  162. const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
  163. type downloadCheckpoint struct {
  164. Magic string // magic
  165. MD5 string // cp内容的MD5
  166. FilePath string // 本地文件
  167. Object string // key
  168. ObjStat objectStat // 文件状态
  169. Parts []downloadPart // 全部分片
  170. PartStat []bool // 分片下载是否完成
  171. mutex sync.Mutex // Lock
  172. }
  // objectStat records the object metadata a checkpoint was created against.
// isValid compares it with the current metadata to detect changes.
type objectStat struct {
	Size               int64  // object size in bytes (Content-Length)
	LastModified, Etag string // Last-Modified and ETag header values
}
  178. // CP数据是否有效,CP有效且Object没有更新时有效
  179. func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
  180. // 比较CP的Magic及MD5
  181. cpb := cp
  182. cpb.MD5 = ""
  183. js, _ := json.Marshal(cpb)
  184. sum := md5.Sum(js)
  185. b64 := base64.StdEncoding.EncodeToString(sum[:])
  186. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  187. return false, nil
  188. }
  189. // 确认object没有更新
  190. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  191. if err != nil {
  192. return false, err
  193. }
  194. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  195. if err != nil {
  196. return false, err
  197. }
  198. // 比较Object的大小/最后修改时间/etag
  199. if cp.ObjStat.Size != objectSize ||
  200. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  201. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  202. return false, nil
  203. }
  204. return true, nil
  205. }
  206. // 从文件中load
  207. func (cp *downloadCheckpoint) load(filePath string) error {
  208. contents, err := ioutil.ReadFile(filePath)
  209. if err != nil {
  210. return err
  211. }
  212. err = json.Unmarshal(contents, cp)
  213. return err
  214. }
  215. // dump到文件
  216. func (cp *downloadCheckpoint) dump(filePath string) error {
  217. bcp := *cp
  218. // 计算MD5
  219. bcp.MD5 = ""
  220. js, err := json.Marshal(bcp)
  221. if err != nil {
  222. return err
  223. }
  224. sum := md5.Sum(js)
  225. b64 := base64.StdEncoding.EncodeToString(sum[:])
  226. bcp.MD5 = b64
  227. // 序列化
  228. js, err = json.Marshal(bcp)
  229. if err != nil {
  230. return err
  231. }
  232. // dump
  233. return ioutil.WriteFile(filePath, js, 0644)
  234. }
  235. // 未完成的分片
  236. func (cp downloadCheckpoint) todoParts() []downloadPart {
  237. dps := []downloadPart{}
  238. for i, ps := range cp.PartStat {
  239. if !ps {
  240. dps = append(dps, cp.Parts[i])
  241. }
  242. }
  243. return dps
  244. }
  245. // 初始化下载任务
  246. func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64) error {
  247. // cp
  248. cp.Magic = downloadCpMagic
  249. cp.FilePath = filePath
  250. cp.Object = objectKey
  251. // object
  252. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  253. if err != nil {
  254. return err
  255. }
  256. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  257. if err != nil {
  258. return err
  259. }
  260. cp.ObjStat.Size = objectSize
  261. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  262. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  263. // parts
  264. cp.Parts, err = getDownloadPart(bucket, objectKey, partSize)
  265. if err != nil {
  266. return err
  267. }
  268. cp.PartStat = make([]bool, len(cp.Parts))
  269. for i := range cp.PartStat {
  270. cp.PartStat[i] = false
  271. }
  272. return nil
  273. }
  274. func (cp *downloadCheckpoint) complete(cpFilePath string) error {
  275. return os.Remove(cpFilePath)
  276. }
  277. // 并发带断点的下载
  278. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
  279. // LOAD CP数据
  280. dcp := downloadCheckpoint{}
  281. err := dcp.load(cpFilePath)
  282. if err != nil {
  283. os.Remove(cpFilePath)
  284. }
  285. // LOAD出错或数据无效重新初始化下载
  286. valid, err := dcp.isValid(&bucket, objectKey)
  287. if err != nil || !valid {
  288. if err = dcp.prepare(&bucket, objectKey, filePath, partSize); err != nil {
  289. return err
  290. }
  291. os.Remove(cpFilePath)
  292. }
  293. // 文件不存在创建
  294. fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0660)
  295. if err != nil {
  296. return err
  297. }
  298. fd.Close()
  299. // 未完成的分片
  300. parts := dcp.todoParts()
  301. jobs := make(chan downloadPart, len(parts))
  302. results := make(chan downloadPart, len(parts))
  303. failed := make(chan error)
  304. // 启动工作协程
  305. arg := downloadWorkerArg{&bucket, objectKey, filePath, options, downloadPartHooker}
  306. for w := 1; w <= routines; w++ {
  307. go downloadWorker(w, arg, jobs, results, failed)
  308. }
  309. // 并发下载分片
  310. go downloadScheduler(jobs, parts)
  311. // 等待分片下载完成
  312. completed := 0
  313. for {
  314. select {
  315. case part := <-results:
  316. completed++
  317. dcp.PartStat[part.Index] = true
  318. dcp.dump(cpFilePath)
  319. case err := <-failed:
  320. return err
  321. }
  322. if completed >= len(parts) {
  323. break
  324. }
  325. }
  326. return dcp.complete(cpFilePath)
  327. }