download.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "hash"
  9. "hash/crc64"
  10. "io"
  11. "io/ioutil"
  12. "net/http"
  13. "os"
  14. "path/filepath"
  15. "strconv"
  16. )
  17. // DownloadFile downloads files with multipart download.
  18. //
  19. // objectKey the object key.
  20. // filePath the local file to download from objectKey in OSS.
  21. // partSize the part size in bytes.
  22. // options object's constraints, check out GetObject for the reference.
  23. //
  24. // error it's nil when the call succeeds, otherwise it's an error object.
  25. //
  26. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  27. if partSize < 1 {
  28. return errors.New("oss: part size smaller than 1")
  29. }
  30. uRange, err := getRangeConfig(options)
  31. if err != nil {
  32. return err
  33. }
  34. cpConf := getCpConfig(options)
  35. routines := getRoutines(options)
  36. if cpConf != nil && cpConf.IsEnable {
  37. cpFilePath := getDownloadCpFilePath(cpConf, bucket.BucketName, objectKey, filePath)
  38. if cpFilePath != "" {
  39. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
  40. }
  41. }
  42. return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
  43. }
  44. func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destFile string) string {
  45. if cpConf.FilePath == "" && cpConf.DirPath != "" {
  46. src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
  47. absPath, _ := filepath.Abs(destFile)
  48. cpFileName := getCpFileName(src, absPath)
  49. cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
  50. }
  51. return cpConf.FilePath
  52. }
  53. // getRangeConfig gets the download range from the options.
  54. func getRangeConfig(options []Option) (*unpackedRange, error) {
  55. rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
  56. if err != nil || rangeOpt == nil {
  57. return nil, err
  58. }
  59. return parseRange(rangeOpt.(string))
  60. }
// ----- concurrent download without checkpoint -----

// downloadWorkerArg is download worker's parameters.
type downloadWorkerArg struct {
	bucket    *Bucket          // bucket to download from
	key       string           // object key
	filePath  string           // local (temp) file the parts are written into
	options   []Option         // caller's GetObject options
	hook      downloadPartHook // hook invoked before each part download (for tests)
	enableCRC bool             // whether to compute a per-part CRC64
}
// downloadPartHook is hook for test; it is called before each part download
// and may return an error to abort the worker.
type downloadPartHook func(part downloadPart) error

// downloadPartHooker is the active hook; defaults to a no-op.
var downloadPartHooker downloadPartHook = defaultDownloadPartHook

// defaultDownloadPartHook is the default no-op hook.
func defaultDownloadPartHook(part downloadPart) error {
	return nil
}
// defaultDownloadProgressListener defines default ProgressListener, shields the ProgressListener in options of GetObject.
type defaultDownloadProgressListener struct {
}

// ProgressChanged no-ops: per-part progress events are discarded because the
// overall download progress is reported by the caller instead.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}
  83. // downloadWorker
  84. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
  85. for part := range jobs {
  86. if err := arg.hook(part); err != nil {
  87. failed <- err
  88. break
  89. }
  90. // Resolve options
  91. r := Range(part.Start, part.End)
  92. p := Progress(&defaultDownloadProgressListener{})
  93. opts := make([]Option, len(arg.options)+2)
  94. // Append orderly, can not be reversed!
  95. opts = append(opts, arg.options...)
  96. opts = append(opts, r, p)
  97. rd, err := arg.bucket.GetObject(arg.key, opts...)
  98. if err != nil {
  99. failed <- err
  100. break
  101. }
  102. defer rd.Close()
  103. var crcCalc hash.Hash64
  104. if arg.enableCRC {
  105. crcCalc = crc64.New(crcTable())
  106. contentLen := part.End - part.Start + 1
  107. rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
  108. }
  109. defer rd.Close()
  110. select {
  111. case <-die:
  112. return
  113. default:
  114. }
  115. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
  116. if err != nil {
  117. failed <- err
  118. break
  119. }
  120. _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
  121. if err != nil {
  122. fd.Close()
  123. failed <- err
  124. break
  125. }
  126. _, err = io.Copy(fd, rd)
  127. if err != nil {
  128. fd.Close()
  129. failed <- err
  130. break
  131. }
  132. if arg.enableCRC {
  133. part.CRC64 = crcCalc.Sum64()
  134. }
  135. fd.Close()
  136. results <- part
  137. }
  138. }
  139. // downloadScheduler
  140. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  141. for _, part := range parts {
  142. jobs <- part
  143. }
  144. close(jobs)
  145. }
// downloadPart defines download part
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start index (inclusive byte offset within the object)
	End    int64  // End index (inclusive byte offset within the object)
	Offset int64  // Offset: start of the whole download range, used to position writes in the local file
	CRC64  uint64 // CRC check value of part
}
  154. // getDownloadParts gets download parts
  155. func getDownloadParts(objectSize, partSize int64, uRange *unpackedRange) []downloadPart {
  156. parts := []downloadPart{}
  157. part := downloadPart{}
  158. i := 0
  159. start, end := adjustRange(uRange, objectSize)
  160. for offset := start; offset < end; offset += partSize {
  161. part.Index = i
  162. part.Start = offset
  163. part.End = GetPartEnd(offset, end, partSize)
  164. part.Offset = start
  165. part.CRC64 = 0
  166. parts = append(parts, part)
  167. i++
  168. }
  169. return parts
  170. }
  171. // getObjectBytes gets object bytes length
  172. func getObjectBytes(parts []downloadPart) int64 {
  173. var ob int64
  174. for _, part := range parts {
  175. ob += (part.End - part.Start + 1)
  176. }
  177. return ob
  178. }
  179. // combineCRCInParts caculates the total CRC of continuous parts
  180. func combineCRCInParts(dps []downloadPart) uint64 {
  181. if dps == nil || len(dps) == 0 {
  182. return 0
  183. }
  184. crc := dps[0].CRC64
  185. for i := 1; i < len(dps); i++ {
  186. crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
  187. }
  188. return crc
  189. }
  190. // downloadFile downloads file concurrently without checkpoint.
  191. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
  192. tempFilePath := filePath + TempFileSuffix
  193. listener := getProgressListener(options)
  194. // If the file does not exist, create one. If exists, the download will overwrite it.
  195. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  196. if err != nil {
  197. return err
  198. }
  199. fd.Close()
  200. // Get the object detailed meta for object whole size
  201. // must delete header:range to get whole object size
  202. skipOptions := deleteOption(options, HTTPHeaderRange)
  203. meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
  204. if err != nil {
  205. return err
  206. }
  207. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  208. if err != nil {
  209. return err
  210. }
  211. enableCRC := false
  212. expectedCRC := (uint64)(0)
  213. if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  214. if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
  215. enableCRC = true
  216. expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
  217. }
  218. }
  219. // Get the parts of the file
  220. parts := getDownloadParts(objectSize, partSize, uRange)
  221. jobs := make(chan downloadPart, len(parts))
  222. results := make(chan downloadPart, len(parts))
  223. failed := make(chan error)
  224. die := make(chan bool)
  225. var completedBytes int64
  226. totalBytes := getObjectBytes(parts)
  227. event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
  228. publishProgress(listener, event)
  229. // Start the download workers
  230. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
  231. for w := 1; w <= routines; w++ {
  232. go downloadWorker(w, arg, jobs, results, failed, die)
  233. }
  234. // Download parts concurrently
  235. go downloadScheduler(jobs, parts)
  236. // Waiting for parts download finished
  237. completed := 0
  238. for completed < len(parts) {
  239. select {
  240. case part := <-results:
  241. completed++
  242. downBytes := (part.End - part.Start + 1)
  243. completedBytes += downBytes
  244. parts[part.Index].CRC64 = part.CRC64
  245. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, downBytes)
  246. publishProgress(listener, event)
  247. case err := <-failed:
  248. close(die)
  249. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
  250. publishProgress(listener, event)
  251. return err
  252. }
  253. if completed >= len(parts) {
  254. break
  255. }
  256. }
  257. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
  258. publishProgress(listener, event)
  259. if enableCRC {
  260. actualCRC := combineCRCInParts(parts)
  261. err = checkDownloadCRC(actualCRC, expectedCRC)
  262. if err != nil {
  263. return err
  264. }
  265. }
  266. return os.Rename(tempFilePath, filePath)
  267. }
// ----- Concurrent download with checkpoint -----

// downloadCpMagic is the magic marker identifying a download checkpoint file.
const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
// downloadCheckpoint is the persisted state of a resumable download.
type downloadCheckpoint struct {
	Magic    string         // Magic
	MD5      string         // Checkpoint content MD5 (computed with this field cleared)
	FilePath string         // Local file
	Object   string         // Key
	ObjStat  objectStat     // Object status, used to detect remote changes
	Parts    []downloadPart // All download parts
	PartStat []bool         // Parts' download status (true = finished)
	Start    int64          // Start point of the file
	End      int64          // End point of the file
	// Whether has CRC check.
	// NOTE(review): unexported, so encoding/json does not serialize it and it
	// is false after load() — confirm CRC is still verified on resumed runs.
	enableCRC bool
	CRC       uint64 // CRC check value
}

// objectStat records the remote object's attributes compared by isValid to
// detect changes between checkpointed runs.
type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // Etag
}
  288. // isValid flags of checkpoint data is valid. It returns true when the data is valid and the checkpoint is valid and the object is not updated.
  289. func (cp downloadCheckpoint) isValid(meta http.Header, uRange *unpackedRange) (bool, error) {
  290. // Compare the CP's Magic and the MD5
  291. cpb := cp
  292. cpb.MD5 = ""
  293. js, _ := json.Marshal(cpb)
  294. sum := md5.Sum(js)
  295. b64 := base64.StdEncoding.EncodeToString(sum[:])
  296. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  297. return false, nil
  298. }
  299. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  300. if err != nil {
  301. return false, err
  302. }
  303. // Compare the object size, last modified time and etag
  304. if cp.ObjStat.Size != objectSize ||
  305. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  306. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  307. return false, nil
  308. }
  309. // Check the download range
  310. if uRange != nil {
  311. start, end := adjustRange(uRange, objectSize)
  312. if start != cp.Start || end != cp.End {
  313. return false, nil
  314. }
  315. }
  316. return true, nil
  317. }
  318. // load checkpoint from local file
  319. func (cp *downloadCheckpoint) load(filePath string) error {
  320. contents, err := ioutil.ReadFile(filePath)
  321. if err != nil {
  322. return err
  323. }
  324. err = json.Unmarshal(contents, cp)
  325. return err
  326. }
  327. // dump funciton dumps to file
  328. func (cp *downloadCheckpoint) dump(filePath string) error {
  329. bcp := *cp
  330. // Calculate MD5
  331. bcp.MD5 = ""
  332. js, err := json.Marshal(bcp)
  333. if err != nil {
  334. return err
  335. }
  336. sum := md5.Sum(js)
  337. b64 := base64.StdEncoding.EncodeToString(sum[:])
  338. bcp.MD5 = b64
  339. // Serialize
  340. js, err = json.Marshal(bcp)
  341. if err != nil {
  342. return err
  343. }
  344. // Dump
  345. return ioutil.WriteFile(filePath, js, FilePermMode)
  346. }
  347. // todoParts gets unfinished parts
  348. func (cp downloadCheckpoint) todoParts() []downloadPart {
  349. dps := []downloadPart{}
  350. for i, ps := range cp.PartStat {
  351. if !ps {
  352. dps = append(dps, cp.Parts[i])
  353. }
  354. }
  355. return dps
  356. }
  357. // getCompletedBytes gets completed size
  358. func (cp downloadCheckpoint) getCompletedBytes() int64 {
  359. var completedBytes int64
  360. for i, part := range cp.Parts {
  361. if cp.PartStat[i] {
  362. completedBytes += (part.End - part.Start + 1)
  363. }
  364. }
  365. return completedBytes
  366. }
  367. // prepare initiates download tasks
  368. func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
  369. // CP
  370. cp.Magic = downloadCpMagic
  371. cp.FilePath = filePath
  372. cp.Object = objectKey
  373. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  374. if err != nil {
  375. return err
  376. }
  377. cp.ObjStat.Size = objectSize
  378. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  379. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  380. if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  381. if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
  382. cp.enableCRC = true
  383. cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
  384. }
  385. }
  386. // Parts
  387. cp.Parts = getDownloadParts(objectSize, partSize, uRange)
  388. cp.PartStat = make([]bool, len(cp.Parts))
  389. for i := range cp.PartStat {
  390. cp.PartStat[i] = false
  391. }
  392. return nil
  393. }
// complete finishes the checkpointed download: it removes the checkpoint file
// (best-effort — the error is deliberately ignored) and moves the downloaded
// temp file to its final destination.
func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
	os.Remove(cpFilePath)
	return os.Rename(downFilepath, cp.FilePath)
}
  398. // downloadFileWithCp downloads files with checkpoint.
  399. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
  400. tempFilePath := filePath + TempFileSuffix
  401. listener := getProgressListener(options)
  402. // Load checkpoint data.
  403. dcp := downloadCheckpoint{}
  404. err := dcp.load(cpFilePath)
  405. if err != nil {
  406. os.Remove(cpFilePath)
  407. }
  408. // Get the object detailed meta for object whole size
  409. // must delete header:range to get whole object size
  410. skipOptions := deleteOption(options, HTTPHeaderRange)
  411. meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
  412. if err != nil {
  413. return err
  414. }
  415. // Load error or data invalid. Re-initialize the download.
  416. valid, err := dcp.isValid(meta, uRange)
  417. if err != nil || !valid {
  418. if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
  419. return err
  420. }
  421. os.Remove(cpFilePath)
  422. }
  423. // Create the file if not exists. Otherwise the parts download will overwrite it.
  424. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  425. if err != nil {
  426. return err
  427. }
  428. fd.Close()
  429. // Unfinished parts
  430. parts := dcp.todoParts()
  431. jobs := make(chan downloadPart, len(parts))
  432. results := make(chan downloadPart, len(parts))
  433. failed := make(chan error)
  434. die := make(chan bool)
  435. completedBytes := dcp.getCompletedBytes()
  436. event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size, 0)
  437. publishProgress(listener, event)
  438. // Start the download workers routine
  439. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
  440. for w := 1; w <= routines; w++ {
  441. go downloadWorker(w, arg, jobs, results, failed, die)
  442. }
  443. // Concurrently downloads parts
  444. go downloadScheduler(jobs, parts)
  445. // Wait for the parts download finished
  446. completed := 0
  447. for completed < len(parts) {
  448. select {
  449. case part := <-results:
  450. completed++
  451. dcp.PartStat[part.Index] = true
  452. dcp.Parts[part.Index].CRC64 = part.CRC64
  453. dcp.dump(cpFilePath)
  454. downBytes := (part.End - part.Start + 1)
  455. completedBytes += downBytes
  456. event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size, downBytes)
  457. publishProgress(listener, event)
  458. case err := <-failed:
  459. close(die)
  460. event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size, 0)
  461. publishProgress(listener, event)
  462. return err
  463. }
  464. if completed >= len(parts) {
  465. break
  466. }
  467. }
  468. event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size, 0)
  469. publishProgress(listener, event)
  470. if dcp.enableCRC {
  471. actualCRC := combineCRCInParts(dcp.Parts)
  472. err = checkDownloadCRC(actualCRC, dcp.CRC)
  473. if err != nil {
  474. return err
  475. }
  476. }
  477. return dcp.complete(cpFilePath, tempFilePath)
  478. }