package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"io/ioutil"
	"os"
	"time"
)
// UploadFile uploads a file in multipart mode, optionally with concurrency
// and checkpoint-based resume.
//
// objectKey    the object name.
// filePath     the local file path to upload.
// partSize     the part size in bytes.
// options      the options for uploading the object.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size must be in the range (1024KB, 5GB]")
	}

	cpConf, err := getCpConfig(options, filePath)
	if err != nil {
		return err
	}

	routines := getRoutines(options)

	if cpConf.IsEnable {
		return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines)
	}

	return bucket.uploadFile(objectKey, filePath, partSize, options, routines)
}
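
// A minimal usage sketch from a client's perspective (illustrative only: the
// endpoint, credentials and names below are hypothetical, and it assumes the
// package's Routines and Checkpoint options; check errors in real code):
//
//	client, _ := oss.New("endpoint", "accessKeyID", "accessKeySecret")
//	bucket, _ := client.Bucket("my-bucket")
//	// 10 MB parts, 3 concurrent routines, checkpoint enabled with the
//	// default checkpoint file path (filePath + CheckpointFileSuffix).
//	err := bucket.UploadFile("my-object", "/tmp/my-file", 10*1024*1024,
//		oss.Routines(3), oss.Checkpoint(true, ""))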

// ----- concurrent upload without checkpoint -----

// getCpConfig gets the checkpoint configuration.
func getCpConfig(options []Option, filePath string) (*cpConfig, error) {
	cpc := &cpConfig{}
	cpcOpt, err := findOption(options, checkpointConfig, nil)
	if err != nil || cpcOpt == nil {
		return cpc, err
	}

	cpc = cpcOpt.(*cpConfig)
	if cpc.IsEnable && cpc.FilePath == "" {
		cpc.FilePath = filePath + CheckpointFileSuffix
	}

	return cpc, nil
}

// getRoutines gets the routine count. By default it's 1.
func getRoutines(options []Option) int {
	rtnOpt, err := findOption(options, routineNum, nil)
	if err != nil || rtnOpt == nil {
		return 1
	}

	rs := rtnOpt.(int)
	if rs < 1 {
		rs = 1
	} else if rs > 100 {
		rs = 100 // The routine count is capped at 100.
	}

	return rs
}

// getProgressListener gets the progress callback.
func getProgressListener(options []Option) ProgressListener {
	isSet, listener, _ := isOptionSet(options, progressListener)
	if !isSet {
		return nil
	}
	return listener.(ProgressListener)
}

// uploadPartHook is a hook for testing; tests can replace uploadPartHooker
// to inject errors before a part is uploaded.
type uploadPartHook func(id int, chunk FileChunk) error

var uploadPartHooker uploadPartHook = defaultUploadPart

func defaultUploadPart(id int, chunk FileChunk) error {
	return nil
}
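
// A minimal sketch of how a test might swap in a failing hook (the error text
// and the part number chosen are illustrative):
//
//	uploadPartHooker = func(id int, chunk FileChunk) error {
//		if chunk.Number == 2 {
//			return errors.New("simulated part failure")
//		}
//		return nil
//	}
//	defer func() { uploadPartHooker = defaultUploadPart }()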

// workerArg defines the worker argument structure.
type workerArg struct {
	bucket   *Bucket
	filePath string
	imur     InitiateMultipartUploadResult
	hook     uploadPartHook
}

// worker is the worker goroutine function. It uploads the chunks it receives
// on jobs and reports each uploaded part on results; on error it reports to
// failed and exits. The die channel tells workers to stop after a failure
// elsewhere.
func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(id, chunk); err != nil {
			failed <- err
			break
		}
		part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number)
		if err != nil {
			failed <- err
			break
		}
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}

// scheduler feeds all chunks to the jobs channel and then closes it, which
// lets the workers' range loops terminate.
func scheduler(jobs chan FileChunk, chunks []FileChunk) {
	for _, chunk := range chunks {
		jobs <- chunk
	}
	close(jobs)
}

// getTotalBytes sums the sizes of all chunks.
func getTotalBytes(chunks []FileChunk) int64 {
	var tb int64
	for _, chunk := range chunks {
		tb += chunk.Size
	}
	return tb
}

// uploadFile is the concurrent upload path without checkpoint.
func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
	listener := getProgressListener(options)

	chunks, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	// Initialize the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getTotalBytes(chunks)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)

	// Start the worker goroutines
	arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the uploads to finish
	completed := 0
	parts := make([]UploadPart, len(chunks))
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			parts[part.PartNumber-1] = part
			completedBytes += chunks[part.PartNumber-1].Size
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			bucket.AbortMultipartUpload(imur)
			return err
		}

		if completed >= len(chunks) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = bucket.CompleteMultipartUpload(imur, parts)
	if err != nil {
		bucket.AbortMultipartUpload(imur)
		return err
	}
	return nil
}

// ----- concurrent upload with checkpoint -----

const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62"

type uploadCheckpoint struct {
	Magic     string   // Magic
	MD5       string   // Checkpoint file content's MD5
	FilePath  string   // Local file path
	FileStat  cpStat   // File state
	ObjectKey string   // Key
	UploadID  string   // Upload ID
	Parts     []cpPart // All parts of the local file
}

type cpStat struct {
	Size         int64     // File size
	LastModified time.Time // File's last modified time
	MD5          string    // Local file's MD5
}

type cpPart struct {
	Chunk       FileChunk  // File chunk
	Part        UploadPart // Uploaded part
	IsCompleted bool       // Upload complete flag
}
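
// For illustration, a checkpoint file serialized by dump looks roughly like
// this (all values are hypothetical; field names follow the structs above):
//
//	{
//	  "Magic": "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62",
//	  "MD5": "<base64 MD5 of this JSON with an empty MD5 field>",
//	  "FilePath": "/tmp/my-file",
//	  "FileStat": {"Size": 20971520, "LastModified": "2015-01-01T00:00:00Z", "MD5": ""},
//	  "ObjectKey": "my-object",
//	  "UploadID": "upload-id",
//	  "Parts": [{"Chunk": {...}, "Part": {...}, "IsCompleted": true}, ...]
//	}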

// isValid checks whether the checkpoint data is valid: it is valid only when
// the checkpoint content is intact and the local file has not been modified.
func (cp uploadCheckpoint) isValid(filePath string) (bool, error) {
	// Compare the CP's magic number and MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != uploadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	// Check whether the local file has been modified.
	fd, err := os.Open(filePath)
	if err != nil {
		return false, err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return false, err
	}

	md, err := calcFileMD5(filePath)
	if err != nil {
		return false, err
	}

	// Compare the file size, last modified time and MD5.
	if cp.FileStat.Size != st.Size() ||
		cp.FileStat.LastModified != st.ModTime() ||
		cp.FileStat.MD5 != md {
		return false, nil
	}

	return true, nil
}

// load loads the checkpoint data from the checkpoint file.
func (cp *uploadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}
	return json.Unmarshal(contents, cp)
}

// dump dumps the checkpoint data to the checkpoint file. The MD5 field is
// computed over the JSON of the checkpoint with MD5 cleared, so isValid can
// verify integrity the same way.
func (cp *uploadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5 over the serialization with an empty MD5 field
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump to the checkpoint file
	return ioutil.WriteFile(filePath, js, FilePermMode)
}

// updatePart updates the part's upload status.
func (cp *uploadCheckpoint) updatePart(part UploadPart) {
	cp.Parts[part.PartNumber-1].Part = part
	cp.Parts[part.PartNumber-1].IsCompleted = true
}

// todoParts returns the chunks that have not been uploaded yet.
func (cp *uploadCheckpoint) todoParts() []FileChunk {
	fcs := []FileChunk{}
	for _, part := range cp.Parts {
		if !part.IsCompleted {
			fcs = append(fcs, part.Chunk)
		}
	}
	return fcs
}

// allParts returns all parts.
func (cp *uploadCheckpoint) allParts() []UploadPart {
	ps := []UploadPart{}
	for _, part := range cp.Parts {
		ps = append(ps, part.Part)
	}
	return ps
}

// getCompletedBytes returns the number of bytes uploaded so far.
func (cp *uploadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for _, part := range cp.Parts {
		if part.IsCompleted {
			completedBytes += part.Chunk.Size
		}
	}
	return completedBytes
}

// calcFileMD5 calculates the MD5 of the specified local file. It is currently
// a no-op that returns an empty string: prepare stores the same empty value in
// the checkpoint, so isValid's MD5 comparison always matches and the effective
// change checks are the file size and last modified time.
func calcFileMD5(filePath string) (string, error) {
	return "", nil
}

// prepare initializes the multipart upload: it fills in the checkpoint data,
// records the local file state, splits the file into chunks and initiates the
// upload with the service.
func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error {
	// CP
	cp.Magic = uploadCpMagic
	cp.FilePath = filePath
	cp.ObjectKey = objectKey

	// Local file
	fd, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return err
	}
	cp.FileStat.Size = st.Size()
	cp.FileStat.LastModified = st.ModTime()

	md, err := calcFileMD5(filePath)
	if err != nil {
		return err
	}
	cp.FileStat.MD5 = md

	// Chunks
	parts, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}
	cp.Parts = make([]cpPart, len(parts))
	for i, part := range parts {
		cp.Parts[i].Chunk = part
		cp.Parts[i].IsCompleted = false
	}

	// Initiate the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}
	cp.UploadID = imur.UploadID

	return nil
}

// complete completes the multipart upload and deletes the local checkpoint file.
func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string) error {
	imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName,
		Key: cp.ObjectKey, UploadID: cp.UploadID}
	_, err := bucket.CompleteMultipartUpload(imur, parts)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return nil
}

// uploadFileWithCp handles the concurrent upload with checkpoint.
func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
	listener := getProgressListener(options)

	// Load the CP data; if it cannot be read, remove the stale file.
	ucp := uploadCheckpoint{}
	err := ucp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// On a load error or invalid CP data, start a fresh upload.
	valid, err := ucp.isValid(filePath)
	if err != nil || !valid {
		if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	chunks := ucp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   bucket.BucketName,
		Key:      objectKey,
		UploadID: ucp.UploadID}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ucp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size)
	publishProgress(listener, event)

	// Start the workers
	arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the uploads to finish; each completed part is persisted to the
	// checkpoint file so a later run can resume. On failure the upload is not
	// aborted, so the parts uploaded so far remain resumable.
	completed := 0
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			ucp.updatePart(part)
			ucp.dump(cpFilePath)
			completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size
			event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(chunks) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size)
	publishProgress(listener, event)

	// Complete the multipart upload
	return complete(&ucp, &bucket, ucp.allParts(), cpFilePath)
}
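
// A minimal sketch of the resume behavior (names are hypothetical): if the
// first call fails partway through, the checkpoint file is left in place and
// an identical second call re-uploads only the parts that todoParts reports
// as incomplete.
//
//	err := bucket.UploadFile("my-object", "/tmp/my-file", 10*1024*1024,
//		oss.Routines(3), oss.Checkpoint(true, ""))
//	if err != nil {
//		// Retry later; the upload resumes from the checkpoint.
//		err = bucket.UploadFile("my-object", "/tmp/my-file", 10*1024*1024,
//			oss.Routines(3), oss.Checkpoint(true, ""))
//	}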