download.go
package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"hash"
	"hash/crc64"
	"io"
	"io/ioutil"
	"os"
	"strconv"
)

// DownloadFile downloads a file from OSS with multipart download.
//
// objectKey    the object key.
// filePath     the local file path that the object is downloaded to.
// partSize     the part size in bytes.
// options      the object's constraints; check out GetObject for the reference.
//
// error    it's nil when the call succeeds, otherwise it's an error object.
//
func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < 1 {
		return errors.New("oss: part size smaller than 1")
	}

	cpConf, err := getCpConfig(options, filePath)
	if err != nil {
		return err
	}

	uRange, err := getRangeConfig(options)
	if err != nil {
		return err
	}

	routines := getRoutines(options)

	if cpConf.IsEnable {
		return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines, uRange)
	}
	return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
}
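
// A minimal usage sketch (hypothetical endpoint, credentials and names;
// assumes this SDK's New, Client.Bucket and Routines helpers defined elsewhere):
//
//	client, err := oss.New("Endpoint", "AccessKeyId", "AccessKeySecret")
//	if err != nil {
//		// handle error
//	}
//	bucket, err := client.Bucket("my-bucket")
//	if err != nil {
//		// handle error
//	}
//	// Download "my-object" to "/tmp/my-object" in 1 MB parts,
//	// three parts in flight at a time.
//	err = bucket.DownloadFile("my-object", "/tmp/my-object", 1024*1024, oss.Routines(3))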

// getRangeConfig gets the download range from the options.
func getRangeConfig(options []Option) (*unpackedRange, error) {
	rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
	if err != nil || rangeOpt == nil {
		return nil, err
	}
	return parseRange(rangeOpt.(string))
}
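
// For example, a caller passing the Range(1024, 4095) option stores the header
// value "bytes=1024-4095", which parseRange unpacks into an unpackedRange here
// (format assumed from the HTTP Range header convention this SDK follows).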

// ----- Concurrent download without checkpoint -----

// downloadWorkerArg is the download worker's parameters.
type downloadWorkerArg struct {
	bucket    *Bucket
	key       string
	filePath  string
	options   []Option
	hook      downloadPartHook
	enableCRC bool
}

// downloadPartHook is a hook for testing.
type downloadPartHook func(part downloadPart) error

var downloadPartHooker downloadPartHook = defaultDownloadPartHook

func defaultDownloadPartHook(part downloadPart) error {
	return nil
}

// defaultDownloadProgressListener defines the default ProgressListener; it
// shields the ProgressListener in the options of GetObject.
type defaultDownloadProgressListener struct {
}

// ProgressChanged is a no-op.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}

// downloadWorker downloads the parts it receives from the jobs channel.
func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
	for part := range jobs {
		if err := arg.hook(part); err != nil {
			failed <- err
			break
		}

		// Resolve options: the part's byte range plus a listener that shields
		// any per-request ProgressListener passed in the caller's options.
		r := Range(part.Start, part.End)
		p := Progress(&defaultDownloadProgressListener{})
		opts := make([]Option, 0, len(arg.options)+2)
		// Append orderly, can not be reversed!
		opts = append(opts, arg.options...)
		opts = append(opts, r, p)

		rd, err := arg.bucket.GetObject(arg.key, opts...)
		if err != nil {
			failed <- err
			break
		}

		// When CRC checking is on, tee the body through a CRC64 calculator.
		var body io.Reader = rd
		var crcCalc hash.Hash64
		if arg.enableCRC {
			crcCalc = crc64.New(crcTable())
			contentLen := part.End - part.Start + 1
			body = TeeReader(rd, crcCalc, contentLen, nil, nil)
		}

		select {
		case <-die:
			rd.Close()
			return
		default:
		}

		fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
		if err != nil {
			rd.Close()
			failed <- err
			break
		}

		// Seek to the part's position; Offset is the start of the requested range.
		_, err = fd.Seek(part.Start-part.Offset, io.SeekStart)
		if err != nil {
			fd.Close()
			rd.Close()
			failed <- err
			break
		}

		_, err = io.Copy(fd, body)
		// Close both ends eagerly: deferring inside the loop would hold
		// descriptors open until the worker returns.
		fd.Close()
		rd.Close()
		if err != nil {
			failed <- err
			break
		}

		if arg.enableCRC {
			part.CRC64 = crcCalc.Sum64()
		}

		results <- part
	}
}

// downloadScheduler feeds all parts to the jobs channel and then closes it.
func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
	for _, part := range parts {
		jobs <- part
	}
	close(jobs)
}

// downloadPart defines a download part.
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start index
	End    int64  // End index (inclusive)
	Offset int64  // Offset of the requested range in the object
	CRC64  uint64 // CRC check value of the part
}

// getDownloadParts splits the object into parts to download.
func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *unpackedRange) ([]downloadPart, bool, uint64, error) {
	meta, err := bucket.GetObjectDetailedMeta(objectKey)
	if err != nil {
		return nil, false, 0, err
	}

	parts := []downloadPart{}
	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return nil, false, 0, err
	}

	enableCRC := false
	var crcVal uint64
	if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
		// CRC verification only applies to whole-object downloads.
		if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
			enableCRC = true
			crcVal, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
		}
	}

	part := downloadPart{}
	i := 0
	start, end := adjustRange(uRange, objectSize)
	for offset := start; offset < end; offset += partSize {
		part.Index = i
		part.Start = offset
		part.End = GetPartEnd(offset, end, partSize)
		part.Offset = start
		part.CRC64 = 0
		parts = append(parts, part)
		i++
	}
	return parts, enableCRC, crcVal, nil
}
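
// For illustration (assuming GetPartEnd clamps the last part to the final
// byte): a 10-byte object with partSize 3 splits into parts [0,2], [3,5],
// [6,8] and [9,9], each End inclusive and Offset equal to the range start.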

// getObjectBytes gets the total number of bytes covered by the parts.
func getObjectBytes(parts []downloadPart) int64 {
	var ob int64
	for _, part := range parts {
		ob += part.End - part.Start + 1
	}
	return ob
}

// combineCRCInParts calculates the total CRC of consecutive parts.
func combineCRCInParts(dps []downloadPart) uint64 {
	if len(dps) == 0 {
		return 0
	}

	crc := dps[0].CRC64
	for i := 1; i < len(dps); i++ {
		crc = CRC64Combine(crc, dps[i].CRC64, uint64(dps[i].End-dps[i].Start+1))
	}
	return crc
}
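
// CRC64Combine folds each part's checksum into a running value as if the part
// streams had been concatenated, so when every part arrives intact the result
// equals the whole-object CRC64 that OSS reports in the HTTPHeaderOssCRC64 header.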

// downloadFile downloads the file concurrently without checkpoint.
func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := getProgressListener(options)

	// If the file does not exist, create one. If it exists, the download will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	// Get the parts of the file.
	parts, enableCRC, expectedCRC, err := getDownloadParts(&bucket, objectKey, partSize, uRange)
	if err != nil {
		return err
	}

	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getObjectBytes(parts)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)

	// Start the download workers.
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Download the parts concurrently.
	go downloadScheduler(jobs, parts)

	// Wait for the parts to finish downloading.
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			completedBytes += part.End - part.Start + 1
			parts[part.Index].CRC64 = part.CRC64
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			return err
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	if enableCRC {
		actualCRC := combineCRCInParts(parts)
		err = checkDownloadCRC(actualCRC, expectedCRC)
		if err != nil {
			return err
		}
	}

	return os.Rename(tempFilePath, filePath)
}

// ----- Concurrent download with checkpoint -----

const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"

type downloadCheckpoint struct {
	Magic     string         // Magic
	MD5       string         // Checkpoint content MD5
	FilePath  string         // Local file path
	Object    string         // Key
	ObjStat   objectStat     // Object status
	Parts     []downloadPart // All download parts
	PartStat  []bool         // Parts' download status
	Start     int64          // Start point of the file
	End       int64          // End point of the file
	enableCRC bool           // Whether CRC checking is enabled
	CRC       uint64         // CRC check value
}
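
// Note: enableCRC is unexported, so encoding/json skips it when the checkpoint
// is dumped and loaded; the flag is only repopulated when prepare runs.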

type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // Etag
}

// isValid checks whether the checkpoint data is valid. It returns true when
// the data is valid, the checkpoint file is intact and the object has not
// been updated since the checkpoint was written.
func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string, uRange *unpackedRange) (bool, error) {
	// Compare the CP's magic number and the MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	// Ensure the object is not updated.
	meta, err := bucket.GetObjectDetailedMeta(objectKey)
	if err != nil {
		return false, err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return false, err
	}

	// Compare the object size, last modified time and etag.
	if cp.ObjStat.Size != objectSize ||
		cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
		cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
		return false, nil
	}

	// Check the download range.
	if uRange != nil {
		start, end := adjustRange(uRange, objectSize)
		if start != cp.Start || end != cp.End {
			return false, nil
		}
	}

	return true, nil
}

// load loads the checkpoint from the local file.
func (cp *downloadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}
	err = json.Unmarshal(contents, cp)
	return err
}

// dump dumps the checkpoint to the local file.
func (cp *downloadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate the MD5 with the MD5 field cleared, then store it.
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize.
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump.
	return ioutil.WriteFile(filePath, js, FilePermMode)
}
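
// isValid performs the mirror-image check: it clears MD5, re-marshals the
// checkpoint and compares digests, so any manual edit to the checkpoint file
// invalidates it and forces a fresh prepare.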

// todoParts gets the unfinished parts.
func (cp downloadCheckpoint) todoParts() []downloadPart {
	dps := []downloadPart{}
	for i, ps := range cp.PartStat {
		if !ps {
			dps = append(dps, cp.Parts[i])
		}
	}
	return dps
}

// getCompletedBytes gets the number of bytes already downloaded.
func (cp downloadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for i, part := range cp.Parts {
		if cp.PartStat[i] {
			completedBytes += part.End - part.Start + 1
		}
	}
	return completedBytes
}

// prepare initializes the download checkpoint.
func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
	// CP
	cp.Magic = downloadCpMagic
	cp.FilePath = filePath
	cp.Object = objectKey

	// Object
	meta, err := bucket.GetObjectDetailedMeta(objectKey)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return err
	}

	cp.ObjStat.Size = objectSize
	cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)

	// Parts
	cp.Parts, cp.enableCRC, cp.CRC, err = getDownloadParts(bucket, objectKey, partSize, uRange)
	if err != nil {
		return err
	}
	cp.PartStat = make([]bool, len(cp.Parts)) // make zeroes the flags

	return nil
}

// complete removes the checkpoint file and renames the temporary file to the target.
func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
	os.Remove(cpFilePath)
	return os.Rename(downFilepath, cp.FilePath)
}

// downloadFileWithCp downloads the file with checkpoint.
func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := getProgressListener(options)

	// Load the checkpoint data.
	dcp := downloadCheckpoint{}
	err := dcp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// If the loading failed or the data is invalid, re-initialize the download.
	valid, err := dcp.isValid(&bucket, objectKey, uRange)
	if err != nil || !valid {
		if err = dcp.prepare(&bucket, objectKey, filePath, partSize, uRange); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Create the file if it does not exist. Otherwise the parts download will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	// Unfinished parts.
	parts := dcp.todoParts()
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := dcp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
	publishProgress(listener, event)

	// Start the download workers.
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Download the parts concurrently.
	go downloadScheduler(jobs, parts)

	// Wait for the parts to finish downloading.
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			dcp.PartStat[part.Index] = true
			dcp.Parts[part.Index].CRC64 = part.CRC64
			dcp.dump(cpFilePath)
			completedBytes += part.End - part.Start + 1
			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
			publishProgress(listener, event)
			return err
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
	publishProgress(listener, event)

	if dcp.enableCRC {
		actualCRC := combineCRCInParts(dcp.Parts)
		err = checkDownloadCRC(actualCRC, dcp.CRC)
		if err != nil {
			return err
		}
	}

	return dcp.complete(cpFilePath, tempFilePath)
}
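
// A resumable-download sketch (hypothetical names; assumes this SDK's
// Checkpoint option, which routes DownloadFile into this code path, and that
// an empty checkpoint path derives one from filePath):
//
//	err := bucket.DownloadFile("my-object", "/tmp/my-object", 1024*1024,
//		oss.Routines(3), oss.Checkpoint(true, ""))
//	// If the process dies midway, rerunning the same call reloads the
//	// checkpoint file and downloads only the parts still marked unfinished.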