multicopy.go

package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
)

// CopyFile copies an object between buckets with multipart upload.
//
// srcBucketName    source bucket name
// srcObjectKey     source object name
// destObjectKey    target object name; the target bucket is the one this method is called on
// partSize         the part size in bytes
// options          the object's constraints. Check out function InitiateMultipartUpload.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string, partSize int64, options ...Option) error {
	destBucketName := bucket.BucketName
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size invalid range (1024KB, 5GB]")
	}

	cpConf, err := getCpConfig(options, filepath.Base(destObjectKey))
	if err != nil {
		return err
	}

	routines := getRoutines(options)

	if cpConf.IsEnable {
		return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
			partSize, options, cpConf.FilePath, routines)
	}

	return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
		partSize, options, routines)
}
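
// Usage sketch (a minimal illustration, not part of this file; the endpoint,
// credentials, bucket and object names are hypothetical, and it assumes the
// SDK's Routines and Checkpoint options):
//
//	client, err := oss.New("endpoint", "accessKeyID", "accessKeySecret")
//	if err != nil {
//		// handle error
//	}
//	bucket, err := client.Bucket("dest-bucket")
//	if err != nil {
//		// handle error
//	}
//	// Copy src-bucket/src-object into dest-bucket/dest-object in 1MB parts,
//	// using 3 concurrent routines and a checkpoint file for resume.
//	err = bucket.CopyFile("src-bucket", "src-object", "dest-object", 1024*1024,
//		oss.Routines(3), oss.Checkpoint(true, "copy.cp"))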

// ----- Concurrently copy without checkpoint ---------

// copyWorkerArg defines the copy worker arguments
type copyWorkerArg struct {
	bucket        *Bucket
	imur          InitiateMultipartUploadResult
	srcBucketName string
	srcObjectKey  string
	options       []Option
	hook          copyPartHook
}

// copyPartHook is the hook for testing purposes
type copyPartHook func(part copyPart) error

var copyPartHooker copyPartHook = defaultCopyPartHook

func defaultCopyPartHook(part copyPart) error {
	return nil
}

// copyWorker copies the parts it receives from the jobs channel
func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(chunk); err != nil {
			failed <- err
			break
		}
		chunkSize := chunk.End - chunk.Start + 1
		part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
			chunk.Start, chunkSize, chunk.Number, arg.options...)
		if err != nil {
			failed <- err
			break
		}
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}

// copyScheduler feeds the parts to the jobs channel and closes it when done
func copyScheduler(jobs chan copyPart, parts []copyPart) {
	for _, part := range parts {
		jobs <- part
	}
	close(jobs)
}

// copyPart structure
type copyPart struct {
	Number int   // Part number (from 1 to 10,000)
	Start  int64 // The start index in the source file
	End    int64 // The end index in the source file
}

// getCopyParts calculates the copy parts
func getCopyParts(bucket *Bucket, objectKey string, partSize int64) ([]copyPart, error) {
	meta, err := bucket.GetObjectDetailedMeta(objectKey)
	if err != nil {
		return nil, err
	}

	parts := []copyPart{}
	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return nil, err
	}

	part := copyPart{}
	i := 0
	for offset := int64(0); offset < objectSize; offset += partSize {
		part.Number = i + 1
		part.Start = offset
		part.End = GetPartEnd(offset, objectSize, partSize)
		parts = append(parts, part)
		i++
	}
	return parts, nil
}
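
// Worked example (hypothetical sizes, for illustration only): for a
// 256,000-byte source object copied with partSize = 102,400 bytes (100KB),
// getCopyParts yields three parts:
//
//	{Number: 1, Start: 0,      End: 102399}
//	{Number: 2, Start: 102400, End: 204799}
//	{Number: 3, Start: 204800, End: 255999}
//
// The last part carries the 51,200-byte remainder; GetPartEnd clamps End to
// objectSize-1.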

// getSrcObjectBytes gets the total size of the source object from its parts
func getSrcObjectBytes(parts []copyPart) int64 {
	var ob int64
	for _, part := range parts {
		ob += part.End - part.Start + 1
	}
	return ob
}

// copyFile copies the object concurrently, without checkpoint
func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
	partSize int64, options []Option, routines int) error {
	descBucket, err := bucket.Client.Bucket(destBucketName)
	if err != nil {
		return err
	}
	srcBucket, err := bucket.Client.Bucket(srcBucketName)
	if err != nil {
		return err
	}
	listener := getProgressListener(options)

	// Get copy parts
	parts, err := getCopyParts(srcBucket, srcObjectKey, partSize)
	if err != nil {
		return err
	}

	// Initialize the multipart upload
	imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan copyPart, len(parts))
	results := make(chan UploadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getSrcObjectBytes(parts)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)

	// Start the copy workers
	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
	for w := 1; w <= routines; w++ {
		go copyWorker(w, arg, jobs, results, failed, die)
	}

	// Start the scheduler
	go copyScheduler(jobs, parts)

	// Wait for the parts to finish
	completed := 0
	ups := make([]UploadPart, len(parts))
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			ups[part.PartNumber-1] = part
			completedBytes += parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			descBucket.AbortMultipartUpload(imur)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = descBucket.CompleteMultipartUpload(imur, ups)
	if err != nil {
		descBucket.AbortMultipartUpload(imur)
		return err
	}
	return nil
}

// ----- Concurrently copy with checkpoint -----

const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"

type copyCheckpoint struct {
	Magic          string       // Magic
	MD5            string       // CP content MD5
	SrcBucketName  string       // Source bucket
	SrcObjectKey   string       // Source object
	DestBucketName string       // Target bucket
	DestObjectKey  string       // Target object
	CopyID         string       // Copy ID
	ObjStat        objectStat   // Object stat
	Parts          []copyPart   // Copy parts
	CopyParts      []UploadPart // The uploaded parts
	PartStat       []bool       // The part status
}
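
// The checkpoint file is the JSON serialization of this struct. The MD5 field
// holds a base64-encoded MD5 of the JSON with MD5 itself blanked (see dump and
// isValid below), so a reload can detect a corrupted or tampered file. A rough
// sketch of the on-disk layout, with hypothetical and elided values:
//
//	{"Magic":"84F1F18C-FF1D-403B-A1D8-9DEB5F65910A","MD5":"...",
//	 "SrcBucketName":"src-bucket","SrcObjectKey":"src-object",
//	 "DestBucketName":"dest-bucket","DestObjectKey":"dest-object",
//	 "CopyID":"upload-id","ObjStat":{...},"Parts":[...],
//	 "CopyParts":[...],"PartStat":[true,false]}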

// isValid checks that the checkpoint data is intact and that the source object
// has not been updated since the checkpoint was written.
func (cp copyCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
	// Compare the CP's magic number and MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != copyCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	// Make sure the object is not updated.
	meta, err := bucket.GetObjectDetailedMeta(objectKey)
	if err != nil {
		return false, err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return false, err
	}

	// Compare the object size, last modified time and ETag.
	if cp.ObjStat.Size != objectSize ||
		cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
		cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
		return false, nil
	}

	return true, nil
}

// load loads the checkpoint from the checkpoint file
func (cp *copyCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	err = json.Unmarshal(contents, cp)
	return err
}

// update updates the parts' status
func (cp *copyCheckpoint) update(part UploadPart) {
	cp.CopyParts[part.PartNumber-1] = part
	cp.PartStat[part.PartNumber-1] = true
}

// dump dumps the CP to the file
func (cp *copyCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}

// todoParts returns the parts that are not yet finished
func (cp copyCheckpoint) todoParts() []copyPart {
	dps := []copyPart{}
	for i, ps := range cp.PartStat {
		if !ps {
			dps = append(dps, cp.Parts[i])
		}
	}
	return dps
}

// getCompletedBytes returns the number of bytes already copied
func (cp copyCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for i, part := range cp.Parts {
		if cp.PartStat[i] {
			completedBytes += part.End - part.Start + 1
		}
	}
	return completedBytes
}

// prepare initializes the multipart copy: it fills in the CP fields, stats the
// source object and initiates the multipart upload on the target
func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
	partSize int64, options []Option) error {
	// CP
	cp.Magic = copyCpMagic
	cp.SrcBucketName = srcBucket.BucketName
	cp.SrcObjectKey = srcObjectKey
	cp.DestBucketName = destBucket.BucketName
	cp.DestObjectKey = destObjectKey

	// Object
	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return err
	}

	cp.ObjStat.Size = objectSize
	cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)

	// Parts
	cp.Parts, err = getCopyParts(srcBucket, srcObjectKey, partSize)
	if err != nil {
		return err
	}
	cp.PartStat = make([]bool, len(cp.Parts))
	for i := range cp.PartStat {
		cp.PartStat[i] = false
	}
	cp.CopyParts = make([]UploadPart, len(cp.Parts))

	// Initiate the copy
	imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
	if err != nil {
		return err
	}
	cp.CopyID = imur.UploadID

	return nil
}

// complete completes the multipart upload on the target and removes the CP file
func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string) error {
	imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
		Key: cp.DestObjectKey, UploadID: cp.CopyID}
	_, err := bucket.CompleteMultipartUpload(imur, parts)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return err
}

// copyFileWithCp copies the object concurrently, with checkpoint
func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
	partSize int64, options []Option, cpFilePath string, routines int) error {
	descBucket, err := bucket.Client.Bucket(destBucketName)
	if err != nil {
		return err
	}
	srcBucket, err := bucket.Client.Bucket(srcBucketName)
	if err != nil {
		return err
	}
	listener := getProgressListener(options)

	// Load CP data
	ccp := copyCheckpoint{}
	err = ccp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// If the CP data failed to load or is invalid, reinitialize it
	valid, err := ccp.isValid(srcBucket, srcObjectKey)
	if err != nil || !valid {
		if err = ccp.prepare(srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Unfinished parts
	parts := ccp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   destBucketName,
		Key:      destObjectKey,
		UploadID: ccp.CopyID}

	jobs := make(chan copyPart, len(parts))
	results := make(chan UploadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ccp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size)
	publishProgress(listener, event)

	// Start the worker goroutines
	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
	for w := 1; w <= routines; w++ {
		go copyWorker(w, arg, jobs, results, failed, die)
	}

	// Start the scheduler
	go copyScheduler(jobs, parts)

	// Wait for the parts to complete
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			ccp.update(part)
			ccp.dump(cpFilePath)
			// Index into the full parts list (ccp.Parts); the local parts slice
			// holds only the unfinished parts, so PartNumber-1 would not line up.
			completedBytes += ccp.Parts[part.PartNumber-1].End - ccp.Parts[part.PartNumber-1].Start + 1
			event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size)
	publishProgress(listener, event)

	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath)
}