download.go

package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"hash"
	"hash/crc64"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"time"
)

// DownloadFile downloads a file with multipart download.
//
// objectKey    the object key.
// filePath     the local file path to download the object to.
// partSize     the part size in bytes.
// options      the object's constraints; see GetObject for the available options.
//
// error        it's nil when the call succeeds, otherwise it's an error object.
//
func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < 1 {
		return errors.New("oss: part size smaller than 1")
	}

	uRange, err := GetRangeConfig(options)
	if err != nil {
		return err
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	var strVersionId string
	versionId, _ := FindOption(options, "versionId", nil)
	if versionId != nil {
		strVersionId = versionId.(string)
	}

	if cpConf != nil && cpConf.IsEnable {
		cpFilePath := getDownloadCpFilePath(cpConf, bucket.BucketName, objectKey, strVersionId, filePath)
		if cpFilePath != "" {
			return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
		}
	}

	return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
}
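
// Illustrative usage sketch from a client program (the endpoint, credentials and
// names below are placeholders, not part of this source file); Routines controls
// the number of concurrent workers and Checkpoint enables resumable download:
//
//	client, err := oss.New("yourEndpoint", "yourAccessKeyId", "yourAccessKeySecret")
//	if err != nil {
//		// handle error
//	}
//	bucket, err := client.Bucket("yourBucketName")
//	if err != nil {
//		// handle error
//	}
//	// 1 MB parts, 3 concurrent routines, checkpoint file next to the target file.
//	err = bucket.DownloadFile("yourObjectKey", "LocalFile", 1024*1024,
//		oss.Routines(3), oss.Checkpoint(true, "LocalFile.cp"))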

// getDownloadCpFilePath builds the checkpoint file path for a download.
func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, versionId, destFile string) string {
	if cpConf.FilePath == "" && cpConf.DirPath != "" {
		src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
		absPath, _ := filepath.Abs(destFile)
		cpFileName := getCpFileName(src, absPath, versionId)
		cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
	}
	return cpConf.FilePath
}

// downloadWorkerArg holds the download worker's parameters.
type downloadWorkerArg struct {
	bucket    *Bucket
	key       string
	filePath  string
	options   []Option
	hook      downloadPartHook
	enableCRC bool
}

// downloadPartHook is a hook for testing.
type downloadPartHook func(part downloadPart) error

var downloadPartHooker downloadPartHook = defaultDownloadPartHook

func defaultDownloadPartHook(part downloadPart) error {
	return nil
}

// defaultDownloadProgressListener defines the default ProgressListener; it shields the ProgressListener in the options of GetObject.
type defaultDownloadProgressListener struct {
}

// ProgressChanged is a no-op.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}

// downloadWorker pulls parts from the jobs channel, downloads each part with a
// ranged GetObject, and writes it to the local file at the part's offset.
func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
	for part := range jobs {
		if err := arg.hook(part); err != nil {
			failed <- err
			break
		}

		// Resolve options
		r := Range(part.Start, part.End)
		p := Progress(&defaultDownloadProgressListener{})

		var respHeader http.Header
		opts := make([]Option, 0, len(arg.options)+3)
		// Append in order; the order cannot be reversed!
		opts = append(opts, arg.options...)
		opts = append(opts, r, p, GetResponseHeader(&respHeader))

		rd, err := arg.bucket.GetObject(arg.key, opts...)
		if err != nil {
			failed <- err
			break
		}
		defer rd.Close()

		var crcCalc hash.Hash64
		if arg.enableCRC {
			crcCalc = crc64.New(CrcTable())
			contentLen := part.End - part.Start + 1
			rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
		}
		defer rd.Close()

		select {
		case <-die:
			return
		default:
		}

		fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
		if err != nil {
			failed <- err
			break
		}

		_, err = fd.Seek(part.Start-part.Offset, io.SeekStart)
		if err != nil {
			fd.Close()
			failed <- err
			break
		}

		startT := time.Now().UnixNano() / 1000 / 1000 / 1000
		_, err = io.Copy(fd, rd)
		endT := time.Now().UnixNano() / 1000 / 1000 / 1000
		if err != nil {
			arg.bucket.Client.Config.WriteLog(Debug, "download part error, cost:%d second, part number:%d, request id:%s, error:%s.\n", endT-startT, part.Index, GetRequestId(respHeader), err.Error())
			fd.Close()
			failed <- err
			break
		}

		if arg.enableCRC {
			part.CRC64 = crcCalc.Sum64()
		}

		fd.Close()
		results <- part
	}
}

// downloadScheduler feeds the parts into the jobs channel and closes it when done.
func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
	for _, part := range parts {
		jobs <- part
	}
	close(jobs)
}

// downloadPart defines a download part.
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start index
	End    int64  // End index
	Offset int64  // Offset
	CRC64  uint64 // CRC check value of the part
}

// getDownloadParts splits the (possibly range-limited) object into download parts.
func getDownloadParts(objectSize, partSize int64, uRange *UnpackedRange) []downloadPart {
	parts := []downloadPart{}
	part := downloadPart{}
	i := 0
	start, end := AdjustRange(uRange, objectSize)
	for offset := start; offset < end; offset += partSize {
		part.Index = i
		part.Start = offset
		part.End = GetPartEnd(offset, end, partSize)
		part.Offset = start
		part.CRC64 = 0
		parts = append(parts, part)
		i++
	}
	return parts
}
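
// As a rough illustration (assuming no range option is set, so AdjustRange yields
// the whole object): a 10-byte object split with partSize 4 produces three parts,
// roughly [Start 0, End 3], [Start 4, End 7] and [Start 8, End 9], each with
// Offset 0, so every part is written at its own Start position in the local file.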

// getObjectBytes gets the total byte length covered by the parts.
func getObjectBytes(parts []downloadPart) int64 {
	var ob int64
	for _, part := range parts {
		ob += (part.End - part.Start + 1)
	}
	return ob
}

// combineCRCInParts calculates the total CRC of contiguous parts.
func combineCRCInParts(dps []downloadPart) uint64 {
	if len(dps) == 0 {
		return 0
	}

	crc := dps[0].CRC64
	for i := 1; i < len(dps); i++ {
		crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
	}

	return crc
}
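
// CRC64Combine folds the per-part CRC64 values together using each following
// part's length, so the whole-object checksum can be compared against the value
// in the HTTPHeaderOssCRC64 response header without re-reading the downloaded
// file. This only holds for contiguous, in-order parts, which is why the callers
// below enable the CRC check only when no download range is set.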

// downloadFile downloads the file concurrently without checkpoint.
func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *UnpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := GetProgressListener(options)

	// If the file does not exist, create one. If it exists, the download will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	// Get the object's detailed meta for the whole object size.
	// The Range header must be deleted to get the whole object size.
	skipOptions := DeleteOption(options, HTTPHeaderRange)
	meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return err
	}

	enableCRC := false
	expectedCRC := (uint64)(0)
	if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
		if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
			enableCRC = true
			expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
		}
	}

	// Get the parts of the file
	parts := getDownloadParts(objectSize, partSize, uRange)
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getObjectBytes(parts)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
	publishProgress(listener, event)

	// Start the download workers
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Download parts concurrently
	go downloadScheduler(jobs, parts)

	// Wait for the parts to finish downloading
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			downBytes := (part.End - part.Start + 1)
			completedBytes += downBytes
			parts[part.Index].CRC64 = part.CRC64
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, downBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
	publishProgress(listener, event)

	if enableCRC {
		actualCRC := combineCRCInParts(parts)
		err = CheckDownloadCRC(actualCRC, expectedCRC)
		if err != nil {
			return err
		}
	}

	return os.Rename(tempFilePath, filePath)
}

// ----- Concurrent download with checkpoint -----

const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"

type downloadCheckpoint struct {
	Magic     string         // Magic
	MD5       string         // Checkpoint content MD5
	FilePath  string         // Local file
	Object    string         // Key
	ObjStat   objectStat     // Object status
	Parts     []downloadPart // All download parts
	PartStat  []bool         // Parts' download status
	Start     int64          // Start point of the file
	End       int64          // End point of the file
	enableCRC bool           // Whether the CRC check is enabled
	CRC       uint64         // CRC check value
}

type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // ETag
}

// isValid checks whether the checkpoint data is valid. It returns true when the
// data is intact and the remote object has not been updated since the checkpoint
// was written.
func (cp downloadCheckpoint) isValid(meta http.Header, uRange *UnpackedRange) (bool, error) {
	// Compare the checkpoint's magic and MD5
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return false, err
	}

	// Compare the object size, last modified time and ETag
	if cp.ObjStat.Size != objectSize ||
		cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
		cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
		return false, nil
	}

	// Check the download range
	if uRange != nil {
		start, end := AdjustRange(uRange, objectSize)
		if start != cp.Start || end != cp.End {
			return false, nil
		}
	}

	return true, nil
}

// load loads the checkpoint from the local checkpoint file.
func (cp *downloadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	err = json.Unmarshal(contents, cp)
	return err
}

// dump dumps the checkpoint to the local checkpoint file.
func (cp *downloadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate the MD5
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}
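
// For reference, the serialized checkpoint is plain JSON keyed by the exported
// field names of downloadCheckpoint (the unexported enableCRC flag is not
// persisted). A sketch of its shape, with illustrative values only:
//
//	{
//		"Magic": "92611BED-89E2-46B6-89E5-72F273D4B0A3",
//		"MD5": "<base64 MD5 of the checkpoint with MD5 cleared>",
//		"FilePath": "LocalFile",
//		"Object": "yourObjectKey",
//		"ObjStat": {"Size": 1048576, "LastModified": "...", "Etag": "..."},
//		"Parts": [{"Index": 0, "Start": 0, "End": 1048575, "Offset": 0, "CRC64": 0}],
//		"PartStat": [true],
//		"Start": 0,
//		"End": 0,
//		"CRC": 0
//	}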

// todoParts gets the parts that have not been downloaded yet.
func (cp downloadCheckpoint) todoParts() []downloadPart {
	dps := []downloadPart{}
	for i, ps := range cp.PartStat {
		if !ps {
			dps = append(dps, cp.Parts[i])
		}
	}
	return dps
}

// getCompletedBytes gets the number of bytes already downloaded.
func (cp downloadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for i, part := range cp.Parts {
		if cp.PartStat[i] {
			completedBytes += (part.End - part.Start + 1)
		}
	}
	return completedBytes
}

// prepare initializes the download checkpoint.
func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *UnpackedRange) error {
	// Checkpoint
	cp.Magic = downloadCpMagic
	cp.FilePath = filePath
	cp.Object = objectKey

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return err
	}

	cp.ObjStat.Size = objectSize
	cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)

	if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
		if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
			cp.enableCRC = true
			cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
		}
	}

	// Parts
	cp.Parts = getDownloadParts(objectSize, partSize, uRange)
	cp.PartStat = make([]bool, len(cp.Parts))
	for i := range cp.PartStat {
		cp.PartStat[i] = false
	}

	return nil
}

// complete renames the downloaded temp file to the target file and removes the checkpoint file.
func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
	err := os.Rename(downFilepath, cp.FilePath)
	if err != nil {
		return err
	}

	return os.Remove(cpFilePath)
}

// downloadFileWithCp downloads the file concurrently with checkpoint.
func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *UnpackedRange) error {
	tempFilePath := filePath + TempFileSuffix
	listener := GetProgressListener(options)

	// Load the checkpoint data.
	dcp := downloadCheckpoint{}
	err := dcp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// Get the object's detailed meta for the whole object size.
	// The Range header must be deleted to get the whole object size.
	skipOptions := DeleteOption(options, HTTPHeaderRange)
	meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
	if err != nil {
		return err
	}

	// Load error or invalid checkpoint data. Re-initialize the download.
	valid, err := dcp.isValid(meta, uRange)
	if err != nil || !valid {
		if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Create the file if it does not exist. Otherwise the parts download will overwrite it.
	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
	if err != nil {
		return err
	}
	fd.Close()

	// Unfinished parts
	parts := dcp.todoParts()
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := dcp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size, 0)
	publishProgress(listener, event)

	// Start the download worker routines
	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die)
	}

	// Download parts concurrently
	go downloadScheduler(jobs, parts)

	// Wait for the parts to finish downloading
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			dcp.PartStat[part.Index] = true
			dcp.Parts[part.Index].CRC64 = part.CRC64
			dcp.dump(cpFilePath)
			downBytes := (part.End - part.Start + 1)
			completedBytes += downBytes
			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size, downBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size, 0)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size, 0)
	publishProgress(listener, event)

	if dcp.enableCRC {
		actualCRC := combineCRCInParts(dcp.Parts)
		err = CheckDownloadCRC(actualCRC, dcp.CRC)
		if err != nil {
			return err
		}
	}

	return dcp.complete(cpFilePath, tempFilePath)
}