download.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "hash"
  9. "hash/crc64"
  10. "io"
  11. "io/ioutil"
  12. "net/http"
  13. "os"
  14. "path/filepath"
  15. "strconv"
  16. )
  17. // DownloadFile downloads files with multipart download.
  18. //
  19. // objectKey the object key.
  20. // filePath the local file to download from objectKey in OSS.
  21. // partSize the part size in bytes.
  22. // options object's constraints, check out GetObject for the reference.
  23. //
  24. // error it's nil when the call succeeds, otherwise it's an error object.
  25. //
  26. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  27. if partSize < 1 {
  28. return errors.New("oss: part size smaller than 1")
  29. }
  30. uRange, err := GetRangeConfig(options)
  31. if err != nil {
  32. return err
  33. }
  34. cpConf := getCpConfig(options)
  35. routines := getRoutines(options)
  36. if cpConf != nil && cpConf.IsEnable {
  37. cpFilePath := getDownloadCpFilePath(cpConf, bucket.BucketName, objectKey, filePath)
  38. if cpFilePath != "" {
  39. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
  40. }
  41. }
  42. return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
  43. }
  44. func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destFile string) string {
  45. if cpConf.FilePath == "" && cpConf.DirPath != "" {
  46. src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
  47. absPath, _ := filepath.Abs(destFile)
  48. cpFileName := getCpFileName(src, absPath)
  49. cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
  50. }
  51. return cpConf.FilePath
  52. }
  53. // ----- concurrent download without checkpoint -----
  54. // downloadWorkerArg is download worker's parameters
  55. type downloadWorkerArg struct {
  56. bucket *Bucket
  57. key string
  58. filePath string
  59. options []Option
  60. hook downloadPartHook
  61. enableCRC bool
  62. }
  63. // downloadPartHook is hook for test
  64. type downloadPartHook func(part downloadPart) error
  65. var downloadPartHooker downloadPartHook = defaultDownloadPartHook
  66. func defaultDownloadPartHook(part downloadPart) error {
  67. return nil
  68. }
  69. // defaultDownloadProgressListener defines default ProgressListener, shields the ProgressListener in options of GetObject.
  70. type defaultDownloadProgressListener struct {
  71. }
  72. // ProgressChanged no-ops
  73. func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
  74. }
  75. // downloadWorker
  76. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
  77. for part := range jobs {
  78. if err := arg.hook(part); err != nil {
  79. failed <- err
  80. break
  81. }
  82. // Resolve options
  83. r := Range(part.Start, part.End)
  84. p := Progress(&defaultDownloadProgressListener{})
  85. opts := make([]Option, len(arg.options)+2)
  86. // Append orderly, can not be reversed!
  87. opts = append(opts, arg.options...)
  88. opts = append(opts, r, p)
  89. rd, err := arg.bucket.GetObject(arg.key, opts...)
  90. if err != nil {
  91. failed <- err
  92. break
  93. }
  94. defer rd.Close()
  95. var crcCalc hash.Hash64
  96. if arg.enableCRC {
  97. crcCalc = crc64.New(CrcTable())
  98. contentLen := part.End - part.Start + 1
  99. rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
  100. }
  101. defer rd.Close()
  102. select {
  103. case <-die:
  104. return
  105. default:
  106. }
  107. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
  108. if err != nil {
  109. failed <- err
  110. break
  111. }
  112. _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
  113. if err != nil {
  114. fd.Close()
  115. failed <- err
  116. break
  117. }
  118. _, err = io.Copy(fd, rd)
  119. if err != nil {
  120. fd.Close()
  121. failed <- err
  122. break
  123. }
  124. if arg.enableCRC {
  125. part.CRC64 = crcCalc.Sum64()
  126. }
  127. fd.Close()
  128. results <- part
  129. }
  130. }
  131. // downloadScheduler
  132. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  133. for _, part := range parts {
  134. jobs <- part
  135. }
  136. close(jobs)
  137. }
  138. // downloadPart defines download part
  139. type downloadPart struct {
  140. Index int // Part number, starting from 0
  141. Start int64 // Start index
  142. End int64 // End index
  143. Offset int64 // Offset
  144. CRC64 uint64 // CRC check value of part
  145. }
  146. // getDownloadParts gets download parts
  147. func getDownloadParts(objectSize, partSize int64, uRange *UnpackedRange) []downloadPart {
  148. parts := []downloadPart{}
  149. part := downloadPart{}
  150. i := 0
  151. start, end := AdjustRange(uRange, objectSize)
  152. for offset := start; offset < end; offset += partSize {
  153. part.Index = i
  154. part.Start = offset
  155. part.End = GetPartEnd(offset, end, partSize)
  156. part.Offset = start
  157. part.CRC64 = 0
  158. parts = append(parts, part)
  159. i++
  160. }
  161. return parts
  162. }
  163. // getObjectBytes gets object bytes length
  164. func getObjectBytes(parts []downloadPart) int64 {
  165. var ob int64
  166. for _, part := range parts {
  167. ob += (part.End - part.Start + 1)
  168. }
  169. return ob
  170. }
  171. // combineCRCInParts caculates the total CRC of continuous parts
  172. func combineCRCInParts(dps []downloadPart) uint64 {
  173. if dps == nil || len(dps) == 0 {
  174. return 0
  175. }
  176. crc := dps[0].CRC64
  177. for i := 1; i < len(dps); i++ {
  178. crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
  179. }
  180. return crc
  181. }
  182. // downloadFile downloads file concurrently without checkpoint.
  183. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *UnpackedRange) error {
  184. tempFilePath := filePath + TempFileSuffix
  185. listener := GetProgressListener(options)
  186. // If the file does not exist, create one. If exists, the download will overwrite it.
  187. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  188. if err != nil {
  189. return err
  190. }
  191. fd.Close()
  192. // Get the object detailed meta for object whole size
  193. // must delete header:range to get whole object size
  194. skipOptions := DeleteOption(options, HTTPHeaderRange)
  195. meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
  196. if err != nil {
  197. return err
  198. }
  199. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
  200. if err != nil {
  201. return err
  202. }
  203. enableCRC := false
  204. expectedCRC := (uint64)(0)
  205. if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  206. if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
  207. enableCRC = true
  208. expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
  209. }
  210. }
  211. // Get the parts of the file
  212. parts := getDownloadParts(objectSize, partSize, uRange)
  213. jobs := make(chan downloadPart, len(parts))
  214. results := make(chan downloadPart, len(parts))
  215. failed := make(chan error)
  216. die := make(chan bool)
  217. var completedBytes int64
  218. totalBytes := getObjectBytes(parts)
  219. event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
  220. publishProgress(listener, event)
  221. // Start the download workers
  222. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
  223. for w := 1; w <= routines; w++ {
  224. go downloadWorker(w, arg, jobs, results, failed, die)
  225. }
  226. // Download parts concurrently
  227. go downloadScheduler(jobs, parts)
  228. // Waiting for parts download finished
  229. completed := 0
  230. for completed < len(parts) {
  231. select {
  232. case part := <-results:
  233. completed++
  234. downBytes := (part.End - part.Start + 1)
  235. completedBytes += downBytes
  236. parts[part.Index].CRC64 = part.CRC64
  237. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, downBytes)
  238. publishProgress(listener, event)
  239. case err := <-failed:
  240. close(die)
  241. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
  242. publishProgress(listener, event)
  243. return err
  244. }
  245. if completed >= len(parts) {
  246. break
  247. }
  248. }
  249. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
  250. publishProgress(listener, event)
  251. if enableCRC {
  252. actualCRC := combineCRCInParts(parts)
  253. err = CheckDownloadCRC(actualCRC, expectedCRC)
  254. if err != nil {
  255. return err
  256. }
  257. }
  258. return os.Rename(tempFilePath, filePath)
  259. }
  260. // ----- Concurrent download with chcekpoint -----
  261. const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
  262. type downloadCheckpoint struct {
  263. Magic string // Magic
  264. MD5 string // Checkpoint content MD5
  265. FilePath string // Local file
  266. Object string // Key
  267. ObjStat objectStat // Object status
  268. Parts []downloadPart // All download parts
  269. PartStat []bool // Parts' download status
  270. Start int64 // Start point of the file
  271. End int64 // End point of the file
  272. enableCRC bool // Whether has CRC check
  273. CRC uint64 // CRC check value
  274. }
  275. type objectStat struct {
  276. Size int64 // Object size
  277. LastModified string // Last modified time
  278. Etag string // Etag
  279. }
  280. // isValid flags of checkpoint data is valid. It returns true when the data is valid and the checkpoint is valid and the object is not updated.
  281. func (cp downloadCheckpoint) isValid(meta http.Header, uRange *UnpackedRange) (bool, error) {
  282. // Compare the CP's Magic and the MD5
  283. cpb := cp
  284. cpb.MD5 = ""
  285. js, _ := json.Marshal(cpb)
  286. sum := md5.Sum(js)
  287. b64 := base64.StdEncoding.EncodeToString(sum[:])
  288. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  289. return false, nil
  290. }
  291. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
  292. if err != nil {
  293. return false, err
  294. }
  295. // Compare the object size, last modified time and etag
  296. if cp.ObjStat.Size != objectSize ||
  297. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  298. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  299. return false, nil
  300. }
  301. // Check the download range
  302. if uRange != nil {
  303. start, end := AdjustRange(uRange, objectSize)
  304. if start != cp.Start || end != cp.End {
  305. return false, nil
  306. }
  307. }
  308. return true, nil
  309. }
  310. // load checkpoint from local file
  311. func (cp *downloadCheckpoint) load(filePath string) error {
  312. contents, err := ioutil.ReadFile(filePath)
  313. if err != nil {
  314. return err
  315. }
  316. err = json.Unmarshal(contents, cp)
  317. return err
  318. }
  319. // dump funciton dumps to file
  320. func (cp *downloadCheckpoint) dump(filePath string) error {
  321. bcp := *cp
  322. // Calculate MD5
  323. bcp.MD5 = ""
  324. js, err := json.Marshal(bcp)
  325. if err != nil {
  326. return err
  327. }
  328. sum := md5.Sum(js)
  329. b64 := base64.StdEncoding.EncodeToString(sum[:])
  330. bcp.MD5 = b64
  331. // Serialize
  332. js, err = json.Marshal(bcp)
  333. if err != nil {
  334. return err
  335. }
  336. // Dump
  337. return ioutil.WriteFile(filePath, js, FilePermMode)
  338. }
  339. // todoParts gets unfinished parts
  340. func (cp downloadCheckpoint) todoParts() []downloadPart {
  341. dps := []downloadPart{}
  342. for i, ps := range cp.PartStat {
  343. if !ps {
  344. dps = append(dps, cp.Parts[i])
  345. }
  346. }
  347. return dps
  348. }
  349. // getCompletedBytes gets completed size
  350. func (cp downloadCheckpoint) getCompletedBytes() int64 {
  351. var completedBytes int64
  352. for i, part := range cp.Parts {
  353. if cp.PartStat[i] {
  354. completedBytes += (part.End - part.Start + 1)
  355. }
  356. }
  357. return completedBytes
  358. }
  359. // prepare initiates download tasks
  360. func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *UnpackedRange) error {
  361. // CP
  362. cp.Magic = downloadCpMagic
  363. cp.FilePath = filePath
  364. cp.Object = objectKey
  365. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
  366. if err != nil {
  367. return err
  368. }
  369. cp.ObjStat.Size = objectSize
  370. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  371. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  372. if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  373. if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
  374. cp.enableCRC = true
  375. cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
  376. }
  377. }
  378. // Parts
  379. cp.Parts = getDownloadParts(objectSize, partSize, uRange)
  380. cp.PartStat = make([]bool, len(cp.Parts))
  381. for i := range cp.PartStat {
  382. cp.PartStat[i] = false
  383. }
  384. return nil
  385. }
  386. func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
  387. os.Remove(cpFilePath)
  388. return os.Rename(downFilepath, cp.FilePath)
  389. }
  390. // downloadFileWithCp downloads files with checkpoint.
  391. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *UnpackedRange) error {
  392. tempFilePath := filePath + TempFileSuffix
  393. listener := GetProgressListener(options)
  394. // Load checkpoint data.
  395. dcp := downloadCheckpoint{}
  396. err := dcp.load(cpFilePath)
  397. if err != nil {
  398. os.Remove(cpFilePath)
  399. }
  400. // Get the object detailed meta for object whole size
  401. // must delete header:range to get whole object size
  402. skipOptions := DeleteOption(options, HTTPHeaderRange)
  403. meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
  404. if err != nil {
  405. return err
  406. }
  407. // Load error or data invalid. Re-initialize the download.
  408. valid, err := dcp.isValid(meta, uRange)
  409. if err != nil || !valid {
  410. if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
  411. return err
  412. }
  413. os.Remove(cpFilePath)
  414. }
  415. // Create the file if not exists. Otherwise the parts download will overwrite it.
  416. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  417. if err != nil {
  418. return err
  419. }
  420. fd.Close()
  421. // Unfinished parts
  422. parts := dcp.todoParts()
  423. jobs := make(chan downloadPart, len(parts))
  424. results := make(chan downloadPart, len(parts))
  425. failed := make(chan error)
  426. die := make(chan bool)
  427. completedBytes := dcp.getCompletedBytes()
  428. event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size, 0)
  429. publishProgress(listener, event)
  430. // Start the download workers routine
  431. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
  432. for w := 1; w <= routines; w++ {
  433. go downloadWorker(w, arg, jobs, results, failed, die)
  434. }
  435. // Concurrently downloads parts
  436. go downloadScheduler(jobs, parts)
  437. // Wait for the parts download finished
  438. completed := 0
  439. for completed < len(parts) {
  440. select {
  441. case part := <-results:
  442. completed++
  443. dcp.PartStat[part.Index] = true
  444. dcp.Parts[part.Index].CRC64 = part.CRC64
  445. dcp.dump(cpFilePath)
  446. downBytes := (part.End - part.Start + 1)
  447. completedBytes += downBytes
  448. event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size, downBytes)
  449. publishProgress(listener, event)
  450. case err := <-failed:
  451. close(die)
  452. event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size, 0)
  453. publishProgress(listener, event)
  454. return err
  455. }
  456. if completed >= len(parts) {
  457. break
  458. }
  459. }
  460. event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size, 0)
  461. publishProgress(listener, event)
  462. if dcp.enableCRC {
  463. actualCRC := combineCRCInParts(dcp.Parts)
  464. err = CheckDownloadCRC(actualCRC, dcp.CRC)
  465. if err != nil {
  466. return err
  467. }
  468. }
  469. return dcp.complete(cpFilePath, tempFilePath)
  470. }