// download.go — OSS multipart and checkpoint-resumable object download.
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "hash"
  9. "hash/crc64"
  10. "io"
  11. "io/ioutil"
  12. "net/http"
  13. "os"
  14. "path/filepath"
  15. "strconv"
  16. )
  17. // DownloadFile downloads files with multipart download.
  18. //
  19. // objectKey the object key.
  20. // filePath the local file to download from objectKey in OSS.
  21. // partSize the part size in bytes.
  22. // options object's constraints, check out GetObject for the reference.
  23. //
  24. // error it's nil when the call succeeds, otherwise it's an error object.
  25. //
  26. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  27. if partSize < 1 {
  28. return errors.New("oss: part size smaller than 1")
  29. }
  30. uRange, err := getRangeConfig(options)
  31. if err != nil {
  32. return err
  33. }
  34. cpConf := getCpConfig(options)
  35. routines := getRoutines(options)
  36. if cpConf != nil && cpConf.IsEnable {
  37. cpFilePath := getDownloadCpFilePath(cpConf, bucket.BucketName, objectKey, filePath)
  38. if cpFilePath != "" {
  39. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
  40. }
  41. }
  42. return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
  43. }
  44. func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destFile string) string {
  45. if cpConf.FilePath == "" && cpConf.DirPath != "" {
  46. src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
  47. absPath, _ := filepath.Abs(destFile)
  48. cpFileName := getCpFileName(src, absPath)
  49. cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
  50. }
  51. return cpConf.FilePath
  52. }
  53. // getRangeConfig gets the download range from the options.
  54. func getRangeConfig(options []Option) (*unpackedRange, error) {
  55. rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
  56. if err != nil || rangeOpt == nil {
  57. return nil, err
  58. }
  59. return parseRange(rangeOpt.(string))
  60. }
// ----- concurrent download without checkpoint -----

// downloadWorkerArg is download worker's parameters
type downloadWorkerArg struct {
	bucket    *Bucket          // bucket the parts are fetched from
	key       string           // object key to download
	filePath  string           // local file the parts are written into
	options   []Option         // caller-supplied options forwarded to GetObject
	hook      downloadPartHook // test hook invoked before each part download
	enableCRC bool             // whether to compute a CRC64 for every part
}
// downloadPartHook is hook for test
type downloadPartHook func(part downloadPart) error

// downloadPartHooker is the hook called before each part download; tests can
// swap it out to inject failures.
var downloadPartHooker downloadPartHook = defaultDownloadPartHook

// defaultDownloadPartHook is the production hook: a no-op that never fails.
func defaultDownloadPartHook(part downloadPart) error {
	return nil
}
// defaultDownloadProgressListener defines default ProgressListener, shields the ProgressListener in options of GetObject.
type defaultDownloadProgressListener struct {
}

// ProgressChanged deliberately ignores the per-part GetObject progress events
// so the overall download can publish its own aggregated progress instead.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}
  83. // downloadWorker
  84. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
  85. for part := range jobs {
  86. if err := arg.hook(part); err != nil {
  87. failed <- err
  88. break
  89. }
  90. // Resolve options
  91. r := Range(part.Start, part.End)
  92. p := Progress(&defaultDownloadProgressListener{})
  93. opts := make([]Option, len(arg.options)+2)
  94. // Append orderly, can not be reversed!
  95. opts = append(opts, arg.options...)
  96. opts = append(opts, r, p)
  97. rd, err := arg.bucket.GetObject(arg.key, opts...)
  98. if err != nil {
  99. failed <- err
  100. break
  101. }
  102. defer rd.Close()
  103. var crcCalc hash.Hash64
  104. if arg.enableCRC {
  105. crcCalc = crc64.New(crcTable())
  106. contentLen := part.End - part.Start + 1
  107. rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
  108. }
  109. defer rd.Close()
  110. select {
  111. case <-die:
  112. return
  113. default:
  114. }
  115. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
  116. if err != nil {
  117. failed <- err
  118. break
  119. }
  120. _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
  121. if err != nil {
  122. fd.Close()
  123. failed <- err
  124. break
  125. }
  126. _, err = io.Copy(fd, rd)
  127. if err != nil {
  128. fd.Close()
  129. failed <- err
  130. break
  131. }
  132. if arg.enableCRC {
  133. part.CRC64 = crcCalc.Sum64()
  134. }
  135. fd.Close()
  136. results <- part
  137. }
  138. }
  139. // downloadScheduler
  140. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  141. for _, part := range parts {
  142. jobs <- part
  143. }
  144. close(jobs)
  145. }
// downloadPart defines download part
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start byte index in the object (inclusive)
	End    int64  // End byte index in the object (inclusive)
	Offset int64  // Start of the requested range; Start-Offset is the local file write position
	CRC64  uint64 // CRC-64 check value of the part's data (when CRC is enabled)
}
  154. // getDownloadParts gets download parts
  155. func getDownloadParts(objectSize, partSize int64, uRange *unpackedRange) []downloadPart {
  156. parts := []downloadPart{}
  157. part := downloadPart{}
  158. i := 0
  159. start, end := adjustRange(uRange, objectSize)
  160. for offset := start; offset < end; offset += partSize {
  161. part.Index = i
  162. part.Start = offset
  163. part.End = GetPartEnd(offset, end, partSize)
  164. part.Offset = start
  165. part.CRC64 = 0
  166. parts = append(parts, part)
  167. i++
  168. }
  169. return parts
  170. }
  171. // getObjectBytes gets object bytes length
  172. func getObjectBytes(parts []downloadPart) int64 {
  173. var ob int64
  174. for _, part := range parts {
  175. ob += (part.End - part.Start + 1)
  176. }
  177. return ob
  178. }
  179. // combineCRCInParts caculates the total CRC of continuous parts
  180. func combineCRCInParts(dps []downloadPart) uint64 {
  181. if dps == nil || len(dps) == 0 {
  182. return 0
  183. }
  184. crc := dps[0].CRC64
  185. for i := 1; i < len(dps); i++ {
  186. crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
  187. }
  188. return crc
  189. }
  190. // downloadFile downloads file concurrently without checkpoint.
  191. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
  192. tempFilePath := filePath + TempFileSuffix
  193. listener := getProgressListener(options)
  194. // If the file does not exist, create one. If exists, the download will overwrite it.
  195. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  196. if err != nil {
  197. return err
  198. }
  199. fd.Close()
  200. // Get the object detailed meta for object whole size
  201. // must delete header:range to get whole object size
  202. skipOptions := deleteOption(options, HTTPHeaderRange)
  203. meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
  204. if err != nil {
  205. return err
  206. }
  207. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  208. if err != nil {
  209. return err
  210. }
  211. enableCRC := false
  212. expectedCRC := (uint64)(0)
  213. if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  214. if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
  215. enableCRC = true
  216. expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
  217. }
  218. }
  219. // Get the parts of the file
  220. parts := getDownloadParts(objectSize, partSize, uRange)
  221. jobs := make(chan downloadPart, len(parts))
  222. results := make(chan downloadPart, len(parts))
  223. failed := make(chan error)
  224. die := make(chan bool)
  225. var completedBytes int64
  226. totalBytes := getObjectBytes(parts)
  227. event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
  228. publishProgress(listener, event)
  229. // Start the download workers
  230. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
  231. for w := 1; w <= routines; w++ {
  232. go downloadWorker(w, arg, jobs, results, failed, die)
  233. }
  234. // Download parts concurrently
  235. go downloadScheduler(jobs, parts)
  236. // Waiting for parts download finished
  237. completed := 0
  238. for completed < len(parts) {
  239. select {
  240. case part := <-results:
  241. completed++
  242. completedBytes += (part.End - part.Start + 1)
  243. parts[part.Index].CRC64 = part.CRC64
  244. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
  245. publishProgress(listener, event)
  246. case err := <-failed:
  247. close(die)
  248. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
  249. publishProgress(listener, event)
  250. return err
  251. }
  252. if completed >= len(parts) {
  253. break
  254. }
  255. }
  256. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
  257. publishProgress(listener, event)
  258. if enableCRC {
  259. actualCRC := combineCRCInParts(parts)
  260. err = checkDownloadCRC(actualCRC, expectedCRC)
  261. if err != nil {
  262. return err
  263. }
  264. }
  265. return os.Rename(tempFilePath, filePath)
  266. }
// ----- Concurrent download with checkpoint -----

// downloadCpMagic is the magic marker identifying a valid download checkpoint file.
const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"

// downloadCheckpoint records the resumable-download state that is serialized
// to the checkpoint file between runs.
type downloadCheckpoint struct {
	Magic    string         // Magic
	MD5      string         // Checkpoint content MD5 (over the JSON with this field blanked)
	FilePath string         // Local file
	Object   string         // Key
	ObjStat  objectStat     // Object status snapshot used to detect remote changes
	Parts    []downloadPart // All download parts
	PartStat []bool         // Parts' download status, indexed by part number
	// NOTE(review): isValid compares Start/End against adjustRange, but no
	// code in this file assigns them in prepare — verify ranged resumes work.
	Start     int64  // Start point of the file
	End       int64  // End point of the file
	enableCRC bool   // Whether has CRC check (unexported: not serialized)
	CRC       uint64 // CRC check value expected for the whole object
}
// objectStat is the remote object's snapshot stored in the checkpoint; a
// mismatch on any field invalidates the checkpoint (see isValid).
type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // Etag
}
  287. // isValid flags of checkpoint data is valid. It returns true when the data is valid and the checkpoint is valid and the object is not updated.
  288. func (cp downloadCheckpoint) isValid(meta http.Header, uRange *unpackedRange) (bool, error) {
  289. // Compare the CP's Magic and the MD5
  290. cpb := cp
  291. cpb.MD5 = ""
  292. js, _ := json.Marshal(cpb)
  293. sum := md5.Sum(js)
  294. b64 := base64.StdEncoding.EncodeToString(sum[:])
  295. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  296. return false, nil
  297. }
  298. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  299. if err != nil {
  300. return false, err
  301. }
  302. // Compare the object size, last modified time and etag
  303. if cp.ObjStat.Size != objectSize ||
  304. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  305. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  306. return false, nil
  307. }
  308. // Check the download range
  309. if uRange != nil {
  310. start, end := adjustRange(uRange, objectSize)
  311. if start != cp.Start || end != cp.End {
  312. return false, nil
  313. }
  314. }
  315. return true, nil
  316. }
  317. // load checkpoint from local file
  318. func (cp *downloadCheckpoint) load(filePath string) error {
  319. contents, err := ioutil.ReadFile(filePath)
  320. if err != nil {
  321. return err
  322. }
  323. err = json.Unmarshal(contents, cp)
  324. return err
  325. }
  326. // dump funciton dumps to file
  327. func (cp *downloadCheckpoint) dump(filePath string) error {
  328. bcp := *cp
  329. // Calculate MD5
  330. bcp.MD5 = ""
  331. js, err := json.Marshal(bcp)
  332. if err != nil {
  333. return err
  334. }
  335. sum := md5.Sum(js)
  336. b64 := base64.StdEncoding.EncodeToString(sum[:])
  337. bcp.MD5 = b64
  338. // Serialize
  339. js, err = json.Marshal(bcp)
  340. if err != nil {
  341. return err
  342. }
  343. // Dump
  344. return ioutil.WriteFile(filePath, js, FilePermMode)
  345. }
  346. // todoParts gets unfinished parts
  347. func (cp downloadCheckpoint) todoParts() []downloadPart {
  348. dps := []downloadPart{}
  349. for i, ps := range cp.PartStat {
  350. if !ps {
  351. dps = append(dps, cp.Parts[i])
  352. }
  353. }
  354. return dps
  355. }
  356. // getCompletedBytes gets completed size
  357. func (cp downloadCheckpoint) getCompletedBytes() int64 {
  358. var completedBytes int64
  359. for i, part := range cp.Parts {
  360. if cp.PartStat[i] {
  361. completedBytes += (part.End - part.Start + 1)
  362. }
  363. }
  364. return completedBytes
  365. }
  366. // prepare initiates download tasks
  367. func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
  368. // CP
  369. cp.Magic = downloadCpMagic
  370. cp.FilePath = filePath
  371. cp.Object = objectKey
  372. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  373. if err != nil {
  374. return err
  375. }
  376. cp.ObjStat.Size = objectSize
  377. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  378. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  379. if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  380. if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
  381. cp.enableCRC = true
  382. cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
  383. }
  384. }
  385. // Parts
  386. cp.Parts = getDownloadParts(objectSize, partSize, uRange)
  387. cp.PartStat = make([]bool, len(cp.Parts))
  388. for i := range cp.PartStat {
  389. cp.PartStat[i] = false
  390. }
  391. return nil
  392. }
// complete finishes a checkpointed download: the checkpoint file is removed
// (best-effort — its error is deliberately ignored) and the temp file is
// renamed to the final destination recorded in the checkpoint.
func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
	os.Remove(cpFilePath)
	return os.Rename(downFilepath, cp.FilePath)
}
  397. // downloadFileWithCp downloads files with checkpoint.
  398. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
  399. tempFilePath := filePath + TempFileSuffix
  400. listener := getProgressListener(options)
  401. // Load checkpoint data.
  402. dcp := downloadCheckpoint{}
  403. err := dcp.load(cpFilePath)
  404. if err != nil {
  405. os.Remove(cpFilePath)
  406. }
  407. // Get the object detailed meta for object whole size
  408. // must delete header:range to get whole object size
  409. skipOptions := deleteOption(options, HTTPHeaderRange)
  410. meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
  411. if err != nil {
  412. return err
  413. }
  414. // Load error or data invalid. Re-initialize the download.
  415. valid, err := dcp.isValid(meta, uRange)
  416. if err != nil || !valid {
  417. if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
  418. return err
  419. }
  420. os.Remove(cpFilePath)
  421. }
  422. // Create the file if not exists. Otherwise the parts download will overwrite it.
  423. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  424. if err != nil {
  425. return err
  426. }
  427. fd.Close()
  428. // Unfinished parts
  429. parts := dcp.todoParts()
  430. jobs := make(chan downloadPart, len(parts))
  431. results := make(chan downloadPart, len(parts))
  432. failed := make(chan error)
  433. die := make(chan bool)
  434. completedBytes := dcp.getCompletedBytes()
  435. event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
  436. publishProgress(listener, event)
  437. // Start the download workers routine
  438. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
  439. for w := 1; w <= routines; w++ {
  440. go downloadWorker(w, arg, jobs, results, failed, die)
  441. }
  442. // Concurrently downloads parts
  443. go downloadScheduler(jobs, parts)
  444. // Wait for the parts download finished
  445. completed := 0
  446. for completed < len(parts) {
  447. select {
  448. case part := <-results:
  449. completed++
  450. dcp.PartStat[part.Index] = true
  451. dcp.Parts[part.Index].CRC64 = part.CRC64
  452. dcp.dump(cpFilePath)
  453. completedBytes += (part.End - part.Start + 1)
  454. event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
  455. publishProgress(listener, event)
  456. case err := <-failed:
  457. close(die)
  458. event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
  459. publishProgress(listener, event)
  460. return err
  461. }
  462. if completed >= len(parts) {
  463. break
  464. }
  465. }
  466. event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
  467. publishProgress(listener, event)
  468. if dcp.enableCRC {
  469. actualCRC := combineCRCInParts(dcp.Parts)
  470. err = checkDownloadCRC(actualCRC, dcp.CRC)
  471. if err != nil {
  472. return err
  473. }
  474. }
  475. return dcp.complete(cpFilePath, tempFilePath)
  476. }