download.go

package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"hash"
	"hash/crc64"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
)

// DownloadFile downloads files with multipart download.
//
// objectKey    the object key.
// filePath     the local file to download from objectKey in OSS.
// partSize     the part size in bytes.
// options      the object's constraints; check out GetObject for the reference.
//
// error        it's nil when the call succeeds, otherwise it's an error object.
//
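// Illustrative usage (a sketch added for clarity, not part of the original
// file; the endpoint, credentials, bucket and object names are placeholders):
//
//    client, err := oss.New("endpoint", "accessKeyID", "accessKeySecret")
//    if err != nil {
//        // handle error
//    }
//    bucket, err := client.Bucket("my-bucket")
//    if err != nil {
//        // handle error
//    }
//    // Download in 1 MB parts with 3 concurrent routines and checkpointing enabled.
//    err = bucket.DownloadFile("my-object", "/tmp/my-object", 1024*1024,
//        oss.Routines(3), oss.Checkpoint(true, ""))
//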
func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < 1 {
		return errors.New("oss: part size smaller than 1")
	}

	uRange, err := getRangeConfig(options)
	if err != nil {
		return err
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	if cpConf != nil && cpConf.IsEnable {
		cpFilePath := getDownloadCpFilePath(cpConf, bucket.BucketName, objectKey, filePath)
		if cpFilePath != "" {
			return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
		}
	}

	return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
}

// getDownloadCpFilePath builds the checkpoint file path when only a checkpoint
// directory is configured.
func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destFile string) string {
	if cpConf.FilePath == "" && cpConf.DirPath != "" {
		src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
		absPath, _ := filepath.Abs(destFile)
		cpFileName := getCpFileName(src, absPath)
		cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
	}
	return cpConf.FilePath
}

// getRangeConfig gets the download range from the options.
func getRangeConfig(options []Option) (*unpackedRange, error) {
	rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
	if err != nil || rangeOpt == nil {
		return nil, err
	}
	return parseRange(rangeOpt.(string))
}

// ----- Concurrent download without checkpoint -----

// downloadWorkerArg holds a download worker's parameters.
type downloadWorkerArg struct {
	bucket    *Bucket
	key       string
	filePath  string
	options   []Option
	hook      downloadPartHook
	enableCRC bool
}

// downloadPartHook is a hook for testing.
type downloadPartHook func(part downloadPart) error

var downloadPartHooker downloadPartHook = defaultDownloadPartHook

func defaultDownloadPartHook(part downloadPart) error {
	return nil
}
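
// Test-only sketch (hypothetical, not part of the original file): a test can
// replace downloadPartHooker to inject a failure for a chosen part and check
// that the download aborts, for example:
//
//    downloadPartHooker = func(part downloadPart) error {
//        if part.Index == 2 {
//            return fmt.Errorf("injected failure") // surfaces via the workers' failed channel
//        }
//        return nil
//    }
//    defer func() { downloadPartHooker = defaultDownloadPartHook }()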

// defaultDownloadProgressListener defines the default ProgressListener; it
// shields any ProgressListener passed in the options of GetObject.
type defaultDownloadProgressListener struct {
}

// ProgressChanged is a no-op.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}

// downloadWorker downloads the parts it receives from the jobs channel.
func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
	for part := range jobs {
		if err := arg.hook(part); err != nil {
			failed <- err
			break
		}

		// Resolve options
		r := Range(part.Start, part.End)
		p := Progress(&defaultDownloadProgressListener{})
		opts := make([]Option, 0, len(arg.options)+2)
		// Append in order; the order must not be reversed!
		opts = append(opts, arg.options...)
		opts = append(opts, r, p)

		rd, err := arg.bucket.GetObject(arg.key, opts...)
		if err != nil {
			failed <- err
			break
		}
		defer rd.Close()

		var crcCalc hash.Hash64
		if arg.enableCRC {
			// Wrap the reader so the downloaded bytes are also fed into the CRC calculator.
			crcCalc = crc64.New(crcTable())
			contentLen := part.End - part.Start + 1
			rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
		}
		defer rd.Close()

		select {
		case <-die:
			return
		default:
		}

		fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
		if err != nil {
			failed <- err
			break
		}

		_, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
		if err != nil {
			fd.Close()
			failed <- err
			break
		}

		_, err = io.Copy(fd, rd)
		if err != nil {
			fd.Close()
			failed <- err
			break
		}

		if arg.enableCRC {
			part.CRC64 = crcCalc.Sum64()
		}

		fd.Close()
		results <- part
	}
}

// downloadScheduler feeds all parts into the jobs channel and then closes it.
func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
	for _, part := range parts {
		jobs <- part
	}
	close(jobs)
}

// downloadPart defines download part
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start index
	End    int64  // End index
	Offset int64  // Offset
	CRC64  uint64 // CRC check value of part
}

// getDownloadParts splits the requested byte range into parts of at most partSize bytes.
func getDownloadParts(objectSize, partSize int64, uRange *unpackedRange) []downloadPart {
	parts := []downloadPart{}
	part := downloadPart{}
	i := 0
	start, end := adjustRange(uRange, objectSize)
	for offset := start; offset < end; offset += partSize {
		part.Index = i
		part.Start = offset
		part.End = GetPartEnd(offset, end, partSize)
		part.Offset = start
		part.CRC64 = 0
		parts = append(parts, part)
		i++
	}
	return parts
}
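
// Worked example (added for illustration, not from the original source): with
// objectSize=100, partSize=40 and no range option, adjustRange yields start=0
// and end=100, so getDownloadParts returns three parts covering [0,39],
// [40,79] and [80,99], each with Offset=0.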

// getObjectBytes gets the total size in bytes covered by the parts.
func getObjectBytes(parts []downloadPart) int64 {
	var ob int64
	for _, part := range parts {
		ob += (part.End - part.Start + 1)
	}
	return ob
}

// combineCRCInParts calculates the total CRC of contiguous parts.
func combineCRCInParts(dps []downloadPart) uint64 {
	if len(dps) == 0 {
		return 0
	}

	crc := dps[0].CRC64
	for i := 1; i < len(dps); i++ {
		crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
	}
	return crc
}
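
// Note (added for clarity, assuming the usual CRC64-combine semantics where
// the third argument is the length of the second block): combining the CRC of
// bytes [0,39] with the CRC of bytes [40,99] and length 60 yields the CRC of
// bytes [0,99], so the whole-object CRC is derived from per-part CRCs without
// re-reading any data.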

// downloadFile downloads the file concurrently without checkpoint.
func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := getProgressListener(options)

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	// If the file does not exist, create one. If it exists, the download will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	meta, err := bucket.GetObjectDetailedMeta(objectKey, payerOptions...)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return err
	}

	enableCRC := false
	expectedCRC := (uint64)(0)
	if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
		if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
			enableCRC = true
			expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
		}
	}

	// Get the parts of the file
	parts := getDownloadParts(objectSize, partSize, uRange)
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getObjectBytes(parts)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)

	// Start the download workers
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Download parts concurrently
	go downloadScheduler(jobs, parts)

	// Wait for the parts to finish downloading
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			completedBytes += (part.End - part.Start + 1)
			parts[part.Index].CRC64 = part.CRC64
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	if enableCRC {
		actualCRC := combineCRCInParts(parts)
		err = checkDownloadCRC(actualCRC, expectedCRC)
		if err != nil {
			return err
		}
	}

	return os.Rename(tempFilePath, filePath)
}

// ----- Concurrent download with checkpoint -----

const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"

type downloadCheckpoint struct {
	Magic     string         // Magic
	MD5       string         // Checkpoint content MD5
	FilePath  string         // Local file
	Object    string         // Key
	ObjStat   objectStat     // Object status
	Parts     []downloadPart // All download parts
	PartStat  []bool         // Parts' download status
	Start     int64          // Start point of the file
	End       int64          // End point of the file
	enableCRC bool           // Whether CRC check is enabled
	CRC       uint64         // CRC check value
}
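
// On disk, the checkpoint file is simply the JSON encoding of this struct
// (exported fields only; enableCRC is not serialized). An abbreviated,
// illustrative example with made-up values:
//
//    {"Magic":"92611BED-89E2-46B6-89E5-72F273D4B0A3","MD5":"<base64 MD5>",
//     "FilePath":"/tmp/my-object","Object":"my-object",
//     "ObjStat":{"Size":1048576,"LastModified":"...","Etag":"..."},
//     "Parts":[...],"PartStat":[true,false],"Start":0,"End":0,"CRC":0}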

type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // Etag
}

// isValid checks whether the checkpoint data is valid. It returns true when
// the data is intact and the remote object has not been updated.
func (cp downloadCheckpoint) isValid(meta http.Header, uRange *unpackedRange) (bool, error) {
	// Compare the CP's Magic and the MD5
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return false, err
	}

	// Compare the object size, last modified time and etag
	if cp.ObjStat.Size != objectSize ||
		cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
		cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
		return false, nil
	}

	// Check the download range
	if uRange != nil {
		start, end := adjustRange(uRange, objectSize)
		if start != cp.Start || end != cp.End {
			return false, nil
		}
	}

	return true, nil
}

// load checkpoint from local file
func (cp *downloadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}
	err = json.Unmarshal(contents, cp)
	return err
}

// dump dumps the checkpoint to the local file.
func (cp *downloadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}

// todoParts gets unfinished parts
func (cp downloadCheckpoint) todoParts() []downloadPart {
	dps := []downloadPart{}
	for i, ps := range cp.PartStat {
		if !ps {
			dps = append(dps, cp.Parts[i])
		}
	}
	return dps
}

// getCompletedBytes gets completed size
func (cp downloadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for i, part := range cp.Parts {
		if cp.PartStat[i] {
			completedBytes += (part.End - part.Start + 1)
		}
	}
	return completedBytes
}

// prepare initializes the download tasks from the object meta.
func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
	// CP
	cp.Magic = downloadCpMagic
	cp.FilePath = filePath
	cp.Object = objectKey

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return err
	}

	cp.ObjStat.Size = objectSize
	cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)

	if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
		if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
			cp.enableCRC = true
			cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
		}
	}

	// Parts
	cp.Parts = getDownloadParts(objectSize, partSize, uRange)
	cp.PartStat = make([]bool, len(cp.Parts))
	for i := range cp.PartStat {
		cp.PartStat[i] = false
	}

	return nil
}

// complete removes the checkpoint file and moves the downloaded temp file to
// its final path.
func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
	os.Remove(cpFilePath)
	return os.Rename(downFilepath, cp.FilePath)
}

// downloadFileWithCp downloads files with checkpoint.
func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := getProgressListener(options)

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	// Load checkpoint data.
	dcp := downloadCheckpoint{}
	err := dcp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// Get the object detailed meta.
	meta, err := bucket.GetObjectDetailedMeta(objectKey, payerOptions...)
	if err != nil {
		return err
	}

	// Load error or the checkpoint data is invalid. Re-initialize the download.
	valid, err := dcp.isValid(meta, uRange)
	if err != nil || !valid {
		if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Create the file if it does not exist. Otherwise the parts download will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	// Unfinished parts
	parts := dcp.todoParts()
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := dcp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
	publishProgress(listener, event)

	// Start the download worker routines
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Download parts concurrently
	go downloadScheduler(jobs, parts)

	// Wait for the parts to finish downloading
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			dcp.PartStat[part.Index] = true
			dcp.Parts[part.Index].CRC64 = part.CRC64
			dcp.dump(cpFilePath)
			completedBytes += (part.End - part.Start + 1)
			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
	publishProgress(listener, event)

	if dcp.enableCRC {
		actualCRC := combineCRCInParts(dcp.Parts)
		err = checkDownloadCRC(actualCRC, dcp.CRC)
		if err != nil {
			return err
		}
	}

	return dcp.complete(cpFilePath, tempFilePath)
}