multicopy.go 12 KB


  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io/ioutil"
  9. "os"
  10. "strconv"
  11. )
  12. // CopyFile is multipart copy object
  13. //
  14. // srcBucketName source bucket name
  15. // srcObjectKey source object name
  16. // destObjectKey target object name in the form of bucketname.objectkey
  17. // partSize the part size in byte.
  18. // options object's contraints. Check out function InitiateMultipartUpload.
  19. //
  20. // error it's nil if the operation succeeds, otherwise it's an error object.
  21. //
  22. func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string, partSize int64, options ...Option) error {
  23. destBucketName := bucket.BucketName
  24. if partSize < MinPartSize || partSize > MaxPartSize {
  25. return errors.New("oss: part size invalid range (1024KB, 5GB]")
  26. }
  27. cpConf := getCpConfig(options)
  28. routines := getRoutines(options)
  29. if cpConf != nil && cpConf.IsEnable && cpConf.cpDir != "" {
  30. src := fmt.Sprintf("oss://%v/%v", srcBucketName, srcObjectKey)
  31. dest := fmt.Sprintf("oss://%v/%v", bucket.BucketName, destObjectKey)
  32. cpFileName := getCpFileName(src, dest)
  33. cpFilePath := cpConf.cpDir + string(os.PathSeparator) + cpFileName
  34. return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
  35. partSize, options, cpFilePath, routines)
  36. }
  37. return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
  38. partSize, options, routines)
  39. }
  40. // ----- Concurrently copy without checkpoint ---------
  41. // copyWorkerArg defines the copy worker arguments
  42. type copyWorkerArg struct {
  43. bucket *Bucket
  44. imur InitiateMultipartUploadResult
  45. srcBucketName string
  46. srcObjectKey string
  47. options []Option
  48. hook copyPartHook
  49. }
  50. // copyPartHook is the hook for testing purpose
  51. type copyPartHook func(part copyPart) error
  52. var copyPartHooker copyPartHook = defaultCopyPartHook
  53. func defaultCopyPartHook(part copyPart) error {
  54. return nil
  55. }
  56. // copyWorker copies worker
  57. func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
  58. for chunk := range jobs {
  59. if err := arg.hook(chunk); err != nil {
  60. failed <- err
  61. break
  62. }
  63. chunkSize := chunk.End - chunk.Start + 1
  64. part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
  65. chunk.Start, chunkSize, chunk.Number, arg.options...)
  66. if err != nil {
  67. failed <- err
  68. break
  69. }
  70. select {
  71. case <-die:
  72. return
  73. default:
  74. }
  75. results <- part
  76. }
  77. }
  78. // copyScheduler
  79. func copyScheduler(jobs chan copyPart, parts []copyPart) {
  80. for _, part := range parts {
  81. jobs <- part
  82. }
  83. close(jobs)
  84. }
  // copyPart describes one part of a multipart copy as a byte range of the source object.
  type copyPart struct {
  	// Number is the part number (from 1 to 10,000).
  	Number int
  	// Start and End are the first and last byte offsets of the part in the
  	// source object; both are inclusive, so the part holds End-Start+1 bytes.
  	Start, End int64
  }
  91. // getCopyParts calculates copy parts
  92. func getCopyParts(objectSize, partSize int64) []copyPart {
  93. parts := []copyPart{}
  94. part := copyPart{}
  95. i := 0
  96. for offset := int64(0); offset < objectSize; offset += partSize {
  97. part.Number = i + 1
  98. part.Start = offset
  99. part.End = GetPartEnd(offset, objectSize, partSize)
  100. parts = append(parts, part)
  101. i++
  102. }
  103. return parts
  104. }
  105. // getSrcObjectBytes gets the source file size
  106. func getSrcObjectBytes(parts []copyPart) int64 {
  107. var ob int64
  108. for _, part := range parts {
  109. ob += (part.End - part.Start + 1)
  110. }
  111. return ob
  112. }
  113. // copyFile is a concurrently copy without checkpoint
  114. func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
  115. partSize int64, options []Option, routines int) error {
  116. descBucket, err := bucket.Client.Bucket(destBucketName)
  117. srcBucket, err := bucket.Client.Bucket(srcBucketName)
  118. listener := getProgressListener(options)
  119. meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, options...)
  120. if err != nil {
  121. return err
  122. }
  123. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  124. if err != nil {
  125. return err
  126. }
  127. // Get copy parts
  128. parts := getCopyParts(objectSize, partSize)
  129. // Initialize the multipart upload
  130. imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
  131. if err != nil {
  132. return err
  133. }
  134. jobs := make(chan copyPart, len(parts))
  135. results := make(chan UploadPart, len(parts))
  136. failed := make(chan error)
  137. die := make(chan bool)
  138. var completedBytes int64
  139. totalBytes := getSrcObjectBytes(parts)
  140. event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
  141. publishProgress(listener, event)
  142. // Start to copy workers
  143. arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
  144. for w := 1; w <= routines; w++ {
  145. go copyWorker(w, arg, jobs, results, failed, die)
  146. }
  147. // Start the scheduler
  148. go copyScheduler(jobs, parts)
  149. // Wait for the parts finished.
  150. completed := 0
  151. ups := make([]UploadPart, len(parts))
  152. for completed < len(parts) {
  153. select {
  154. case part := <-results:
  155. completed++
  156. ups[part.PartNumber-1] = part
  157. completedBytes += (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
  158. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
  159. publishProgress(listener, event)
  160. case err := <-failed:
  161. close(die)
  162. descBucket.AbortMultipartUpload(imur, options...)
  163. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
  164. publishProgress(listener, event)
  165. return err
  166. }
  167. if completed >= len(parts) {
  168. break
  169. }
  170. }
  171. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
  172. publishProgress(listener, event)
  173. // Complete the multipart upload
  174. _, err = descBucket.CompleteMultipartUpload(imur, ups, options...)
  175. if err != nil {
  176. bucket.AbortMultipartUpload(imur, options...)
  177. return err
  178. }
  179. return nil
  180. }
  181. // ----- Concurrently copy with checkpoint -----
  182. const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"
  183. type copyCheckpoint struct {
  184. Magic string // Magic
  185. MD5 string // CP content MD5
  186. SrcBucketName string // Source bucket
  187. SrcObjectKey string // Source object
  188. DestBucketName string // Target bucket
  189. DestObjectKey string // Target object
  190. CopyID string // Copy ID
  191. ObjStat objectStat // Object stat
  192. Parts []copyPart // Copy parts
  193. CopyParts []UploadPart // The uploaded parts
  194. PartStat []bool // The part status
  195. }
  196. // isValid checks if the data is valid which means CP is valid and object is not updated.
  197. func (cp copyCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
  198. // Compare CP's magic number and the MD5.
  199. cpb := cp
  200. cpb.MD5 = ""
  201. js, _ := json.Marshal(cpb)
  202. sum := md5.Sum(js)
  203. b64 := base64.StdEncoding.EncodeToString(sum[:])
  204. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  205. return false, nil
  206. }
  207. // Make sure the object is not updated.
  208. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  209. if err != nil {
  210. return false, err
  211. }
  212. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  213. if err != nil {
  214. return false, err
  215. }
  216. // Compare the object size and last modified time and etag.
  217. if cp.ObjStat.Size != objectSize ||
  218. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  219. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  220. return false, nil
  221. }
  222. return true, nil
  223. }
  224. // load loads from the checkpoint file
  225. func (cp *copyCheckpoint) load(filePath string) error {
  226. contents, err := ioutil.ReadFile(filePath)
  227. if err != nil {
  228. return err
  229. }
  230. err = json.Unmarshal(contents, cp)
  231. return err
  232. }
  233. // update updates the parts status
  234. func (cp *copyCheckpoint) update(part UploadPart) {
  235. cp.CopyParts[part.PartNumber-1] = part
  236. cp.PartStat[part.PartNumber-1] = true
  237. }
  238. // dump dumps the CP to the file
  239. func (cp *copyCheckpoint) dump(filePath string) error {
  240. bcp := *cp
  241. // Calculate MD5
  242. bcp.MD5 = ""
  243. js, err := json.Marshal(bcp)
  244. if err != nil {
  245. return err
  246. }
  247. sum := md5.Sum(js)
  248. b64 := base64.StdEncoding.EncodeToString(sum[:])
  249. bcp.MD5 = b64
  250. // Serialization
  251. js, err = json.Marshal(bcp)
  252. if err != nil {
  253. return err
  254. }
  255. // Dump
  256. return ioutil.WriteFile(filePath, js, FilePermMode)
  257. }
  258. // todoParts returns unfinished parts
  259. func (cp copyCheckpoint) todoParts() []copyPart {
  260. dps := []copyPart{}
  261. for i, ps := range cp.PartStat {
  262. if !ps {
  263. dps = append(dps, cp.Parts[i])
  264. }
  265. }
  266. return dps
  267. }
  268. // getCompletedBytes returns finished bytes count
  269. func (cp copyCheckpoint) getCompletedBytes() int64 {
  270. var completedBytes int64
  271. for i, part := range cp.Parts {
  272. if cp.PartStat[i] {
  273. completedBytes += (part.End - part.Start + 1)
  274. }
  275. }
  276. return completedBytes
  277. }
  278. // prepare initializes the multipart upload
  279. func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
  280. partSize int64, options []Option) error {
  281. // CP
  282. cp.Magic = copyCpMagic
  283. cp.SrcBucketName = srcBucket.BucketName
  284. cp.SrcObjectKey = srcObjectKey
  285. cp.DestBucketName = destBucket.BucketName
  286. cp.DestObjectKey = destObjectKey
  287. // Object
  288. meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, options...)
  289. if err != nil {
  290. return err
  291. }
  292. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  293. if err != nil {
  294. return err
  295. }
  296. cp.ObjStat.Size = objectSize
  297. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  298. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  299. // Parts
  300. cp.Parts = getCopyParts(objectSize, partSize)
  301. cp.PartStat = make([]bool, len(cp.Parts))
  302. for i := range cp.PartStat {
  303. cp.PartStat[i] = false
  304. }
  305. cp.CopyParts = make([]UploadPart, len(cp.Parts))
  306. // Init copy
  307. imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
  308. if err != nil {
  309. return err
  310. }
  311. cp.CopyID = imur.UploadID
  312. return nil
  313. }
  314. func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
  315. imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
  316. Key: cp.DestObjectKey, UploadID: cp.CopyID}
  317. _, err := bucket.CompleteMultipartUpload(imur, parts, options...)
  318. if err != nil {
  319. return err
  320. }
  321. os.Remove(cpFilePath)
  322. return err
  323. }
  324. // copyFileWithCp is concurrently copy with checkpoint
  325. func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
  326. partSize int64, options []Option, cpFilePath string, routines int) error {
  327. descBucket, err := bucket.Client.Bucket(destBucketName)
  328. srcBucket, err := bucket.Client.Bucket(srcBucketName)
  329. listener := getProgressListener(options)
  330. // Load CP data
  331. ccp := copyCheckpoint{}
  332. err = ccp.load(cpFilePath)
  333. if err != nil {
  334. os.Remove(cpFilePath)
  335. }
  336. // Load error or the CP data is invalid---reinitialize
  337. valid, err := ccp.isValid(srcBucket, srcObjectKey)
  338. if err != nil || !valid {
  339. if err = ccp.prepare(srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
  340. return err
  341. }
  342. os.Remove(cpFilePath)
  343. }
  344. // Unfinished parts
  345. parts := ccp.todoParts()
  346. imur := InitiateMultipartUploadResult{
  347. Bucket: destBucketName,
  348. Key: destObjectKey,
  349. UploadID: ccp.CopyID}
  350. jobs := make(chan copyPart, len(parts))
  351. results := make(chan UploadPart, len(parts))
  352. failed := make(chan error)
  353. die := make(chan bool)
  354. completedBytes := ccp.getCompletedBytes()
  355. event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size)
  356. publishProgress(listener, event)
  357. // Start the worker coroutines
  358. arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
  359. for w := 1; w <= routines; w++ {
  360. go copyWorker(w, arg, jobs, results, failed, die)
  361. }
  362. // Start the scheduler
  363. go copyScheduler(jobs, parts)
  364. // Wait for the parts completed.
  365. completed := 0
  366. for completed < len(parts) {
  367. select {
  368. case part := <-results:
  369. completed++
  370. ccp.update(part)
  371. ccp.dump(cpFilePath)
  372. completedBytes += (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
  373. event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size)
  374. publishProgress(listener, event)
  375. case err := <-failed:
  376. close(die)
  377. event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size)
  378. publishProgress(listener, event)
  379. return err
  380. }
  381. if completed >= len(parts) {
  382. break
  383. }
  384. }
  385. event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size)
  386. publishProgress(listener, event)
  387. return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, options)
  388. }