download.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "hash"
  9. "hash/crc64"
  10. "io"
  11. "io/ioutil"
  12. "net/http"
  13. "os"
  14. "path/filepath"
  15. "strconv"
  16. )
  17. // DownloadFile downloads files with multipart download.
  18. //
  19. // objectKey the object key.
  20. // filePath the local file to download from objectKey in OSS.
  21. // partSize the part size in bytes.
  22. // options object's constraints, check out GetObject for the reference.
  23. //
  24. // error it's nil when the call succeeds, otherwise it's an error object.
  25. //
  26. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  27. if partSize < 1 {
  28. return errors.New("oss: part size smaller than 1")
  29. }
  30. uRange, err := GetRangeConfig(options)
  31. if err != nil {
  32. return err
  33. }
  34. cpConf := getCpConfig(options)
  35. routines := getRoutines(options)
  36. var strVersionId string
  37. versionId, _ := FindOption(options, "versionId", nil)
  38. if versionId != nil {
  39. strVersionId = versionId.(string)
  40. }
  41. if cpConf != nil && cpConf.IsEnable {
  42. cpFilePath := getDownloadCpFilePath(cpConf, bucket.BucketName, objectKey, strVersionId, filePath)
  43. if cpFilePath != "" {
  44. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
  45. }
  46. }
  47. return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
  48. }
  49. func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, versionId, destFile string) string {
  50. if cpConf.FilePath == "" && cpConf.DirPath != "" {
  51. src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
  52. absPath, _ := filepath.Abs(destFile)
  53. cpFileName := getCpFileName(src, absPath, versionId)
  54. cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
  55. }
  56. return cpConf.FilePath
  57. }
// downloadWorkerArg bundles the parameters shared by every download worker.
// Callers build it with a positional composite literal, so the field order
// must not change.
type downloadWorkerArg struct {
	bucket    *Bucket          // bucket the object is read from
	key       string           // object key
	filePath  string           // local (temporary) file the parts are written into
	options   []Option         // caller's GetObject options; the worker appends its own Range and Progress options
	hook      downloadPartHook // called with each part before it is downloaded
	enableCRC bool             // compute each part's CRC64 while downloading
}
  67. // downloadPartHook is hook for test
  68. type downloadPartHook func(part downloadPart) error
  69. var downloadPartHooker downloadPartHook = defaultDownloadPartHook
  70. func defaultDownloadPartHook(part downloadPart) error {
  71. return nil
  72. }
  73. // defaultDownloadProgressListener defines default ProgressListener, shields the ProgressListener in options of GetObject.
  74. type defaultDownloadProgressListener struct {
  75. }
  76. // ProgressChanged no-ops
  77. func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
  78. }
  79. // downloadWorker
  80. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
  81. for part := range jobs {
  82. if err := arg.hook(part); err != nil {
  83. failed <- err
  84. break
  85. }
  86. // Resolve options
  87. r := Range(part.Start, part.End)
  88. p := Progress(&defaultDownloadProgressListener{})
  89. opts := make([]Option, len(arg.options)+2)
  90. // Append orderly, can not be reversed!
  91. opts = append(opts, arg.options...)
  92. opts = append(opts, r, p)
  93. rd, err := arg.bucket.GetObject(arg.key, opts...)
  94. if err != nil {
  95. failed <- err
  96. break
  97. }
  98. defer rd.Close()
  99. var crcCalc hash.Hash64
  100. if arg.enableCRC {
  101. crcCalc = crc64.New(CrcTable())
  102. contentLen := part.End - part.Start + 1
  103. rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
  104. }
  105. defer rd.Close()
  106. select {
  107. case <-die:
  108. return
  109. default:
  110. }
  111. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
  112. if err != nil {
  113. failed <- err
  114. break
  115. }
  116. _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
  117. if err != nil {
  118. fd.Close()
  119. failed <- err
  120. break
  121. }
  122. _, err = io.Copy(fd, rd)
  123. if err != nil {
  124. fd.Close()
  125. failed <- err
  126. break
  127. }
  128. if arg.enableCRC {
  129. part.CRC64 = crcCalc.Sum64()
  130. }
  131. fd.Close()
  132. results <- part
  133. }
  134. }
  135. // downloadScheduler
  136. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  137. for _, part := range parts {
  138. jobs <- part
  139. }
  140. close(jobs)
  141. }
// downloadPart defines one ranged piece of a download. Parts are also stored
// in the checkpoint JSON, whose MD5 covers the encoding, so field names and
// order must not change.
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // First byte of the part within the object
	End    int64  // Last byte of the part within the object (inclusive; size is End-Start+1)
	Offset int64  // Start of the whole download range; the part is written at Start-Offset in the local file
	CRC64  uint64 // CRC64 of the part's data, filled in when CRC checking is enabled
}
  150. // getDownloadParts gets download parts
  151. func getDownloadParts(objectSize, partSize int64, uRange *UnpackedRange) []downloadPart {
  152. parts := []downloadPart{}
  153. part := downloadPart{}
  154. i := 0
  155. start, end := AdjustRange(uRange, objectSize)
  156. for offset := start; offset < end; offset += partSize {
  157. part.Index = i
  158. part.Start = offset
  159. part.End = GetPartEnd(offset, end, partSize)
  160. part.Offset = start
  161. part.CRC64 = 0
  162. parts = append(parts, part)
  163. i++
  164. }
  165. return parts
  166. }
  167. // getObjectBytes gets object bytes length
  168. func getObjectBytes(parts []downloadPart) int64 {
  169. var ob int64
  170. for _, part := range parts {
  171. ob += (part.End - part.Start + 1)
  172. }
  173. return ob
  174. }
  175. // combineCRCInParts caculates the total CRC of continuous parts
  176. func combineCRCInParts(dps []downloadPart) uint64 {
  177. if dps == nil || len(dps) == 0 {
  178. return 0
  179. }
  180. crc := dps[0].CRC64
  181. for i := 1; i < len(dps); i++ {
  182. crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
  183. }
  184. return crc
  185. }
  186. // downloadFile downloads file concurrently without checkpoint.
  187. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *UnpackedRange) error {
  188. tempFilePath := filePath + TempFileSuffix
  189. listener := GetProgressListener(options)
  190. // If the file does not exist, create one. If exists, the download will overwrite it.
  191. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  192. if err != nil {
  193. return err
  194. }
  195. fd.Close()
  196. // Get the object detailed meta for object whole size
  197. // must delete header:range to get whole object size
  198. skipOptions := DeleteOption(options, HTTPHeaderRange)
  199. meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
  200. if err != nil {
  201. return err
  202. }
  203. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
  204. if err != nil {
  205. return err
  206. }
  207. enableCRC := false
  208. expectedCRC := (uint64)(0)
  209. if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  210. if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
  211. enableCRC = true
  212. expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
  213. }
  214. }
  215. // Get the parts of the file
  216. parts := getDownloadParts(objectSize, partSize, uRange)
  217. jobs := make(chan downloadPart, len(parts))
  218. results := make(chan downloadPart, len(parts))
  219. failed := make(chan error)
  220. die := make(chan bool)
  221. var completedBytes int64
  222. totalBytes := getObjectBytes(parts)
  223. event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
  224. publishProgress(listener, event)
  225. // Start the download workers
  226. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
  227. for w := 1; w <= routines; w++ {
  228. go downloadWorker(w, arg, jobs, results, failed, die)
  229. }
  230. // Download parts concurrently
  231. go downloadScheduler(jobs, parts)
  232. // Waiting for parts download finished
  233. completed := 0
  234. for completed < len(parts) {
  235. select {
  236. case part := <-results:
  237. completed++
  238. downBytes := (part.End - part.Start + 1)
  239. completedBytes += downBytes
  240. parts[part.Index].CRC64 = part.CRC64
  241. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, downBytes)
  242. publishProgress(listener, event)
  243. case err := <-failed:
  244. close(die)
  245. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
  246. publishProgress(listener, event)
  247. return err
  248. }
  249. if completed >= len(parts) {
  250. break
  251. }
  252. }
  253. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
  254. publishProgress(listener, event)
  255. if enableCRC {
  256. actualCRC := combineCRCInParts(parts)
  257. err = CheckDownloadCRC(actualCRC, expectedCRC)
  258. if err != nil {
  259. return err
  260. }
  261. }
  262. return os.Rename(tempFilePath, filePath)
  263. }
// ----- Concurrent download with checkpoint -----

// downloadCpMagic marks a file as a download checkpoint; isValid rejects any
// checkpoint whose Magic differs from it.
const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
// downloadCheckpoint is the resumable state of a download. It is written as
// JSON by dump and read back by load. MD5 is computed over the JSON encoding
// with MD5 blanked, so changing field names or order invalidates existing
// checkpoints.
type downloadCheckpoint struct {
	Magic     string         // Magic; must equal downloadCpMagic
	MD5       string         // Checkpoint content MD5 (base64 of the MD5 of the JSON with this field empty)
	FilePath  string         // Local file: final destination the temp file is renamed to
	Object    string         // Key
	ObjStat   objectStat     // Object status when the checkpoint was prepared
	Parts     []downloadPart // All download parts
	PartStat  []bool         // Parts' download status, indexed like Parts
	Start     int64          // Start point of the adjusted download range; compared by isValid when a range is given
	End       int64          // End point of the adjusted download range; compared by isValid when a range is given
	enableCRC bool           // Whether has CRC check. Unexported, so json.Marshal skips it: dump does not save it and load does not restore it
	CRC       uint64         // Expected CRC64 of the whole object (set only when CRC checking was enabled)
}
// objectStat captures the object's identity from its metadata. A checkpoint
// is only reused while all three fields still match the current object.
type objectStat struct {
	Size         int64  // Object size (Content-Length)
	LastModified string // Last modified time (Last-Modified header value)
	Etag         string // Etag (ETag header value)
}
  284. // isValid flags of checkpoint data is valid. It returns true when the data is valid and the checkpoint is valid and the object is not updated.
  285. func (cp downloadCheckpoint) isValid(meta http.Header, uRange *UnpackedRange) (bool, error) {
  286. // Compare the CP's Magic and the MD5
  287. cpb := cp
  288. cpb.MD5 = ""
  289. js, _ := json.Marshal(cpb)
  290. sum := md5.Sum(js)
  291. b64 := base64.StdEncoding.EncodeToString(sum[:])
  292. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  293. return false, nil
  294. }
  295. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
  296. if err != nil {
  297. return false, err
  298. }
  299. // Compare the object size, last modified time and etag
  300. if cp.ObjStat.Size != objectSize ||
  301. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  302. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  303. return false, nil
  304. }
  305. // Check the download range
  306. if uRange != nil {
  307. start, end := AdjustRange(uRange, objectSize)
  308. if start != cp.Start || end != cp.End {
  309. return false, nil
  310. }
  311. }
  312. return true, nil
  313. }
  314. // load checkpoint from local file
  315. func (cp *downloadCheckpoint) load(filePath string) error {
  316. contents, err := ioutil.ReadFile(filePath)
  317. if err != nil {
  318. return err
  319. }
  320. err = json.Unmarshal(contents, cp)
  321. return err
  322. }
  323. // dump funciton dumps to file
  324. func (cp *downloadCheckpoint) dump(filePath string) error {
  325. bcp := *cp
  326. // Calculate MD5
  327. bcp.MD5 = ""
  328. js, err := json.Marshal(bcp)
  329. if err != nil {
  330. return err
  331. }
  332. sum := md5.Sum(js)
  333. b64 := base64.StdEncoding.EncodeToString(sum[:])
  334. bcp.MD5 = b64
  335. // Serialize
  336. js, err = json.Marshal(bcp)
  337. if err != nil {
  338. return err
  339. }
  340. // Dump
  341. return ioutil.WriteFile(filePath, js, FilePermMode)
  342. }
  343. // todoParts gets unfinished parts
  344. func (cp downloadCheckpoint) todoParts() []downloadPart {
  345. dps := []downloadPart{}
  346. for i, ps := range cp.PartStat {
  347. if !ps {
  348. dps = append(dps, cp.Parts[i])
  349. }
  350. }
  351. return dps
  352. }
  353. // getCompletedBytes gets completed size
  354. func (cp downloadCheckpoint) getCompletedBytes() int64 {
  355. var completedBytes int64
  356. for i, part := range cp.Parts {
  357. if cp.PartStat[i] {
  358. completedBytes += (part.End - part.Start + 1)
  359. }
  360. }
  361. return completedBytes
  362. }
  363. // prepare initiates download tasks
  364. func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *UnpackedRange) error {
  365. // CP
  366. cp.Magic = downloadCpMagic
  367. cp.FilePath = filePath
  368. cp.Object = objectKey
  369. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
  370. if err != nil {
  371. return err
  372. }
  373. cp.ObjStat.Size = objectSize
  374. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  375. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  376. if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  377. if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
  378. cp.enableCRC = true
  379. cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
  380. }
  381. }
  382. // Parts
  383. cp.Parts = getDownloadParts(objectSize, partSize, uRange)
  384. cp.PartStat = make([]bool, len(cp.Parts))
  385. for i := range cp.PartStat {
  386. cp.PartStat[i] = false
  387. }
  388. return nil
  389. }
  390. func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
  391. os.Remove(cpFilePath)
  392. return os.Rename(downFilepath, cp.FilePath)
  393. }
  394. // downloadFileWithCp downloads files with checkpoint.
  395. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *UnpackedRange) error {
  396. tempFilePath := filePath + TempFileSuffix
  397. listener := GetProgressListener(options)
  398. // Load checkpoint data.
  399. dcp := downloadCheckpoint{}
  400. err := dcp.load(cpFilePath)
  401. if err != nil {
  402. os.Remove(cpFilePath)
  403. }
  404. // Get the object detailed meta for object whole size
  405. // must delete header:range to get whole object size
  406. skipOptions := DeleteOption(options, HTTPHeaderRange)
  407. meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
  408. if err != nil {
  409. return err
  410. }
  411. // Load error or data invalid. Re-initialize the download.
  412. valid, err := dcp.isValid(meta, uRange)
  413. if err != nil || !valid {
  414. if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
  415. return err
  416. }
  417. os.Remove(cpFilePath)
  418. }
  419. // Create the file if not exists. Otherwise the parts download will overwrite it.
  420. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  421. if err != nil {
  422. return err
  423. }
  424. fd.Close()
  425. // Unfinished parts
  426. parts := dcp.todoParts()
  427. jobs := make(chan downloadPart, len(parts))
  428. results := make(chan downloadPart, len(parts))
  429. failed := make(chan error)
  430. die := make(chan bool)
  431. completedBytes := dcp.getCompletedBytes()
  432. event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size, 0)
  433. publishProgress(listener, event)
  434. // Start the download workers routine
  435. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
  436. for w := 1; w <= routines; w++ {
  437. go downloadWorker(w, arg, jobs, results, failed, die)
  438. }
  439. // Concurrently downloads parts
  440. go downloadScheduler(jobs, parts)
  441. // Wait for the parts download finished
  442. completed := 0
  443. for completed < len(parts) {
  444. select {
  445. case part := <-results:
  446. completed++
  447. dcp.PartStat[part.Index] = true
  448. dcp.Parts[part.Index].CRC64 = part.CRC64
  449. dcp.dump(cpFilePath)
  450. downBytes := (part.End - part.Start + 1)
  451. completedBytes += downBytes
  452. event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size, downBytes)
  453. publishProgress(listener, event)
  454. case err := <-failed:
  455. close(die)
  456. event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size, 0)
  457. publishProgress(listener, event)
  458. return err
  459. }
  460. if completed >= len(parts) {
  461. break
  462. }
  463. }
  464. event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size, 0)
  465. publishProgress(listener, event)
  466. if dcp.enableCRC {
  467. actualCRC := combineCRCInParts(dcp.Parts)
  468. err = CheckDownloadCRC(actualCRC, dcp.CRC)
  469. if err != nil {
  470. return err
  471. }
  472. }
  473. return dcp.complete(cpFilePath, tempFilePath)
  474. }