// multicopy.go — resumable, concurrent multipart object copy.

package oss
import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
)
  13. // CopyFile is multipart copy object
  14. //
  15. // srcBucketName source bucket name
  16. // srcObjectKey source object name
  17. // destObjectKey target object name in the form of bucketname.objectkey
  18. // partSize the part size in byte.
  19. // options object's contraints. Check out function InitiateMultipartUpload.
  20. //
  21. // error it's nil if the operation succeeds, otherwise it's an error object.
  22. //
  23. func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string, partSize int64, options ...Option) error {
  24. destBucketName := bucket.BucketName
  25. if partSize < MinPartSize || partSize > MaxPartSize {
  26. return errors.New("oss: part size invalid range (1024KB, 5GB]")
  27. }
  28. cpConf := getCpConfig(options)
  29. routines := getRoutines(options)
  30. var strVersionId string
  31. versionId, _ := FindOption(options, "versionId", nil)
  32. if versionId != nil {
  33. strVersionId = versionId.(string)
  34. }
  35. if cpConf != nil && cpConf.IsEnable {
  36. cpFilePath := getCopyCpFilePath(cpConf, srcBucketName, srcObjectKey, destBucketName, destObjectKey, strVersionId)
  37. if cpFilePath != "" {
  38. return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey, partSize, options, cpFilePath, routines)
  39. }
  40. }
  41. return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
  42. partSize, options, routines)
  43. }
  44. func getCopyCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destBucket, destObject, versionId string) string {
  45. if cpConf.FilePath == "" && cpConf.DirPath != "" {
  46. dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
  47. src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
  48. cpFileName := getCpFileName(src, dest, versionId)
  49. cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
  50. }
  51. return cpConf.FilePath
  52. }
// ----- Concurrently copy without checkpoint ---------

// copyWorkerArg bundles everything a copy worker goroutine needs: the
// destination bucket handle, the multipart upload the parts belong to,
// the source object's location, per-part options and a test hook.
type copyWorkerArg struct {
	bucket        *Bucket                       // Destination bucket; used for UploadPartCopy calls
	imur          InitiateMultipartUploadResult // The multipart upload the copied parts belong to
	srcBucketName string                        // Source bucket name
	srcObjectKey  string                        // Source object key
	options       []Option                      // Options forwarded to each UploadPartCopy call
	hook          copyPartHook                  // Hook invoked before each part is copied (testing)
}
// copyPartHook is the hook for testing purpose: it runs before each part is
// copied and may return an error to make the worker report a failure.
type copyPartHook func(part copyPart) error

// copyPartHooker is the hook installed for the copy workers; tests may
// replace it to inject failures.
var copyPartHooker copyPartHook = defaultCopyPartHook

// defaultCopyPartHook is the production no-op hook: it never aborts a part.
func defaultCopyPartHook(part copyPart) error {
	return nil
}
  69. // copyWorker copies worker
  70. func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
  71. for chunk := range jobs {
  72. if err := arg.hook(chunk); err != nil {
  73. failed <- err
  74. break
  75. }
  76. chunkSize := chunk.End - chunk.Start + 1
  77. part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
  78. chunk.Start, chunkSize, chunk.Number, arg.options...)
  79. if err != nil {
  80. failed <- err
  81. break
  82. }
  83. select {
  84. case <-die:
  85. return
  86. default:
  87. }
  88. results <- part
  89. }
  90. }
  91. // copyScheduler
  92. func copyScheduler(jobs chan copyPart, parts []copyPart) {
  93. for _, part := range parts {
  94. jobs <- part
  95. }
  96. close(jobs)
  97. }
// copyPart describes one byte range of the source object to be copied as a
// single part of the multipart upload.
type copyPart struct {
	Number int   // Part number (from 1 to 10,000)
	Start  int64 // The start index in the source file.
	End    int64 // The end index in the source file (inclusive)
}
  104. // getCopyParts calculates copy parts
  105. func getCopyParts(objectSize, partSize int64) []copyPart {
  106. parts := []copyPart{}
  107. part := copyPart{}
  108. i := 0
  109. for offset := int64(0); offset < objectSize; offset += partSize {
  110. part.Number = i + 1
  111. part.Start = offset
  112. part.End = GetPartEnd(offset, objectSize, partSize)
  113. parts = append(parts, part)
  114. i++
  115. }
  116. return parts
  117. }
  118. // getSrcObjectBytes gets the source file size
  119. func getSrcObjectBytes(parts []copyPart) int64 {
  120. var ob int64
  121. for _, part := range parts {
  122. ob += (part.End - part.Start + 1)
  123. }
  124. return ob
  125. }
  126. // copyFile is a concurrently copy without checkpoint
  127. func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
  128. partSize int64, options []Option, routines int) error {
  129. descBucket, err := bucket.Client.Bucket(destBucketName)
  130. srcBucket, err := bucket.Client.Bucket(srcBucketName)
  131. listener := GetProgressListener(options)
  132. // choice valid options
  133. headerOptions := ChoiceHeadObjectOption(options)
  134. partOptions := ChoiceTransferPartOption(options)
  135. completeOptions := ChoiceCompletePartOption(options)
  136. abortOptions := ChoiceAbortPartOption(options)
  137. meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, headerOptions...)
  138. if err != nil {
  139. return err
  140. }
  141. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  142. if err != nil {
  143. return err
  144. }
  145. // Get copy parts
  146. parts := getCopyParts(objectSize, partSize)
  147. // Initialize the multipart upload
  148. imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
  149. if err != nil {
  150. return err
  151. }
  152. jobs := make(chan copyPart, len(parts))
  153. results := make(chan UploadPart, len(parts))
  154. failed := make(chan error)
  155. die := make(chan bool)
  156. var completedBytes int64
  157. totalBytes := getSrcObjectBytes(parts)
  158. event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
  159. publishProgress(listener, event)
  160. // Start to copy workers
  161. arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, partOptions, copyPartHooker}
  162. for w := 1; w <= routines; w++ {
  163. go copyWorker(w, arg, jobs, results, failed, die)
  164. }
  165. // Start the scheduler
  166. go copyScheduler(jobs, parts)
  167. // Wait for the parts finished.
  168. completed := 0
  169. ups := make([]UploadPart, len(parts))
  170. for completed < len(parts) {
  171. select {
  172. case part := <-results:
  173. completed++
  174. ups[part.PartNumber-1] = part
  175. copyBytes := (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
  176. completedBytes += copyBytes
  177. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, copyBytes)
  178. publishProgress(listener, event)
  179. case err := <-failed:
  180. close(die)
  181. descBucket.AbortMultipartUpload(imur, abortOptions...)
  182. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
  183. publishProgress(listener, event)
  184. return err
  185. }
  186. if completed >= len(parts) {
  187. break
  188. }
  189. }
  190. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
  191. publishProgress(listener, event)
  192. // Complete the multipart upload
  193. _, err = descBucket.CompleteMultipartUpload(imur, ups, completeOptions...)
  194. if err != nil {
  195. bucket.AbortMultipartUpload(imur, abortOptions...)
  196. return err
  197. }
  198. return nil
  199. }
// ----- Concurrently copy with checkpoint -----

// copyCpMagic is the magic number identifying a copy checkpoint file.
const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"

// copyCheckpoint is the state of a resumable multipart copy, persisted to a
// checkpoint file as JSON after every completed part.
type copyCheckpoint struct {
	Magic          string       // Magic
	MD5            string       // CP content MD5 (computed over the struct with MD5 cleared)
	SrcBucketName  string       // Source bucket
	SrcObjectKey   string       // Source object
	DestBucketName string       // Target bucket
	DestObjectKey  string       // Target object
	CopyID         string       // Copy ID (the multipart upload id on the destination)
	ObjStat        objectStat   // Object stat (size/last-modified/etag of the source)
	Parts          []copyPart   // Copy parts
	CopyParts      []UploadPart // The uploaded parts
	PartStat       []bool       // The part status (true once the part is copied)
}
  215. // isValid checks if the data is valid which means CP is valid and object is not updated.
  216. func (cp copyCheckpoint) isValid(meta http.Header) (bool, error) {
  217. // Compare CP's magic number and the MD5.
  218. cpb := cp
  219. cpb.MD5 = ""
  220. js, _ := json.Marshal(cpb)
  221. sum := md5.Sum(js)
  222. b64 := base64.StdEncoding.EncodeToString(sum[:])
  223. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  224. return false, nil
  225. }
  226. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
  227. if err != nil {
  228. return false, err
  229. }
  230. // Compare the object size and last modified time and etag.
  231. if cp.ObjStat.Size != objectSize ||
  232. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  233. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  234. return false, nil
  235. }
  236. return true, nil
  237. }
  238. // load loads from the checkpoint file
  239. func (cp *copyCheckpoint) load(filePath string) error {
  240. contents, err := ioutil.ReadFile(filePath)
  241. if err != nil {
  242. return err
  243. }
  244. err = json.Unmarshal(contents, cp)
  245. return err
  246. }
  247. // update updates the parts status
  248. func (cp *copyCheckpoint) update(part UploadPart) {
  249. cp.CopyParts[part.PartNumber-1] = part
  250. cp.PartStat[part.PartNumber-1] = true
  251. }
  252. // dump dumps the CP to the file
  253. func (cp *copyCheckpoint) dump(filePath string) error {
  254. bcp := *cp
  255. // Calculate MD5
  256. bcp.MD5 = ""
  257. js, err := json.Marshal(bcp)
  258. if err != nil {
  259. return err
  260. }
  261. sum := md5.Sum(js)
  262. b64 := base64.StdEncoding.EncodeToString(sum[:])
  263. bcp.MD5 = b64
  264. // Serialization
  265. js, err = json.Marshal(bcp)
  266. if err != nil {
  267. return err
  268. }
  269. // Dump
  270. return ioutil.WriteFile(filePath, js, FilePermMode)
  271. }
  272. // todoParts returns unfinished parts
  273. func (cp copyCheckpoint) todoParts() []copyPart {
  274. dps := []copyPart{}
  275. for i, ps := range cp.PartStat {
  276. if !ps {
  277. dps = append(dps, cp.Parts[i])
  278. }
  279. }
  280. return dps
  281. }
  282. // getCompletedBytes returns finished bytes count
  283. func (cp copyCheckpoint) getCompletedBytes() int64 {
  284. var completedBytes int64
  285. for i, part := range cp.Parts {
  286. if cp.PartStat[i] {
  287. completedBytes += (part.End - part.Start + 1)
  288. }
  289. }
  290. return completedBytes
  291. }
  292. // prepare initializes the multipart upload
  293. func (cp *copyCheckpoint) prepare(meta http.Header, srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
  294. partSize int64, options []Option) error {
  295. // CP
  296. cp.Magic = copyCpMagic
  297. cp.SrcBucketName = srcBucket.BucketName
  298. cp.SrcObjectKey = srcObjectKey
  299. cp.DestBucketName = destBucket.BucketName
  300. cp.DestObjectKey = destObjectKey
  301. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
  302. if err != nil {
  303. return err
  304. }
  305. cp.ObjStat.Size = objectSize
  306. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  307. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  308. // Parts
  309. cp.Parts = getCopyParts(objectSize, partSize)
  310. cp.PartStat = make([]bool, len(cp.Parts))
  311. for i := range cp.PartStat {
  312. cp.PartStat[i] = false
  313. }
  314. cp.CopyParts = make([]UploadPart, len(cp.Parts))
  315. // Init copy
  316. imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
  317. if err != nil {
  318. return err
  319. }
  320. cp.CopyID = imur.UploadID
  321. return nil
  322. }
  323. func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
  324. imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
  325. Key: cp.DestObjectKey, UploadID: cp.CopyID}
  326. _, err := bucket.CompleteMultipartUpload(imur, parts, options...)
  327. if err != nil {
  328. return err
  329. }
  330. os.Remove(cpFilePath)
  331. return err
  332. }
  333. // copyFileWithCp is concurrently copy with checkpoint
  334. func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
  335. partSize int64, options []Option, cpFilePath string, routines int) error {
  336. descBucket, err := bucket.Client.Bucket(destBucketName)
  337. srcBucket, err := bucket.Client.Bucket(srcBucketName)
  338. listener := GetProgressListener(options)
  339. // Load CP data
  340. ccp := copyCheckpoint{}
  341. err = ccp.load(cpFilePath)
  342. if err != nil {
  343. os.Remove(cpFilePath)
  344. }
  345. // choice valid options
  346. headerOptions := ChoiceHeadObjectOption(options)
  347. partOptions := ChoiceTransferPartOption(options)
  348. completeOptions := ChoiceCompletePartOption(options)
  349. meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, headerOptions...)
  350. if err != nil {
  351. return err
  352. }
  353. // Load error or the CP data is invalid---reinitialize
  354. valid, err := ccp.isValid(meta)
  355. if err != nil || !valid {
  356. if err = ccp.prepare(meta, srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
  357. return err
  358. }
  359. os.Remove(cpFilePath)
  360. }
  361. // Unfinished parts
  362. parts := ccp.todoParts()
  363. imur := InitiateMultipartUploadResult{
  364. Bucket: destBucketName,
  365. Key: destObjectKey,
  366. UploadID: ccp.CopyID}
  367. jobs := make(chan copyPart, len(parts))
  368. results := make(chan UploadPart, len(parts))
  369. failed := make(chan error)
  370. die := make(chan bool)
  371. completedBytes := ccp.getCompletedBytes()
  372. event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size, 0)
  373. publishProgress(listener, event)
  374. // Start the worker coroutines
  375. arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, partOptions, copyPartHooker}
  376. for w := 1; w <= routines; w++ {
  377. go copyWorker(w, arg, jobs, results, failed, die)
  378. }
  379. // Start the scheduler
  380. go copyScheduler(jobs, parts)
  381. // Wait for the parts completed.
  382. completed := 0
  383. for completed < len(parts) {
  384. select {
  385. case part := <-results:
  386. completed++
  387. ccp.update(part)
  388. ccp.dump(cpFilePath)
  389. copyBytes := (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
  390. completedBytes += copyBytes
  391. event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size, copyBytes)
  392. publishProgress(listener, event)
  393. case err := <-failed:
  394. close(die)
  395. event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size, 0)
  396. publishProgress(listener, event)
  397. return err
  398. }
  399. if completed >= len(parts) {
  400. break
  401. }
  402. }
  403. event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size, 0)
  404. publishProgress(listener, event)
  405. return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, completeOptions)
  406. }