upload.go

package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"path/filepath"
	"time"
)

// UploadFile uploads a local file to OSS using multipart upload.
//
// objectKey    the object name.
// filePath     the local file path to upload.
// partSize     the part size in bytes.
// options      the options for uploading the object.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error {
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size invalid range [100KB, 5GB]")
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	if cpConf != nil && cpConf.IsEnable {
		cpFilePath := getUploadCpFilePath(cpConf, filePath, bucket.BucketName, objectKey)
		if cpFilePath != "" {
			return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines)
		}
	}

	return bucket.uploadFile(objectKey, filePath, partSize, options, routines)
}
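
// Illustrative usage (not part of this file; the Routines and Checkpoint
// option constructors are assumed from this SDK's option set, and the
// endpoint, credential, bucket, key, and path values are placeholders):
//
//	client, _ := oss.New(endpoint, accessKeyID, accessKeySecret)
//	bucket, _ := client.Bucket("my-bucket")
//	// 5 MB parts, 3 concurrent routines, checkpoint enabled with default path
//	err := bucket.UploadFile("dir/object", "/tmp/local-file", 5*1024*1024,
//		oss.Routines(3), oss.Checkpoint(true, ""))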

func getUploadCpFilePath(cpConf *cpConfig, srcFile, destBucket, destObject string) string {
	if cpConf.FilePath == "" && cpConf.DirPath != "" {
		dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
		absPath, _ := filepath.Abs(srcFile)
		cpFileName := getCpFileName(absPath, dest, "")
		cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
	}
	return cpConf.FilePath
}

// ----- concurrent upload without checkpoint -----

// getCpConfig gets checkpoint configuration
func getCpConfig(options []Option) *cpConfig {
	cpcOpt, err := FindOption(options, checkpointConfig, nil)
	if err != nil || cpcOpt == nil {
		return nil
	}
	return cpcOpt.(*cpConfig)
}

// getCpFileName returns the name of the checkpoint file
func getCpFileName(src, dest, versionId string) string {
	md5Ctx := md5.New()
	md5Ctx.Write([]byte(src))
	srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))

	md5Ctx.Reset()
	md5Ctx.Write([]byte(dest))
	destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))

	if versionId == "" {
		return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum)
	}

	md5Ctx.Reset()
	md5Ctx.Write([]byte(versionId))
	versionCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
	return fmt.Sprintf("%v-%v-%v.cp", srcCheckSum, destCheckSum, versionCheckSum)
}
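
// For example (illustrative), getCpFileName("/tmp/a.txt", "oss://bucket/key", "")
// yields "<md5hex(src)>-<md5hex(dest)>.cp"; a non-empty versionId appends a
// third segment, giving "<md5hex(src)>-<md5hex(dest)>-<md5hex(versionId)>.cp".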

// getRoutines gets the routine count. By default it's 1; values are clamped
// to the range [1, 100].
func getRoutines(options []Option) int {
	rtnOpt, err := FindOption(options, routineNum, nil)
	if err != nil || rtnOpt == nil {
		return 1
	}

	rs := rtnOpt.(int)
	if rs < 1 {
		rs = 1
	} else if rs > 100 {
		rs = 100
	}

	return rs
}
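
// For example, assuming the SDK's Routines option sets routineNum
// (illustrative): oss.Routines(0) resolves to 1 worker and
// oss.Routines(500) is capped at 100.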

// getPayer returns the payer of the request
func getPayer(options []Option) string {
	payerOpt, err := FindOption(options, HTTPHeaderOssRequester, nil)
	if err != nil || payerOpt == nil {
		return ""
	}
	return payerOpt.(string)
}

// GetProgressListener gets the progress callback
func GetProgressListener(options []Option) ProgressListener {
	isSet, listener, _ := IsOptionSet(options, progressListener)
	if !isSet {
		return nil
	}
	return listener.(ProgressListener)
}

// uploadPartHook is for testing usage
type uploadPartHook func(id int, chunk FileChunk) error

var uploadPartHooker uploadPartHook = defaultUploadPart

// defaultUploadPart is the no-op default hook
func defaultUploadPart(id int, chunk FileChunk) error {
	return nil
}
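
// For example (illustrative), a test can swap uploadPartHooker to inject a
// per-part failure and exercise the error path:
//
//	uploadPartHooker = func(id int, chunk FileChunk) error {
//		if chunk.Number == 2 {
//			return errors.New("inject failure on part 2")
//		}
//		return nil
//	}
//	defer func() { uploadPartHooker = defaultUploadPart }()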

// workerArg defines the worker argument structure
type workerArg struct {
	bucket   *Bucket
	filePath string
	imur     InitiateMultipartUploadResult
	options  []Option
	hook     uploadPartHook
}

// defaultUploadProgressListener shields the caller's ProgressListener from
// the per-part requests issued by the workers.
type defaultUploadProgressListener struct {
}

// ProgressChanged no-ops
func (listener *defaultUploadProgressListener) ProgressChanged(event *ProgressEvent) {
}
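
// worker is the worker coroutine function: it consumes chunks from jobs,
// uploads each one as a single part, and reports on results or failed. A
// close of the die channel tells workers to exit early after a failure.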
func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(id, chunk); err != nil {
			failed <- err
			break
		}
		var respHeader http.Header
		p := Progress(&defaultUploadProgressListener{})
		opts := make([]Option, 0, len(arg.options)+2)
		opts = append(opts, arg.options...)

		// use defaultUploadProgressListener
		opts = append(opts, p, GetResponseHeader(&respHeader))

		startT := time.Now().Unix() // seconds
		part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number, opts...)
		endT := time.Now().Unix()
		if err != nil {
			arg.bucket.Client.Config.WriteLog(Debug, "upload part error, cost:%d seconds, part number:%d, request id:%s, error:%s\n", endT-startT, chunk.Number, GetRequestId(respHeader), err.Error())
			failed <- err
			break
		}
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}

// scheduler feeds every chunk to the jobs channel, then closes it so the
// workers' range loops terminate.
func scheduler(jobs chan FileChunk, chunks []FileChunk) {
	for _, chunk := range chunks {
		jobs <- chunk
	}
	close(jobs)
}

// getTotalBytes returns the total size of all chunks
func getTotalBytes(chunks []FileChunk) int64 {
	var tb int64
	for _, chunk := range chunks {
		tb += chunk.Size
	}
	return tb
}

// uploadFile is a concurrent upload, without checkpoint
func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
	listener := GetProgressListener(options)

	chunks, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}

	partOptions := ChoiceTransferPartOption(options)
	completeOptions := ChoiceCompletePartOption(options)
	abortOptions := ChoiceAbortPartOption(options)

	// Initialize the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getTotalBytes(chunks)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
	publishProgress(listener, event)

	// Start the worker coroutines
	arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the upload to finish
	completed := 0
	parts := make([]UploadPart, len(chunks))
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			parts[part.PartNumber-1] = part
			completedBytes += chunks[part.PartNumber-1].Size

			// Byte-level read/write progress has already been notified in
			// teeReader.Read(); here we report the completed part's size.
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, chunks[part.PartNumber-1].Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
			publishProgress(listener, event)
			bucket.AbortMultipartUpload(imur, abortOptions...)
			return err
		}

		if completed >= len(chunks) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = bucket.CompleteMultipartUpload(imur, parts, completeOptions...)
	if err != nil {
		bucket.AbortMultipartUpload(imur, abortOptions...)
		return err
	}
	return nil
}

// ----- concurrent upload with checkpoint -----

const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62"

type uploadCheckpoint struct {
	Magic     string   // Magic
	MD5       string   // Checkpoint file content's MD5
	FilePath  string   // Local file path
	FileStat  cpStat   // File state
	ObjectKey string   // Key
	UploadID  string   // Upload ID
	Parts     []cpPart // All parts of the local file
}

type cpStat struct {
	Size         int64     // File size
	LastModified time.Time // File's last modified time
	MD5          string    // Local file's MD5
}

type cpPart struct {
	Chunk       FileChunk  // File chunk
	Part        UploadPart // Uploaded part
	IsCompleted bool       // Upload complete flag
}
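
// On disk, an uploadCheckpoint is the JSON produced by dump below; an
// illustrative example with placeholder values, where LastModified uses
// Go's default RFC 3339 time encoding:
//
//	{
//	  "Magic": "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62",
//	  "MD5": "<base64 MD5 of this document with the MD5 field blanked>",
//	  "FilePath": "/tmp/local-file",
//	  "FileStat": {"Size": 10485760, "LastModified": "2024-01-02T03:04:05Z", "MD5": ""},
//	  "ObjectKey": "dir/object",
//	  "UploadID": "<upload id returned by InitiateMultipartUpload>",
//	  "Parts": [{"Chunk": {...}, "Part": {...}, "IsCompleted": true}, ...]
//	}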

// isValid checks whether the checkpoint data is valid: it is valid only if
// the local file has not been updated and the checkpoint checksum matches.
func (cp uploadCheckpoint) isValid(filePath string) (bool, error) {
	// Compare the CP's magic number and MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != uploadCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	// Make sure the local file has not been updated.
	fd, err := os.Open(filePath)
	if err != nil {
		return false, err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return false, err
	}

	md, err := calcFileMD5(filePath)
	if err != nil {
		return false, err
	}

	// Compare the file size, last modified time and MD5
	if cp.FileStat.Size != st.Size() ||
		!cp.FileStat.LastModified.Equal(st.ModTime()) ||
		cp.FileStat.MD5 != md {
		return false, nil
	}

	return true, nil
}

// load loads the checkpoint from the local file
func (cp *uploadCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}
	return json.Unmarshal(contents, cp)
}

// dump dumps the checkpoint to the local file
func (cp *uploadCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5 over the checkpoint with the MD5 field blanked
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialization
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}

// updatePart updates the part status
func (cp *uploadCheckpoint) updatePart(part UploadPart) {
	cp.Parts[part.PartNumber-1].Part = part
	cp.Parts[part.PartNumber-1].IsCompleted = true
}

// todoParts returns unfinished parts
func (cp *uploadCheckpoint) todoParts() []FileChunk {
	fcs := []FileChunk{}
	for _, part := range cp.Parts {
		if !part.IsCompleted {
			fcs = append(fcs, part.Chunk)
		}
	}
	return fcs
}

// allParts returns all parts
func (cp *uploadCheckpoint) allParts() []UploadPart {
	ps := []UploadPart{}
	for _, part := range cp.Parts {
		ps = append(ps, part.Part)
	}
	return ps
}

// getCompletedBytes returns the completed bytes count
func (cp *uploadCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for _, part := range cp.Parts {
		if part.IsCompleted {
			completedBytes += part.Chunk.Size
		}
	}
	return completedBytes
}

// calcFileMD5 calculates the MD5 for the specified local file. It currently
// returns an empty string, so the content-MD5 comparison in isValid is
// effectively a no-op and validity rests on file size and modification time.
func calcFileMD5(filePath string) (string, error) {
	return "", nil
}
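
// A full implementation might look like this (untested sketch, not used by
// the code above; it would also require importing "io"):
//
//	func calcFileMD5(filePath string) (string, error) {
//		f, err := os.Open(filePath)
//		if err != nil {
//			return "", err
//		}
//		defer f.Close()
//		h := md5.New()
//		if _, err := io.Copy(h, f); err != nil {
//			return "", err
//		}
//		return hex.EncodeToString(h.Sum(nil)), nil
//	}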

// prepare initializes the checkpoint and initiates the multipart upload
func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error {
	// CP
	cp.Magic = uploadCpMagic
	cp.FilePath = filePath
	cp.ObjectKey = objectKey

	// Local file
	fd, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer fd.Close()

	st, err := fd.Stat()
	if err != nil {
		return err
	}
	cp.FileStat.Size = st.Size()
	cp.FileStat.LastModified = st.ModTime()
	md, err := calcFileMD5(filePath)
	if err != nil {
		return err
	}
	cp.FileStat.MD5 = md

	// Chunks
	parts, err := SplitFileByPartSize(filePath, partSize)
	if err != nil {
		return err
	}
	cp.Parts = make([]cpPart, len(parts))
	for i, part := range parts {
		cp.Parts[i].Chunk = part
		cp.Parts[i].IsCompleted = false
	}

	// Initiate the multipart upload
	imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
	if err != nil {
		return err
	}
	cp.UploadID = imur.UploadID

	return nil
}

// complete completes the multipart upload and deletes the local CP file
func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
	imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName,
		Key: cp.ObjectKey, UploadID: cp.UploadID}
	_, err := bucket.CompleteMultipartUpload(imur, parts, options...)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return nil
}

// uploadFileWithCp handles concurrent upload with checkpoint
func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
	listener := GetProgressListener(options)

	partOptions := ChoiceTransferPartOption(options)
	completeOptions := ChoiceCompletePartOption(options)

	// Load CP data
	ucp := uploadCheckpoint{}
	err := ucp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// On load error or invalid CP data, start over.
	valid, err := ucp.isValid(filePath)
	if err != nil || !valid {
		if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	chunks := ucp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   bucket.BucketName,
		Key:      objectKey,
		UploadID: ucp.UploadID}

	jobs := make(chan FileChunk, len(chunks))
	results := make(chan UploadPart, len(chunks))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ucp.getCompletedBytes()

	// Byte-level read/write progress is notified in teeReader.Read(), so
	// RwBytes is 0 here.
	event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size, 0)
	publishProgress(listener, event)

	// Start the workers
	arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker}
	for w := 1; w <= routines; w++ {
		go worker(w, arg, jobs, results, failed, die)
	}

	// Schedule the jobs
	go scheduler(jobs, chunks)

	// Wait for the jobs to finish
	completed := 0
	for completed < len(chunks) {
		select {
		case part := <-results:
			completed++
			ucp.updatePart(part)
			ucp.dump(cpFilePath)
			completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size
			event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size, ucp.Parts[part.PartNumber-1].Chunk.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size, 0)
			publishProgress(listener, event)
			// The upload is not aborted here; the checkpoint file allows it
			// to be resumed later.
			return err
		}

		if completed >= len(chunks) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size, 0)
	publishProgress(listener, event)

	// Complete the multipart upload
	err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, completeOptions)
	return err
}
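
// Resuming (illustrative): calling UploadFile again with the same arguments
// and checkpoint option reloads the .cp file, keeps parts marked IsCompleted,
// and uploads only todoParts(). The CheckpointDir constructor is assumed from
// this SDK's option set; names are placeholders.
//
//	err := bucket.UploadFile("dir/object", "/tmp/local-file", 5*1024*1024,
//		oss.Routines(3), oss.CheckpointDir(true, "/tmp/cp-dir"))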