download.go

package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"hash"
	"hash/crc64"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
)
// DownloadFile downloads a file with multipart download.
//
// objectKey    the object key.
// filePath     the local file path that the object is downloaded to.
// partSize     the part size in bytes.
// options      the object's constraints; check out GetObject for the reference.
//
// error        nil when the call succeeds, otherwise an error object.
//
func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < 1 {
		return errors.New("oss: part size smaller than 1")
	}

	uRange, err := getRangeConfig(options)
	if err != nil {
		return err
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	if cpConf != nil && cpConf.IsEnable && cpConf.cpDir != "" {
		src := fmt.Sprintf("oss://%v/%v", bucket.BucketName, objectKey)
		absPath, _ := filepath.Abs(filePath)
		cpFileName := getCpFileName(src, absPath)
		cpFilePath := cpConf.cpDir + string(os.PathSeparator) + cpFileName
		return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
	}

	return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
}
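
// exampleDownloadFileUsage is an illustrative sketch, not part of the original
// file: it shows the basic calling convention of DownloadFile above. The object
// key, local path and part size are made-up values, and Routines is assumed to
// be the SDK option helper consumed by getRoutines above. A checkpoint option
// (consumed by getCpConfig) would additionally route the call through
// downloadFileWithCp.
func exampleDownloadFileUsage(bucket Bucket) error {
	// Download "my-object" into "/tmp/my-object" in 1 MB parts using 3 workers.
	return bucket.DownloadFile("my-object", "/tmp/my-object", 1024*1024, Routines(3))
}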
// getRangeConfig gets the download range from the options.
func getRangeConfig(options []Option) (*unpackedRange, error) {
	rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
	if err != nil || rangeOpt == nil {
		return nil, err
	}
	return parseRange(rangeOpt.(string))
}
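
// exampleRangeConfig is an illustrative sketch, not part of the original file:
// it assumes the SDK's Range option (also used by downloadWorker below) sets
// the HTTP Range header to a value such as "bytes=0-99", which getRangeConfig
// then unpacks into an *unpackedRange describing the requested byte window.
func exampleRangeConfig() (*unpackedRange, error) {
	// Ask only for the first 100 bytes of the object.
	return getRangeConfig([]Option{Range(0, 99)})
}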
// ----- concurrent download without checkpoint -----

// downloadWorkerArg is the download worker's parameters.
type downloadWorkerArg struct {
	bucket    *Bucket
	key       string
	filePath  string
	options   []Option
	hook      downloadPartHook
	enableCRC bool
}

// downloadPartHook is a hook for testing.
type downloadPartHook func(part downloadPart) error

var downloadPartHooker downloadPartHook = defaultDownloadPartHook

func defaultDownloadPartHook(part downloadPart) error {
	return nil
}

// defaultDownloadProgressListener defines the default ProgressListener; it shields the
// ProgressListener passed in the options of GetObject.
type defaultDownloadProgressListener struct {
}

// ProgressChanged is a no-op.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}

// downloadWorker downloads the parts received from the jobs channel and reports them on results.
func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
	for part := range jobs {
		if err := arg.hook(part); err != nil {
			failed <- err
			break
		}

		// Resolve options
		r := Range(part.Start, part.End)
		p := Progress(&defaultDownloadProgressListener{})
		opts := make([]Option, 0, len(arg.options)+2)
		// Append in order; the per-part Range and Progress options must come last
		// so they override any caller-supplied values.
		opts = append(opts, arg.options...)
		opts = append(opts, r, p)

		rd, err := arg.bucket.GetObject(arg.key, opts...)
		if err != nil {
			failed <- err
			break
		}
		defer rd.Close()

		var crcCalc hash.Hash64
		if arg.enableCRC {
			crcCalc = crc64.New(crcTable())
			contentLen := part.End - part.Start + 1
			rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
		}
		defer rd.Close()

		select {
		case <-die:
			return
		default:
		}

		fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
		if err != nil {
			failed <- err
			break
		}

		_, err = fd.Seek(part.Start-part.Offset, io.SeekStart)
		if err != nil {
			fd.Close()
			failed <- err
			break
		}

		_, err = io.Copy(fd, rd)
		if err != nil {
			fd.Close()
			failed <- err
			break
		}

		if arg.enableCRC {
			part.CRC64 = crcCalc.Sum64()
		}

		fd.Close()
		results <- part
	}
}
// downloadScheduler feeds the parts to the jobs channel and closes it when all parts are queued.
func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
	for _, part := range parts {
		jobs <- part
	}
	close(jobs)
}
// downloadPart defines a download part.
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start byte offset of the part (inclusive)
	End    int64  // End byte offset of the part (inclusive)
	Offset int64  // Start offset of the whole download range within the object
	CRC64  uint64 // CRC check value of the part
}
// getDownloadParts splits the (possibly range-limited) object into download parts.
func getDownloadParts(objectSize, partSize int64, uRange *unpackedRange) []downloadPart {
	parts := []downloadPart{}
	part := downloadPart{}
	i := 0
	start, end := adjustRange(uRange, objectSize)
	for offset := start; offset < end; offset += partSize {
		part.Index = i
		part.Start = offset
		part.End = GetPartEnd(offset, end, partSize)
		part.Offset = start
		part.CRC64 = 0
		parts = append(parts, part)
		i++
	}
	return parts
}
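
// exampleGetDownloadParts is an illustrative sketch, not part of the original
// file: for a 250-byte object downloaded in 100-byte parts with no Range
// option, the splitter above is expected to yield three parts covering the
// inclusive byte ranges [0,99], [100,199] and [200,249], each with Offset 0.
func exampleGetDownloadParts() []downloadPart {
	return getDownloadParts(250, 100, nil)
}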
// getObjectBytes gets the total byte length covered by the parts.
func getObjectBytes(parts []downloadPart) int64 {
	var ob int64
	for _, part := range parts {
		ob += part.End - part.Start + 1
	}
	return ob
}
// combineCRCInParts calculates the total CRC of continuous parts.
func combineCRCInParts(dps []downloadPart) uint64 {
	if len(dps) == 0 {
		return 0
	}

	crc := dps[0].CRC64
	for i := 1; i < len(dps); i++ {
		crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
	}
	return crc
}
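
// exampleCombineCRC is an illustrative sketch, not part of the original file:
// it spells out the property combineCRCInParts relies on, namely that
// CRC64Combine(crcA, crcB, len(B)) for two adjacent byte ranges A and B is
// expected to equal the CRC-64 of the concatenation A||B. crcTable and
// CRC64Combine are the helpers this file already uses.
func exampleCombineCRC(a, b []byte) bool {
	tab := crcTable()
	crcA := crc64.Checksum(a, tab)
	crcB := crc64.Checksum(b, tab)
	whole := crc64.Checksum(append(append([]byte{}, a...), b...), tab)
	return CRC64Combine(crcA, crcB, uint64(len(b))) == whole
}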
// downloadFile downloads the file concurrently without checkpoint.
func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := getProgressListener(options)

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	// If the file does not exist, create one. If it exists, the download will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	meta, err := bucket.GetObjectDetailedMeta(objectKey, payerOptions...)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return err
	}

	enableCRC := false
	expectedCRC := (uint64)(0)
	if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
		if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
			enableCRC = true
			expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
		}
	}

	// Get the parts of the file
	parts := getDownloadParts(objectSize, partSize, uRange)
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getObjectBytes(parts)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)

	// Start the download workers
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Download parts concurrently
	go downloadScheduler(jobs, parts)

	// Wait until all parts are finished
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			completedBytes += part.End - part.Start + 1
			parts[part.Index].CRC64 = part.CRC64
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	if enableCRC {
		actualCRC := combineCRCInParts(parts)
		err = checkDownloadCRC(actualCRC, expectedCRC)
		if err != nil {
			return err
		}
	}

	return os.Rename(tempFilePath, filePath)
}
// ----- Concurrent download with checkpoint -----

const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"

type downloadCheckpoint struct {
	Magic     string         // Magic
	MD5       string         // Checkpoint content MD5
	FilePath  string         // Local file
	Object    string         // Key
	ObjStat   objectStat     // Object status
	Parts     []downloadPart // All download parts
	PartStat  []bool         // Parts' download status
	Start     int64          // Start point of the file
	End       int64          // End point of the file
	enableCRC bool           // Whether it has a CRC check
	CRC       uint64         // CRC check value
}

type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // Etag
}
// isValid checks whether the checkpoint data is valid. It returns true when the data matches its
// stored MD5 and the object has not been updated since the checkpoint was written.
func (cp downloadCheckpoint) isValid(meta http.Header, uRange *unpackedRange) (bool, error) {
	// Compare the CP's Magic and the MD5
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return false, err
	}

	// Compare the object size, last modified time and etag
	if cp.ObjStat.Size != objectSize ||
		cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
		cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
		return false, nil
	}

	// Check the download range
	if uRange != nil {
		start, end := adjustRange(uRange, objectSize)
		if start != cp.Start || end != cp.End {
			return false, nil
		}
	}

	return true, nil
}
// load loads the checkpoint from the local file.
func (cp *downloadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	err = json.Unmarshal(contents, cp)
	return err
}

// dump dumps the checkpoint to the local file.
func (cp *downloadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}
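
// exampleCheckpointRoundTrip is an illustrative sketch, not part of the
// original file: it shows the intended dump/load cycle around a hypothetical
// checkpoint path. dump stores the MD5 of the JSON (computed with the MD5
// field cleared), which isValid later recomputes to detect a corrupted or
// stale checkpoint file.
func exampleCheckpointRoundTrip(cp *downloadCheckpoint) (*downloadCheckpoint, error) {
	const cpPath = "/tmp/example-download.cp" // hypothetical checkpoint file
	if err := cp.dump(cpPath); err != nil {
		return nil, err
	}
	restored := &downloadCheckpoint{}
	err := restored.load(cpPath)
	return restored, err
}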
// todoParts gets the unfinished parts.
func (cp downloadCheckpoint) todoParts() []downloadPart {
	dps := []downloadPart{}
	for i, ps := range cp.PartStat {
		if !ps {
			dps = append(dps, cp.Parts[i])
		}
	}
	return dps
}

// getCompletedBytes gets the completed size.
func (cp downloadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for i, part := range cp.Parts {
		if cp.PartStat[i] {
			completedBytes += part.End - part.Start + 1
		}
	}
	return completedBytes
}
// prepare initializes the download tasks.
func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
	// CP
	cp.Magic = downloadCpMagic
	cp.FilePath = filePath
	cp.Object = objectKey

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return err
	}

	cp.ObjStat.Size = objectSize
	cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)

	if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
		if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
			cp.enableCRC = true
			cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
		}
	}

	// Parts
	cp.Parts = getDownloadParts(objectSize, partSize, uRange)
	cp.PartStat = make([]bool, len(cp.Parts))
	for i := range cp.PartStat {
		cp.PartStat[i] = false
	}

	return nil
}

// complete removes the checkpoint file and renames the temporary file to the target file.
func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
	os.Remove(cpFilePath)
	return os.Rename(downFilepath, cp.FilePath)
}
// downloadFileWithCp downloads the file concurrently with checkpoint.
func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := getProgressListener(options)

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	// Load checkpoint data.
	dcp := downloadCheckpoint{}
	err := dcp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// Get the object detailed meta.
	meta, err := bucket.GetObjectDetailedMeta(objectKey, payerOptions...)
	if err != nil {
		return err
	}

	// If the checkpoint failed to load or is invalid, re-initialize the download.
	valid, err := dcp.isValid(meta, uRange)
	if err != nil || !valid {
		if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Create the file if it does not exist. Otherwise the parts download will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	// Unfinished parts
	parts := dcp.todoParts()
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := dcp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
	publishProgress(listener, event)

	// Start the download worker routines
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Download parts concurrently
	go downloadScheduler(jobs, parts)

	// Wait until all parts are finished
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			dcp.PartStat[part.Index] = true
			dcp.Parts[part.Index].CRC64 = part.CRC64
			dcp.dump(cpFilePath)
			completedBytes += part.End - part.Start + 1
			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
	publishProgress(listener, event)

	if dcp.enableCRC {
		actualCRC := combineCRCInParts(dcp.Parts)
		err = checkDownloadCRC(actualCRC, dcp.CRC)
		if err != nil {
			return err
		}
	}

	return dcp.complete(cpFilePath, tempFilePath)
}