download.go

package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"hash"
	"hash/crc64"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
)

// DownloadFile downloads an object into a local file with multipart (ranged) download.
//
// objectKey    the object key.
// filePath     the local file path that the object is downloaded to.
// partSize     the part size in bytes.
// options      the options for downloading the object. Refer to GetObject for the reference.
//
// error        it's nil if the operation succeeds, otherwise it's an error object.
//
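// A minimal usage sketch, assuming the package's Routines option (read via
// getRoutines below) to set the number of concurrent download workers:
//
//	err := bucket.DownloadFile("my-object", "/tmp/my-object", 1024*1024, Routines(3))
//	if err != nil {
//		// handle the error
//	}
//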
func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < 1 {
		return errors.New("oss: part size smaller than 1")
	}

	uRange, err := getRangeConfig(options)
	if err != nil {
		return err
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	var strVersionId string
	versionId, _ := findOption(options, "versionId", nil)
	if versionId != nil {
		strVersionId = versionId.(string)
	}

	if cpConf != nil && cpConf.IsEnable {
		cpFilePath := getDownloadCpFilePath(cpConf, bucket.BucketName, objectKey, strVersionId, filePath)
		if cpFilePath != "" {
			return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
		}
	}

	return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
}

func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, versionId, destFile string) string {
	if cpConf.FilePath == "" && cpConf.DirPath != "" {
		src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
		absPath, _ := filepath.Abs(destFile)
		cpFileName := getCpFileName(src, absPath, versionId)
		cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
	}
	return cpConf.FilePath
}

// getRangeConfig gets the download range from the options.
func getRangeConfig(options []Option) (*unpackedRange, error) {
	rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
	if err != nil || rangeOpt == nil {
		return nil, err
	}
	return parseRange(rangeOpt.(string))
}

// ----- concurrent download without checkpoint -----

// downloadWorkerArg is the download worker's parameters.
type downloadWorkerArg struct {
	bucket    *Bucket
	key       string
	filePath  string
	options   []Option
	hook      downloadPartHook
	enableCRC bool
}

// downloadPartHook is a hook for testing; it is called before each part is downloaded.
type downloadPartHook func(part downloadPart) error

var downloadPartHooker downloadPartHook = defaultDownloadPartHook

func defaultDownloadPartHook(part downloadPart) error {
	return nil
}
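
// Tests can swap the package-level downloadPartHooker to inject a failure into
// the worker loop; a minimal sketch of such a (hypothetical) test hook:
//
//	downloadPartHooker = func(part downloadPart) error {
//		if part.Index == 2 {
//			return errors.New("injected error for part 2")
//		}
//		return nil
//	}
//	defer func() { downloadPartHooker = defaultDownloadPartHook }()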

// defaultDownloadProgressListener defines the default ProgressListener; it overrides
// any ProgressListener passed in the options of GetObject.
type defaultDownloadProgressListener struct {
}

// ProgressChanged is a no-op.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}

// downloadWorker receives parts from the jobs channel, downloads each part into
// the local file, and sends the finished part on results. The first error is
// sent on failed and stops the worker; closing die aborts all workers.
func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
	for part := range jobs {
		if err := arg.hook(part); err != nil {
			failed <- err
			break
		}

		// Resolve options. Append in this order: the per-part Range and the
		// no-op Progress listener must come after the caller's options so that
		// they take effect for this request.
		r := Range(part.Start, part.End)
		p := Progress(&defaultDownloadProgressListener{})
		opts := make([]Option, 0, len(arg.options)+2)
		opts = append(opts, arg.options...)
		opts = append(opts, r, p)

		rd, err := arg.bucket.GetObject(arg.key, opts...)
		if err != nil {
			failed <- err
			break
		}

		// Keep a handle on the response body so it can be closed once this part
		// is done (a deferred Close inside the loop would only run when the
		// worker exits).
		body := rd

		var crcCalc hash.Hash64
		if arg.enableCRC {
			crcCalc = crc64.New(crcTable())
			contentLen := part.End - part.Start + 1
			rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
		}

		select {
		case <-die:
			body.Close()
			return
		default:
		}

		fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
		if err != nil {
			body.Close()
			failed <- err
			break
		}

		_, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
		if err != nil {
			fd.Close()
			body.Close()
			failed <- err
			break
		}

		_, err = io.Copy(fd, rd)
		body.Close()
		if err != nil {
			fd.Close()
			failed <- err
			break
		}

		if arg.enableCRC {
			part.CRC64 = crcCalc.Sum64()
		}

		fd.Close()
		results <- part
	}
}

// downloadScheduler feeds all parts to the jobs channel and closes it.
func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
	for _, part := range parts {
		jobs <- part
	}
	close(jobs)
}

// downloadPart defines a download part.
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start index of the part in the object (inclusive)
	End    int64  // End index of the part in the object (inclusive)
	Offset int64  // Start of the requested range; Start-Offset is the write offset in the local file
	CRC64  uint64 // CRC check value of the part
}

// getDownloadParts splits the download range into parts.
func getDownloadParts(objectSize, partSize int64, uRange *unpackedRange) []downloadPart {
	parts := []downloadPart{}
	part := downloadPart{}
	i := 0
	start, end := adjustRange(uRange, objectSize)
	for offset := start; offset < end; offset += partSize {
		part.Index = i
		part.Start = offset
		part.End = GetPartEnd(offset, end, partSize)
		part.Offset = start
		part.CRC64 = 0
		parts = append(parts, part)
		i++
	}
	return parts
}
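
// For illustration: a 10-byte object downloaded with partSize 4 and no range is
// split into three parts (assuming GetPartEnd returns an inclusive end index,
// which matches how part.End is fed to Range in downloadWorker):
//
//	{Index: 0, Start: 0, End: 3, Offset: 0}
//	{Index: 1, Start: 4, End: 7, Offset: 0}
//	{Index: 2, Start: 8, End: 9, Offset: 0}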

// getObjectBytes gets the total number of bytes covered by the parts.
func getObjectBytes(parts []downloadPart) int64 {
	var ob int64
	for _, part := range parts {
		ob += part.End - part.Start + 1
	}
	return ob
}

// combineCRCInParts calculates the total CRC of consecutive parts.
func combineCRCInParts(dps []downloadPart) uint64 {
	if len(dps) == 0 {
		return 0
	}
	crc := dps[0].CRC64
	for i := 1; i < len(dps); i++ {
		crc = CRC64Combine(crc, dps[i].CRC64, uint64(dps[i].End-dps[i].Start+1))
	}
	return crc
}
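
// The folding above relies on the usual CRC64 combine property: for adjacent
// chunks a and b, CRC64Combine(crc(a), crc(b), uint64(len(b))) == crc(a||b)
// (assuming CRC64Combine, defined elsewhere in this package, implements
// standard CRC combination). Since getDownloadParts produces contiguous parts
// in order, folding the per-part CRCs left to right yields the CRC of the whole
// downloaded range.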

// downloadFile downloads the file concurrently without a checkpoint.
func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := getProgressListener(options)

	// If the file does not exist, create one. If it exists, the download overwrites it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	// Get the object's detailed meta for the whole object size.
	// The Range header must be removed, otherwise only the ranged size is returned.
	skipOptions := deleteOption(options, HTTPHeaderRange)
	meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return err
	}

	enableCRC := false
	expectedCRC := uint64(0)
	if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
		if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
			enableCRC = true
			expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
		}
	}

	// Get the parts of the file
	parts := getDownloadParts(objectSize, partSize, uRange)
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getObjectBytes(parts)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
	publishProgress(listener, event)

	// Start the download workers
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Schedule the parts for concurrent download
	go downloadScheduler(jobs, parts)

	// Wait for the part downloads to finish
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			downBytes := part.End - part.Start + 1
			completedBytes += downBytes
			parts[part.Index].CRC64 = part.CRC64
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, downBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
	publishProgress(listener, event)

	if enableCRC {
		actualCRC := combineCRCInParts(parts)
		err = checkDownloadCRC(actualCRC, expectedCRC)
		if err != nil {
			return err
		}
	}

	return os.Rename(tempFilePath, filePath)
}

// ----- concurrent download with checkpoint -----

const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"

type downloadCheckpoint struct {
	Magic     string         // Magic
	MD5       string         // Checkpoint content MD5
	FilePath  string         // Local file path
	Object    string         // Object key
	ObjStat   objectStat     // Object status
	Parts     []downloadPart // All download parts
	PartStat  []bool         // Parts' download status
	Start     int64          // Start point of the file
	End       int64          // End point of the file
	enableCRC bool           // Whether the CRC check is enabled
	CRC       uint64         // Expected CRC value of the whole object
}
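
// Note on persistence: the checkpoint is stored as JSON (see dump), so only the
// exported fields above end up in the checkpoint file; the unexported enableCRC
// flag is not serialized and thus is not restored by load. On resume, load reads
// the persisted fields, isValid compares them against the object's current
// metadata, and todoParts returns the parts whose PartStat entry is still false.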

type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // Etag
}

// isValid checks whether the checkpoint data is valid. It returns true when the
// checkpoint's MD5 matches its content and the object has not been updated since
// the checkpoint was written.
func (cp downloadCheckpoint) isValid(meta http.Header, uRange *unpackedRange) (bool, error) {
	// Compare the checkpoint's magic and MD5
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return false, err
	}

	// Compare the object size, last modified time and etag
	if cp.ObjStat.Size != objectSize ||
		cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
		cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
		return false, nil
	}

	// Check the download range
	if uRange != nil {
		start, end := adjustRange(uRange, objectSize)
		if start != cp.Start || end != cp.End {
			return false, nil
		}
	}

	return true, nil
}

// load loads the checkpoint from the local checkpoint file.
func (cp *downloadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}
	err = json.Unmarshal(contents, cp)
	return err
}

// dump serializes the checkpoint, with its MD5, to the local checkpoint file.
func (cp *downloadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate the MD5 over the content with the MD5 field cleared
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump to the file
	return ioutil.WriteFile(filePath, js, FilePermMode)
}

// todoParts gets the unfinished parts.
func (cp downloadCheckpoint) todoParts() []downloadPart {
	dps := []downloadPart{}
	for i, ps := range cp.PartStat {
		if !ps {
			dps = append(dps, cp.Parts[i])
		}
	}
	return dps
}

// getCompletedBytes gets the completed size.
func (cp downloadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for i, part := range cp.Parts {
		if cp.PartStat[i] {
			completedBytes += part.End - part.Start + 1
		}
	}
	return completedBytes
}

// prepare initializes the checkpoint for a new download.
func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
	// Checkpoint
	cp.Magic = downloadCpMagic
	cp.FilePath = filePath
	cp.Object = objectKey

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return err
	}

	cp.ObjStat.Size = objectSize
	cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)

	if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
		if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
			cp.enableCRC = true
			cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
		}
	}

	// Parts
	cp.Parts = getDownloadParts(objectSize, partSize, uRange)
	cp.PartStat = make([]bool, len(cp.Parts))
	for i := range cp.PartStat {
		cp.PartStat[i] = false
	}

	return nil
}

// complete removes the checkpoint file and renames the temporary file to the target file.
func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
	os.Remove(cpFilePath)
	return os.Rename(downFilepath, cp.FilePath)
}

// downloadFileWithCp downloads the file with checkpoint support, so that an interrupted download can be resumed.
func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := getProgressListener(options)

	// Load the checkpoint data.
	dcp := downloadCheckpoint{}
	err := dcp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// Get the object's detailed meta for the whole object size.
	// The Range header must be removed, otherwise only the ranged size is returned.
	skipOptions := deleteOption(options, HTTPHeaderRange)
	meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
	if err != nil {
		return err
	}

	// Load error or invalid checkpoint data: re-initialize the download.
	valid, err := dcp.isValid(meta, uRange)
	if err != nil || !valid {
		if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Create the file if it does not exist. Otherwise the part downloads will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	// Unfinished parts
	parts := dcp.todoParts()
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := dcp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size, 0)
	publishProgress(listener, event)

	// Start the download worker goroutines
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Schedule the parts for concurrent download
	go downloadScheduler(jobs, parts)

	// Wait for the part downloads to finish
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			dcp.PartStat[part.Index] = true
			dcp.Parts[part.Index].CRC64 = part.CRC64
			dcp.dump(cpFilePath)
			downBytes := part.End - part.Start + 1
			completedBytes += downBytes
			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size, downBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size, 0)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size, 0)
	publishProgress(listener, event)

	if dcp.enableCRC {
		actualCRC := combineCRCInParts(dcp.Parts)
		err = checkDownloadCRC(actualCRC, dcp.CRC)
		if err != nil {
			return err
		}
	}

	return dcp.complete(cpFilePath, tempFilePath)
}
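
// Resuming an interrupted download: when a checkpoint file path is configured,
// calling DownloadFile again with the same object, file path, part size and
// options reuses the checkpoint file and downloads only the unfinished parts.
// A minimal sketch, assuming the package's Checkpoint(isEnable, filePath)
// option (read via getCpConfig in DownloadFile); the option name and signature
// are an assumption here:
//
//	// First attempt; may be interrupted part-way through.
//	_ = bucket.DownloadFile("my-object", "/data/my-object", 1024*1024,
//		Routines(3), Checkpoint(true, "/data/my-object.cp"))
//	// A later call with the same checkpoint file resumes instead of starting over.
//	err := bucket.DownloadFile("my-object", "/data/my-object", 1024*1024,
//		Routines(3), Checkpoint(true, "/data/my-object.cp"))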