download.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "hash"
  9. "hash/crc64"
  10. "io"
  11. "io/ioutil"
  12. "os"
  13. "path/filepath"
  14. "strconv"
  15. )
  16. // DownloadFile downloads files with multipart download.
  17. //
  18. // objectKey the object key.
  19. // filePath the local file to download from objectKey in OSS.
  20. // partSize the part size in bytes.
  21. // options object's constraints, check out GetObject for the reference.
  22. //
  23. // error it's nil when the call succeeds, otherwise it's an error object.
  24. //
  25. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  26. if partSize < 1 {
  27. return errors.New("oss: part size smaller than 1")
  28. }
  29. uRange, err := getRangeConfig(options)
  30. if err != nil {
  31. return err
  32. }
  33. cpConf := getCpConfig(options)
  34. routines := getRoutines(options)
  35. if cpConf != nil && cpConf.IsEnable && cpConf.cpDir != "" {
  36. src := fmt.Sprintf("oss://%v/%v", bucket.BucketName, objectKey)
  37. absPath, _ := filepath.Abs(filePath)
  38. cpFileName := getCpFileName(src, absPath)
  39. cpFilePath := cpConf.cpDir + string(os.PathSeparator) + cpFileName
  40. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
  41. }
  42. return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
  43. }
  44. // getRangeConfig gets the download range from the options.
  45. func getRangeConfig(options []Option) (*unpackedRange, error) {
  46. rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
  47. if err != nil || rangeOpt == nil {
  48. return nil, err
  49. }
  50. return parseRange(rangeOpt.(string))
  51. }
// ----- concurrent download without checkpoint -----

// downloadWorkerArg bundles the parameters shared by all download workers.
type downloadWorkerArg struct {
	bucket    *Bucket          // Bucket the object is downloaded from
	key       string           // Object key
	filePath  string           // Local (temp) file the parts are written into
	options   []Option         // Options forwarded to each GetObject call
	hook      downloadPartHook // Hook invoked before each part download (test seam)
	enableCRC bool             // Whether to compute a CRC64 for each part
}
// downloadPartHook is a hook invoked before each part download; tests use it
// to inject failures.
type downloadPartHook func(part downloadPart) error

// downloadPartHooker is the package-level hook; tests may swap it out.
var downloadPartHooker downloadPartHook = defaultDownloadPartHook

// defaultDownloadPartHook is the production hook: it never fails.
func defaultDownloadPartHook(part downloadPart) error {
	return nil
}
// defaultDownloadProgressListener defines the default ProgressListener; it
// shields the ProgressListener in the options of GetObject so the caller's
// listener does not receive the per-part progress events (the aggregate
// progress is published by the download coordinator instead).
type defaultDownloadProgressListener struct {
}

// ProgressChanged intentionally discards per-part progress events.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}
  74. // downloadWorker
  75. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
  76. for part := range jobs {
  77. if err := arg.hook(part); err != nil {
  78. failed <- err
  79. break
  80. }
  81. // Resolve options
  82. r := Range(part.Start, part.End)
  83. p := Progress(&defaultDownloadProgressListener{})
  84. opts := make([]Option, len(arg.options)+2)
  85. // Append orderly, can not be reversed!
  86. opts = append(opts, arg.options...)
  87. opts = append(opts, r, p)
  88. rd, err := arg.bucket.GetObject(arg.key, opts...)
  89. if err != nil {
  90. failed <- err
  91. break
  92. }
  93. defer rd.Close()
  94. var crcCalc hash.Hash64
  95. if arg.enableCRC {
  96. crcCalc = crc64.New(crcTable())
  97. contentLen := part.End - part.Start + 1
  98. rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
  99. }
  100. defer rd.Close()
  101. select {
  102. case <-die:
  103. return
  104. default:
  105. }
  106. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
  107. if err != nil {
  108. failed <- err
  109. break
  110. }
  111. _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
  112. if err != nil {
  113. fd.Close()
  114. failed <- err
  115. break
  116. }
  117. _, err = io.Copy(fd, rd)
  118. if err != nil {
  119. fd.Close()
  120. failed <- err
  121. break
  122. }
  123. if arg.enableCRC {
  124. part.CRC64 = crcCalc.Sum64()
  125. }
  126. fd.Close()
  127. results <- part
  128. }
  129. }
  130. // downloadScheduler
  131. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  132. for _, part := range parts {
  133. jobs <- part
  134. }
  135. close(jobs)
  136. }
// downloadPart defines one ranged piece of a multipart download.
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start index (inclusive byte offset within the object)
	End    int64  // End index (inclusive byte offset within the object)
	Offset int64  // Start of the whole requested range; Start-Offset is the file position
	CRC64  uint64 // CRC check value of part
}
  145. // getDownloadParts gets download parts
  146. func getDownloadParts(objectSize, partSize int64, uRange *unpackedRange) []downloadPart {
  147. parts := []downloadPart{}
  148. part := downloadPart{}
  149. i := 0
  150. start, end := adjustRange(uRange, objectSize)
  151. for offset := start; offset < end; offset += partSize {
  152. part.Index = i
  153. part.Start = offset
  154. part.End = GetPartEnd(offset, end, partSize)
  155. part.Offset = start
  156. part.CRC64 = 0
  157. parts = append(parts, part)
  158. i++
  159. }
  160. return parts
  161. }
  162. // getObjectBytes gets object bytes length
  163. func getObjectBytes(parts []downloadPart) int64 {
  164. var ob int64
  165. for _, part := range parts {
  166. ob += (part.End - part.Start + 1)
  167. }
  168. return ob
  169. }
  170. // combineCRCInParts caculates the total CRC of continuous parts
  171. func combineCRCInParts(dps []downloadPart) uint64 {
  172. if dps == nil || len(dps) == 0 {
  173. return 0
  174. }
  175. crc := dps[0].CRC64
  176. for i := 1; i < len(dps); i++ {
  177. crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
  178. }
  179. return crc
  180. }
  181. // downloadFile downloads file concurrently without checkpoint.
  182. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
  183. tempFilePath := filePath + TempFileSuffix
  184. listener := getProgressListener(options)
  185. // If the file does not exist, create one. If exists, the download will overwrite it.
  186. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  187. if err != nil {
  188. return err
  189. }
  190. fd.Close()
  191. meta, err := bucket.GetObjectDetailedMeta(objectKey, options...)
  192. if err != nil {
  193. return err
  194. }
  195. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  196. if err != nil {
  197. return err
  198. }
  199. enableCRC := false
  200. expectedCRC := (uint64)(0)
  201. if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  202. if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
  203. enableCRC = true
  204. expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
  205. }
  206. }
  207. // Get the parts of the file
  208. parts := getDownloadParts(objectSize, partSize, uRange)
  209. jobs := make(chan downloadPart, len(parts))
  210. results := make(chan downloadPart, len(parts))
  211. failed := make(chan error)
  212. die := make(chan bool)
  213. var completedBytes int64
  214. totalBytes := getObjectBytes(parts)
  215. event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
  216. publishProgress(listener, event)
  217. // Start the download workers
  218. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
  219. for w := 1; w <= routines; w++ {
  220. go downloadWorker(w, arg, jobs, results, failed, die)
  221. }
  222. // Download parts concurrently
  223. go downloadScheduler(jobs, parts)
  224. // Waiting for parts download finished
  225. completed := 0
  226. for completed < len(parts) {
  227. select {
  228. case part := <-results:
  229. completed++
  230. completedBytes += (part.End - part.Start + 1)
  231. parts[part.Index].CRC64 = part.CRC64
  232. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
  233. publishProgress(listener, event)
  234. case err := <-failed:
  235. close(die)
  236. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
  237. publishProgress(listener, event)
  238. return err
  239. }
  240. if completed >= len(parts) {
  241. break
  242. }
  243. }
  244. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
  245. publishProgress(listener, event)
  246. if enableCRC {
  247. actualCRC := combineCRCInParts(parts)
  248. err = checkDownloadCRC(actualCRC, expectedCRC)
  249. if err != nil {
  250. return err
  251. }
  252. }
  253. return os.Rename(tempFilePath, filePath)
  254. }
// ----- Concurrent download with checkpoint -----

// downloadCpMagic identifies a checkpoint file written by this SDK.
const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"

// downloadCheckpoint is the resumable-download state persisted to disk
// between runs. Exported fields are serialized to JSON; MD5 covers the
// serialized content (with MD5 itself cleared) so corruption is detected
// on load.
type downloadCheckpoint struct {
	Magic     string         // Magic
	MD5       string         // Checkpoint content MD5
	FilePath  string         // Local file
	Object    string         // Key
	ObjStat   objectStat     // Object status
	Parts     []downloadPart // All download parts
	PartStat  []bool         // Parts' download status
	Start     int64          // Start point of the file
	End       int64          // End point of the file
	enableCRC bool           // Whether has CRC check (unexported, so not persisted to JSON)
	CRC       uint64         // CRC check value
}
// objectStat captures the remote object's identity so a stale checkpoint can
// be detected when the object changes between runs.
type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // Etag
}
  275. // isValid flags of checkpoint data is valid. It returns true when the data is valid and the checkpoint is valid and the object is not updated.
  276. func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string, uRange *unpackedRange, options []Option) (bool, error) {
  277. // Compare the CP's Magic and the MD5
  278. cpb := cp
  279. cpb.MD5 = ""
  280. js, _ := json.Marshal(cpb)
  281. sum := md5.Sum(js)
  282. b64 := base64.StdEncoding.EncodeToString(sum[:])
  283. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  284. return false, nil
  285. }
  286. // Ensure the object is not updated.
  287. meta, err := bucket.GetObjectDetailedMeta(objectKey, options...)
  288. if err != nil {
  289. return false, err
  290. }
  291. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  292. if err != nil {
  293. return false, err
  294. }
  295. // Compare the object size, last modified time and etag
  296. if cp.ObjStat.Size != objectSize ||
  297. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  298. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  299. return false, nil
  300. }
  301. // Check the download range
  302. if uRange != nil {
  303. start, end := adjustRange(uRange, objectSize)
  304. if start != cp.Start || end != cp.End {
  305. return false, nil
  306. }
  307. }
  308. return true, nil
  309. }
  310. // load checkpoint from local file
  311. func (cp *downloadCheckpoint) load(filePath string) error {
  312. contents, err := ioutil.ReadFile(filePath)
  313. if err != nil {
  314. return err
  315. }
  316. err = json.Unmarshal(contents, cp)
  317. return err
  318. }
  319. // dump funciton dumps to file
  320. func (cp *downloadCheckpoint) dump(filePath string) error {
  321. bcp := *cp
  322. // Calculate MD5
  323. bcp.MD5 = ""
  324. js, err := json.Marshal(bcp)
  325. if err != nil {
  326. return err
  327. }
  328. sum := md5.Sum(js)
  329. b64 := base64.StdEncoding.EncodeToString(sum[:])
  330. bcp.MD5 = b64
  331. // Serialize
  332. js, err = json.Marshal(bcp)
  333. if err != nil {
  334. return err
  335. }
  336. // Dump
  337. return ioutil.WriteFile(filePath, js, FilePermMode)
  338. }
  339. // todoParts gets unfinished parts
  340. func (cp downloadCheckpoint) todoParts() []downloadPart {
  341. dps := []downloadPart{}
  342. for i, ps := range cp.PartStat {
  343. if !ps {
  344. dps = append(dps, cp.Parts[i])
  345. }
  346. }
  347. return dps
  348. }
  349. // getCompletedBytes gets completed size
  350. func (cp downloadCheckpoint) getCompletedBytes() int64 {
  351. var completedBytes int64
  352. for i, part := range cp.Parts {
  353. if cp.PartStat[i] {
  354. completedBytes += (part.End - part.Start + 1)
  355. }
  356. }
  357. return completedBytes
  358. }
  359. // prepare initiates download tasks
  360. func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange, options []Option) error {
  361. // CP
  362. cp.Magic = downloadCpMagic
  363. cp.FilePath = filePath
  364. cp.Object = objectKey
  365. // Object
  366. meta, err := bucket.GetObjectDetailedMeta(objectKey, options...)
  367. if err != nil {
  368. return err
  369. }
  370. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  371. if err != nil {
  372. return err
  373. }
  374. cp.ObjStat.Size = objectSize
  375. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  376. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  377. if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  378. if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
  379. cp.enableCRC = true
  380. cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
  381. }
  382. }
  383. // Parts
  384. cp.Parts = getDownloadParts(objectSize, partSize, uRange)
  385. cp.PartStat = make([]bool, len(cp.Parts))
  386. for i := range cp.PartStat {
  387. cp.PartStat[i] = false
  388. }
  389. return nil
  390. }
// complete removes the checkpoint file and moves the finished temp file to
// its final destination. The checkpoint removal is deliberately best-effort:
// a leftover checkpoint file is rejected by isValid's MD5/metadata checks on
// the next run.
func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
	os.Remove(cpFilePath)
	return os.Rename(downFilepath, cp.FilePath)
}
// downloadFileWithCp downloads the object with checkpoint (resumable)
// support: progress is persisted to cpFilePath after every part, so an
// interrupted download can resume from the completed parts.
func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := getProgressListener(options)

	// Load checkpoint data. A load failure is not fatal: the stale file is
	// removed here and the checkpoint is re-initialized below (isValid also
	// fails for a zero-value/partially-loaded dcp via its MD5 check).
	dcp := downloadCheckpoint{}
	err := dcp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// Load error or data invalid (corrupt checkpoint, object changed, or
	// requested range changed). Re-initialize the download.
	valid, err := dcp.isValid(&bucket, objectKey, uRange, options)
	if err != nil || !valid {
		if err = dcp.prepare(&bucket, objectKey, filePath, partSize, uRange, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Create the file if it does not exist. Otherwise the parts download will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	// Unfinished parts only; already-completed parts are skipped on resume.
	parts := dcp.todoParts()
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := dcp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
	publishProgress(listener, event)

	// Start the download workers routine.
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Concurrently downloads parts.
	go downloadScheduler(jobs, parts)

	// Wait for the parts download finished.
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			dcp.PartStat[part.Index] = true
			dcp.Parts[part.Index].CRC64 = part.CRC64
			// Persist progress after each part. Best-effort: a failed dump
			// only costs re-downloading that part on the next resume.
			dcp.dump(cpFilePath)
			completedBytes += (part.End - part.Start + 1)
			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			// First failure aborts the remaining workers.
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
			publishProgress(listener, event)
			return err
		}
		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
	publishProgress(listener, event)

	// Verify the whole-object CRC when enabled, combined from per-part CRCs.
	if dcp.enableCRC {
		actualCRC := combineCRCInParts(dcp.Parts)
		err = checkDownloadCRC(actualCRC, dcp.CRC)
		if err != nil {
			return err
		}
	}

	return dcp.complete(cpFilePath, tempFilePath)
}
  467. }