upload.go

package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"time"
)
// UploadFile uploads a file with multipart upload.
//
// objectKey    the object name.
// filePath     the local file path to upload.
// partSize     the part size in bytes.
// options      the options for uploading the object.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size invalid range (1024KB, 5GB]")
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	if cpConf != nil && cpConf.IsEnable && cpConf.cpDir != "" {
		dest := fmt.Sprintf("oss://%v/%v", bucket.BucketName, objectKey)
		absPath, _ := filepath.Abs(filePath)
		cpFileName := getCpFileName(absPath, dest)
		cpFilePath := cpConf.cpDir + string(os.PathSeparator) + cpFileName
		return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines)
	}

	return bucket.uploadFile(objectKey, filePath, partSize, options, routines)
}
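// A minimal caller sketch for UploadFile. The endpoint, credentials, bucket
// and object names below are placeholders, and the Routines/Checkpoint option
// constructors are assumed to be the ones this package exposes for the
// routineNum and checkpointConfig options read above:
//
//	client, err := oss.New("endpoint", "accessKeyID", "accessKeySecret")
//	if err != nil {
//		// handle error
//	}
//	bucket, err := client.Bucket("my-bucket")
//	if err != nil {
//		// handle error
//	}
//	// 3 concurrent part uploads of 5MB each, resumable via a checkpoint file.
//	err = bucket.UploadFile("my-object", "/data/my-file", 5*1024*1024,
//		oss.Routines(3), oss.Checkpoint(true, "/tmp/cp"))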
// ----- concurrent upload without checkpoint -----

// getCpConfig gets checkpoint configuration
func getCpConfig(options []Option) *cpConfig {
	cpcOpt, err := findOption(options, checkpointConfig, nil)
	if err != nil || cpcOpt == nil {
		return nil
	}
	return cpcOpt.(*cpConfig)
}
// getCpFileName returns the name of the checkpoint file
func getCpFileName(src, dest string) string {
	md5Ctx := md5.New()
	md5Ctx.Write([]byte(src))
	srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))

	md5Ctx.Reset()
	md5Ctx.Write([]byte(dest))
	destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))

	return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum)
}
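// For example (hypothetical paths), uploading /data/a.txt to
// oss://my-bucket/a.txt produces a name of the form
// "<md5hex(src)>-<md5hex(dest)>.cp", so a given source/destination pair
// always maps to the same checkpoint file.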
// getRoutines gets the routine count. By default it's 1; out-of-range values
// are clamped to [1, 100].
func getRoutines(options []Option) int {
	rtnOpt, err := findOption(options, routineNum, nil)
	if err != nil || rtnOpt == nil {
		return 1
	}

	rs := rtnOpt.(int)
	if rs < 1 {
		rs = 1
	} else if rs > 100 {
		rs = 100
	}

	return rs
}
// getPayer returns the payer of the request
func getPayer(options []Option) string {
	payerOpt, err := findOption(options, HTTPHeaderOSSRequester, nil)
	if err != nil || payerOpt == nil {
		return ""
	}

	return payerOpt.(string)
}
// getProgressListener gets the progress callback
func getProgressListener(options []Option) ProgressListener {
	isSet, listener, _ := isOptionSet(options, progressListener)
	if !isSet {
		return nil
	}
	return listener.(ProgressListener)
}
// uploadPartHook is for testing usage
type uploadPartHook func(id int, chunk FileChunk) error

var uploadPartHooker uploadPartHook = defaultUploadPart

func defaultUploadPart(id int, chunk FileChunk) error {
	return nil
}
// workerArg defines worker argument structure
type workerArg struct {
	bucket   *Bucket
	filePath string
	imur     InitiateMultipartUploadResult
	options  []Option
	hook     uploadPartHook
}
// worker is the worker goroutine function
func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(id, chunk); err != nil {
			failed <- err
			break
		}
		part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number, arg.options...)
		if err != nil {
			failed <- err
			break
		}
		// Stop without reporting the part if the upload has been aborted.
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}
// scheduler feeds all chunks to the jobs channel, then closes it so the
// workers' range loops terminate
func scheduler(jobs chan FileChunk, chunks []FileChunk) {
	for _, chunk := range chunks {
		jobs <- chunk
	}
	close(jobs)
}
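// scheduler and worker together form a simple fan-out pipeline, wired up by
// uploadFile and uploadFileWithCp below: jobs and results are buffered to
// len(chunks) so neither side blocks, failed carries the first error back to
// the caller, and closing die broadcasts a stop signal to every worker.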
// getTotalBytes sums the sizes of all chunks
func getTotalBytes(chunks []FileChunk) int64 {
	var tb int64
	for _, chunk := range chunks {
		tb += chunk.Size
	}
	return tb
}
// uploadFile is a concurrent upload, without checkpoint
func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
	listener := getProgressListener(options)

	chunks, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	// Initialize the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getTotalBytes(chunks)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)
	// Start the worker goroutines
	arg := workerArg{&bucket, filePath, imur, payerOptions, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the uploads to finish
	completed := 0
	parts := make([]UploadPart, len(chunks))
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			parts[part.PartNumber-1] = part
			completedBytes += chunks[part.PartNumber-1].Size
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			bucket.AbortMultipartUpload(imur, payerOptions...)
			return err
		}

		if completed >= len(chunks) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = bucket.CompleteMultipartUpload(imur, parts, payerOptions...)
	if err != nil {
		bucket.AbortMultipartUpload(imur, payerOptions...)
		return err
	}
	return nil
}
// ----- concurrent upload with checkpoint -----
const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62"

type uploadCheckpoint struct {
	Magic     string   // Magic
	MD5       string   // Checkpoint file content's MD5
	FilePath  string   // Local file path
	FileStat  cpStat   // File state
	ObjectKey string   // Key
	UploadID  string   // Upload ID
	Parts     []cpPart // All parts of the local file
}

type cpStat struct {
	Size         int64     // File size
	LastModified time.Time // File's last modified time
	MD5          string    // Local file's MD5
}

type cpPart struct {
	Chunk       FileChunk  // File chunk
	Part        UploadPart // Uploaded part
	IsCompleted bool       // Upload complete flag
}
// isValid checks whether the checkpoint data is valid: it is valid only when
// the checkpoint file is intact and the local file has not been modified.
func (cp uploadCheckpoint) isValid(filePath string) (bool, error) {
	// Compare the CP's magic number and MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != uploadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	// Check whether the local file has been modified.
	fd, err := os.Open(filePath)
	if err != nil {
		return false, err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return false, err
	}

	md, err := calcFileMD5(filePath)
	if err != nil {
		return false, err
	}

	// Compare the file size, the file's last modified time and the file's MD5
	if cp.FileStat.Size != st.Size() ||
		cp.FileStat.LastModified != st.ModTime() ||
		cp.FileStat.MD5 != md {
		return false, nil
	}

	return true, nil
}
// load loads the checkpoint from the local file
func (cp *uploadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	err = json.Unmarshal(contents, cp)
	return err
}
// dump dumps the checkpoint to the local file
func (cp *uploadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialization
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}
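// The dumped checkpoint is plain JSON keyed by the exported field names; an
// abbreviated sketch with hypothetical values:
//
//	{"Magic":"FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62","MD5":"<base64>",
//	 "FilePath":"/data/a.txt","FileStat":{"Size":10485760,...},
//	 "ObjectKey":"a.txt","UploadID":"<id>","Parts":[...]}
//
// The MD5 field holds the digest of this JSON with MD5 blanked, which is
// exactly what isValid recomputes.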
// updatePart updates the part status
func (cp *uploadCheckpoint) updatePart(part UploadPart) {
	cp.Parts[part.PartNumber-1].Part = part
	cp.Parts[part.PartNumber-1].IsCompleted = true
}

// todoParts returns unfinished parts
func (cp *uploadCheckpoint) todoParts() []FileChunk {
	fcs := []FileChunk{}
	for _, part := range cp.Parts {
		if !part.IsCompleted {
			fcs = append(fcs, part.Chunk)
		}
	}
	return fcs
}

// allParts returns all parts
func (cp *uploadCheckpoint) allParts() []UploadPart {
	ps := []UploadPart{}
	for _, part := range cp.Parts {
		ps = append(ps, part.Part)
	}
	return ps
}
// getCompletedBytes returns completed bytes count
func (cp *uploadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for _, part := range cp.Parts {
		if part.IsCompleted {
			completedBytes += part.Chunk.Size
		}
	}
	return completedBytes
}
// calcFileMD5 calculates the MD5 for the specified local file.
// It is currently a stub: it always returns an empty string, so the MD5
// recorded by prepare and the one recomputed by isValid are both empty and
// that comparison always passes.
func calcFileMD5(filePath string) (string, error) {
	return "", nil
}
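// A sketch of a real implementation, in case actual content validation is
// wanted (not part of the original code; it would also require importing
// "io"):
//
//	func calcFileMD5(filePath string) (string, error) {
//		fd, err := os.Open(filePath)
//		if err != nil {
//			return "", err
//		}
//		defer fd.Close()
//
//		h := md5.New()
//		if _, err := io.Copy(h, fd); err != nil {
//			return "", err
//		}
//		return hex.EncodeToString(h.Sum(nil)), nil
//	}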
// prepare initializes the multipart upload
func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error {
	// CP
	cp.Magic = uploadCpMagic
	cp.FilePath = filePath
	cp.ObjectKey = objectKey

	// Local file
	fd, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return err
	}
	cp.FileStat.Size = st.Size()
	cp.FileStat.LastModified = st.ModTime()
	md, err := calcFileMD5(filePath)
	if err != nil {
		return err
	}
	cp.FileStat.MD5 = md

	// Chunks
	parts, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	cp.Parts = make([]cpPart, len(parts))
	for i, part := range parts {
		cp.Parts[i].Chunk = part
		cp.Parts[i].IsCompleted = false
	}

	// Initiate the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}
	cp.UploadID = imur.UploadID

	return nil
}
// complete completes the multipart upload and deletes the local CP file
func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
	imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName,
		Key: cp.ObjectKey, UploadID: cp.UploadID}
	_, err := bucket.CompleteMultipartUpload(imur, parts, options...)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return err
}
// uploadFileWithCp handles concurrent upload with checkpoint
func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
	listener := getProgressListener(options)

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	// Load CP data
	ucp := uploadCheckpoint{}
	err := ucp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// On load error or invalid CP data, start over from a fresh checkpoint.
	valid, err := ucp.isValid(filePath)
	if err != nil || !valid {
		if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	chunks := ucp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   bucket.BucketName,
		Key:      objectKey,
		UploadID: ucp.UploadID}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ucp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size)
	publishProgress(listener, event)

	// Start the workers
	arg := workerArg{&bucket, filePath, imur, payerOptions, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule jobs
	go scheduler(jobs, chunks)
	// Wait for the jobs to finish
	completed := 0
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			ucp.updatePart(part)
			ucp.dump(cpFilePath)
			completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size
			event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size)
			publishProgress(listener, event)
			// Do not abort the multipart upload here: the checkpoint file is
			// kept so a later call can resume from the completed parts.
			return err
		}

		if completed >= len(chunks) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size)
	publishProgress(listener, event)

	// Complete the multipart upload
	err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, payerOptions)
	return err
}
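// On failure, uploadFileWithCp returns without aborting the upload or
// removing the checkpoint file; a later call with the same checkpoint
// configuration reloads the file, validates it via isValid, and uploads only
// the parts whose IsCompleted flag is still false.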