download.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "io"
  8. "io/ioutil"
  9. "os"
  10. "strconv"
  11. "sync"
  12. "time"
  13. )
  14. //
  15. // DownloadFile 分块下载文件,适合加大Object
  16. //
  17. // objectKey object key。
  18. // filePath 本地文件。objectKey下载到文件。
  19. // partSize 本次上传文件片的大小,字节数。比如100 * 1024为每片100KB。
  20. // options Object的属性限制项。详见GetObject。
  21. //
  22. // error 操作成功error为nil,非nil为错误信息。
  23. //
  24. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  25. if partSize < 1 || partSize > MaxPartSize {
  26. return errors.New("oss: part size invalid range (1, 5GB]")
  27. }
  28. cpConf, err := getCpConfig(options, filePath)
  29. if err != nil {
  30. return err
  31. }
  32. routines := getRoutines(options)
  33. if cpConf.IsEnable {
  34. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines)
  35. }
  36. return bucket.downloadFile(objectKey, filePath, partSize, options, routines)
  37. }
  38. // ----- 并发无断点的下载 -----
  39. // 工作协程参数
  40. type downloadWorkerArg struct {
  41. bucket *Bucket
  42. key string
  43. filePath string
  44. options []Option
  45. hook downloadPartHook
  46. }
  47. // Hook用于测试
  48. type downloadPartHook func(part downloadPart) error
  49. var downloadPartHooker downloadPartHook = defaultDownloadPartHook
  50. func defaultDownloadPartHook(part downloadPart) error {
  51. return nil
  52. }
  53. // 工作协程
  54. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error) {
  55. for part := range jobs {
  56. if err := arg.hook(part); err != nil {
  57. failed <- err
  58. break
  59. }
  60. opt := Range(part.Start, part.End)
  61. opts := append(arg.options, opt)
  62. rd, err := arg.bucket.GetObject(arg.key, opts...)
  63. if err != nil {
  64. failed <- err
  65. break
  66. }
  67. defer rd.Close()
  68. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, 0660)
  69. if err != nil {
  70. failed <- err
  71. break
  72. }
  73. defer fd.Close()
  74. _, err = fd.Seek(part.Start, os.SEEK_SET)
  75. if err != nil {
  76. failed <- err
  77. break
  78. }
  79. _, err = io.Copy(fd, rd)
  80. if err != nil {
  81. failed <- err
  82. break
  83. }
  84. results <- part
  85. }
  86. }
  87. // 调度协程
  88. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  89. for _, part := range parts {
  90. jobs <- part
  91. }
  92. close(jobs)
  93. }
  // downloadPart describes one part of a download.
  type downloadPart struct {
  	// Index is the sequence number of the part, counted from 0.
  	Index int
  	// Start is the position at which the part begins.
  	Start int64
  	// End is the position at which the part ends.
  	End int64
  }
  100. // 文件分片
  101. func getDownloadPart(bucket *Bucket, objectKey string, partSize int64) ([]downloadPart, error) {
  102. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  103. if err != nil {
  104. return nil, err
  105. }
  106. parts := []downloadPart{}
  107. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  108. if err != nil {
  109. return nil, err
  110. }
  111. part := downloadPart{}
  112. i := 0
  113. for offset := int64(0); offset < objectSize; offset += partSize {
  114. part.Index = i
  115. part.Start = offset
  116. part.End = GetPartEnd(offset, objectSize, partSize)
  117. parts = append(parts, part)
  118. i++
  119. }
  120. return parts, nil
  121. }
  122. // 并发无断点续传的下载
  123. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
  124. // 如果文件不存在则创建
  125. fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0660)
  126. if err != nil {
  127. return err
  128. }
  129. fd.Close()
  130. // 分割文件
  131. parts, err := getDownloadPart(&bucket, objectKey, partSize)
  132. if err != nil {
  133. return err
  134. }
  135. jobs := make(chan downloadPart, len(parts))
  136. results := make(chan downloadPart, len(parts))
  137. failed := make(chan error)
  138. // 启动工作协程
  139. arg := downloadWorkerArg{&bucket, objectKey, filePath, options, downloadPartHooker}
  140. for w := 1; w <= routines; w++ {
  141. go downloadWorker(w, arg, jobs, results, failed)
  142. }
  143. // 并发上传分片
  144. go downloadScheduler(jobs, parts)
  145. // 等待分片下载完成
  146. completed := 0
  147. ps := make([]downloadPart, len(parts))
  148. for {
  149. select {
  150. case part := <-results:
  151. completed++
  152. ps[part.Index] = part
  153. case err := <-failed:
  154. return err
  155. default:
  156. time.Sleep(time.Second)
  157. }
  158. if completed >= len(parts) {
  159. break
  160. }
  161. }
  162. return nil
  163. }
  // ----- Concurrent download with checkpoint -----

  // downloadCpMagic is the magic value stored in a download checkpoint; a
  // checkpoint carrying any other value is not considered valid.
  const (
  	downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
  )
  166. type downloadCheckpoint struct {
  167. Magic string // magic
  168. MD5 string // cp内容的MD5
  169. FilePath string // 本地文件
  170. Object string // key
  171. ObjStat objectStat // 文件状态
  172. Parts []downloadPart // 全部分片
  173. PartStat []bool // 分片下载是否完成
  174. mutex sync.Mutex // Lock
  175. }
  // objectStat is the state of an object as recorded in a checkpoint; it is
  // compared with the object's current metadata to detect changes.
  type objectStat struct {
  	// Size is the object size.
  	Size int64
  	// LastModified is the last modification time.
  	LastModified string
  	// Etag is the object's etag.
  	Etag string
  }
  181. // CP数据是否有效,CP有效且Object没有更新时有效
  182. func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
  183. // 比较CP的Magic及MD5
  184. cpb := cp
  185. cpb.MD5 = ""
  186. js, _ := json.Marshal(cpb)
  187. sum := md5.Sum(js)
  188. b64 := base64.StdEncoding.EncodeToString(sum[:])
  189. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  190. return false, nil
  191. }
  192. // 确认object没有更新
  193. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  194. if err != nil {
  195. return false, err
  196. }
  197. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  198. if err != nil {
  199. return false, err
  200. }
  201. // 比较Object的大小/最后修改时间/etag
  202. if cp.ObjStat.Size != objectSize ||
  203. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  204. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  205. return false, nil
  206. }
  207. return true, nil
  208. }
  209. // 从文件中load
  210. func (cp *downloadCheckpoint) load(filePath string) error {
  211. contents, err := ioutil.ReadFile(filePath)
  212. if err != nil {
  213. return err
  214. }
  215. err = json.Unmarshal(contents, cp)
  216. return err
  217. }
  218. // dump到文件
  219. func (cp *downloadCheckpoint) dump(filePath string) error {
  220. bcp := *cp
  221. // 计算MD5
  222. bcp.MD5 = ""
  223. js, err := json.Marshal(bcp)
  224. if err != nil {
  225. return err
  226. }
  227. sum := md5.Sum(js)
  228. b64 := base64.StdEncoding.EncodeToString(sum[:])
  229. bcp.MD5 = b64
  230. // 序列化
  231. js, err = json.Marshal(bcp)
  232. if err != nil {
  233. return err
  234. }
  235. // dump
  236. return ioutil.WriteFile(filePath, js, 0644)
  237. }
  238. // 未完成的分片
  239. func (cp downloadCheckpoint) todoParts() []downloadPart {
  240. dps := []downloadPart{}
  241. for i, ps := range cp.PartStat {
  242. if !ps {
  243. dps = append(dps, cp.Parts[i])
  244. }
  245. }
  246. return dps
  247. }
  248. // 初始化下载任务
  249. func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64) error {
  250. // cp
  251. cp.Magic = downloadCpMagic
  252. cp.FilePath = filePath
  253. cp.Object = objectKey
  254. // object
  255. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  256. if err != nil {
  257. return err
  258. }
  259. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  260. if err != nil {
  261. return err
  262. }
  263. cp.ObjStat.Size = objectSize
  264. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  265. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  266. // parts
  267. cp.Parts, err = getDownloadPart(bucket, objectKey, partSize)
  268. if err != nil {
  269. return err
  270. }
  271. cp.PartStat = make([]bool, len(cp.Parts))
  272. for i := range cp.PartStat {
  273. cp.PartStat[i] = false
  274. }
  275. return nil
  276. }
  277. func (cp *downloadCheckpoint) complete(cpFilePath string) error {
  278. return os.Remove(cpFilePath)
  279. }
  280. // 并发带断点的下载
  281. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
  282. // LOAD CP数据
  283. dcp := downloadCheckpoint{}
  284. err := dcp.load(cpFilePath)
  285. if err != nil {
  286. os.Remove(cpFilePath)
  287. }
  288. // LOAD出错或数据无效重新初始化下载
  289. valid, err := dcp.isValid(&bucket, objectKey)
  290. if err != nil || !valid {
  291. if err = dcp.prepare(&bucket, objectKey, filePath, partSize); err != nil {
  292. return err
  293. }
  294. os.Remove(cpFilePath)
  295. }
  296. // 文件不存在创建
  297. fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0660)
  298. if err != nil {
  299. return err
  300. }
  301. fd.Close()
  302. // 未完成的分片
  303. parts := dcp.todoParts()
  304. jobs := make(chan downloadPart, len(parts))
  305. results := make(chan downloadPart, len(parts))
  306. failed := make(chan error)
  307. // 启动工作协程
  308. arg := downloadWorkerArg{&bucket, objectKey, filePath, options, downloadPartHooker}
  309. for w := 1; w <= routines; w++ {
  310. go downloadWorker(w, arg, jobs, results, failed)
  311. }
  312. // 并发下载分片
  313. go downloadScheduler(jobs, parts)
  314. // 等待分片下载完成
  315. completed := 0
  316. for {
  317. select {
  318. case part := <-results:
  319. completed++
  320. dcp.PartStat[part.Index] = true
  321. dcp.dump(cpFilePath)
  322. case err := <-failed:
  323. return err
  324. }
  325. if completed >= len(parts) {
  326. break
  327. }
  328. }
  329. return dcp.complete(cpFilePath)
  330. }