multicopy.go

package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"strconv"
)
// CopyFile copies an object concurrently in multiple parts (multipart copy).
//
// srcBucketName    source bucket name
// srcObjectKey     source object key
// destObjectKey    target object key; the copy is written to the bucket this method is called on
// partSize         the part size in bytes
// options          the object's constraints. Check out function InitiateMultipartUpload.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string, partSize int64, options ...Option) error {
	destBucketName := bucket.BucketName
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size invalid range (1024KB, 5GB]")
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	if cpConf != nil && cpConf.IsEnable && cpConf.cpDir != "" {
		src := fmt.Sprintf("oss://%v/%v", srcBucketName, srcObjectKey)
		dest := fmt.Sprintf("oss://%v/%v", bucket.BucketName, destObjectKey)
		cpFileName := getCpFileName(src, dest)
		cpFilePath := cpConf.cpDir + string(os.PathSeparator) + cpFileName
		return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
			partSize, options, cpFilePath, routines)
	}

	return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
		partSize, options, routines)
}
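
// Usage sketch (illustrative, not part of the SDK): copy src-object from
// src-bucket into the bucket the method is called on, using 5 MB parts,
// three concurrent routines, and checkpoint resume. The Routines and
// CheckpointDir option helpers are assumed to exist in this SDK version;
// check the option helpers for the exact names.
//
//	client, err := New("your-endpoint", "your-access-key-id", "your-access-key-secret")
//	if err != nil {
//		// handle error
//	}
//	bucket, err := client.Bucket("dest-bucket")
//	if err != nil {
//		// handle error
//	}
//	err = bucket.CopyFile("src-bucket", "src-object", "dest-object",
//		5*1024*1024, Routines(3), CheckpointDir(true, "/tmp/cp-dir"))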

// ----- Concurrently copy without checkpoint ---------

// copyWorkerArg defines the copy worker arguments
type copyWorkerArg struct {
	bucket        *Bucket
	imur          InitiateMultipartUploadResult
	srcBucketName string
	srcObjectKey  string
	options       []Option
	hook          copyPartHook
}

// copyPartHook is the hook for testing purposes
type copyPartHook func(part copyPart) error

var copyPartHooker copyPartHook = defaultCopyPartHook

func defaultCopyPartHook(part copyPart) error {
	return nil
}
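
// Tests can swap copyPartHooker to inject failures into the copy path, e.g.
// (illustrative sketch):
//
//	copyPartHooker = func(part copyPart) error {
//		if part.Number == 2 {
//			return errors.New("injected failure for part 2")
//		}
//		return nil
//	}
//	defer func() { copyPartHooker = defaultCopyPartHook }()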

// copyWorker copies parts from the jobs channel until the channel is closed,
// an error occurs, or the die channel is closed
func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(chunk); err != nil {
			failed <- err
			break
		}
		chunkSize := chunk.End - chunk.Start + 1
		part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
			chunk.Start, chunkSize, chunk.Number, arg.options...)
		if err != nil {
			failed <- err
			break
		}
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}

// copyScheduler feeds the parts to the jobs channel and closes it when done
func copyScheduler(jobs chan copyPart, parts []copyPart) {
	for _, part := range parts {
		jobs <- part
	}
	close(jobs)
}

// copyPart structure
type copyPart struct {
	Number int   // Part number (from 1 to 10,000)
	Start  int64 // The start index in the source file
	End    int64 // The end index in the source file
}

// getCopyParts calculates the copy parts from the source object size and the part size
func getCopyParts(bucket *Bucket, objectKey string, partSize int64) ([]copyPart, error) {
	meta, err := bucket.GetObjectDetailedMeta(objectKey)
	if err != nil {
		return nil, err
	}

	parts := []copyPart{}
	// bitSize 64 so objects larger than 2GB also parse on 32-bit builds
	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return nil, err
	}

	part := copyPart{}
	i := 0
	for offset := int64(0); offset < objectSize; offset += partSize {
		part.Number = i + 1
		part.Start = offset
		part.End = GetPartEnd(offset, objectSize, partSize)
		parts = append(parts, part)
		i++
	}
	return parts, nil
}
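
// Worked example (assuming GetPartEnd clamps the last part to the object end):
// a 25 MB (26,214,400-byte) source object split with partSize = 10 MB yields
//
//	part 1: Start=0,          End=10,485,759
//	part 2: Start=10,485,760, End=20,971,519
//	part 3: Start=20,971,520, End=26,214,399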

// getSrcObjectBytes sums the part sizes to get the source object size
func getSrcObjectBytes(parts []copyPart) int64 {
	var ob int64
	for _, part := range parts {
		ob += part.End - part.Start + 1
	}
	return ob
}

// copyFile copies the object concurrently, without checkpoint
func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
	partSize int64, options []Option, routines int) error {
	destBucket, err := bucket.Client.Bucket(destBucketName)
	if err != nil {
		return err
	}
	srcBucket, err := bucket.Client.Bucket(srcBucketName)
	if err != nil {
		return err
	}
	listener := getProgressListener(options)

	// Get copy parts
	parts, err := getCopyParts(srcBucket, srcObjectKey, partSize)
	if err != nil {
		return err
	}

	// Initialize the multipart upload
	imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan copyPart, len(parts))
	results := make(chan UploadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getSrcObjectBytes(parts)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)

	// Start the copy workers
	arg := copyWorkerArg{destBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
	for w := 1; w <= routines; w++ {
		go copyWorker(w, arg, jobs, results, failed, die)
	}

	// Start the scheduler
	go copyScheduler(jobs, parts)

	// Wait for the parts to finish
	completed := 0
	ups := make([]UploadPart, len(parts))
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			ups[part.PartNumber-1] = part
			completedBytes += parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			destBucket.AbortMultipartUpload(imur)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			return err
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = destBucket.CompleteMultipartUpload(imur, ups)
	if err != nil {
		destBucket.AbortMultipartUpload(imur)
		return err
	}
	return nil
}

// ----- Concurrently copy with checkpoint -----

const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"

type copyCheckpoint struct {
	Magic          string       // Magic
	MD5            string       // CP content MD5
	SrcBucketName  string       // Source bucket
	SrcObjectKey   string       // Source object
	DestBucketName string       // Target bucket
	DestObjectKey  string       // Target object
	CopyID         string       // Copy ID
	ObjStat        objectStat   // Object stat
	Parts          []copyPart   // Copy parts
	CopyParts      []UploadPart // The uploaded parts
	PartStat       []bool       // The part status
}
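
// The checkpoint is persisted as the JSON encoding of copyCheckpoint (see
// dump below). An illustrative checkpoint file, with made-up values, could
// look like:
//
//	{"Magic":"84F1F18C-FF1D-403B-A1D8-9DEB5F65910A","MD5":"<base64 MD5 of the CP content>",
//	 "SrcBucketName":"src-bucket","SrcObjectKey":"src-object",
//	 "DestBucketName":"dest-bucket","DestObjectKey":"dest-object",
//	 "CopyID":"<upload ID>","ObjStat":{...},
//	 "Parts":[{"Number":1,"Start":0,"End":10485759}],
//	 "CopyParts":[{...}],"PartStat":[true]}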

// isValid checks whether the checkpoint data is valid: the CP content is
// intact and the source object has not been updated.
func (cp copyCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
	// Compare CP's magic number and MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != copyCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	// Make sure the object has not been updated.
	meta, err := bucket.GetObjectDetailedMeta(objectKey)
	if err != nil {
		return false, err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return false, err
	}

	// Compare the object size, last modified time and ETag.
	if cp.ObjStat.Size != objectSize ||
		cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
		cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
		return false, nil
	}

	return true, nil
}

// load loads the CP from the checkpoint file
func (cp *copyCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	err = json.Unmarshal(contents, cp)
	return err
}

// update updates the parts status
func (cp *copyCheckpoint) update(part UploadPart) {
	cp.CopyParts[part.PartNumber-1] = part
	cp.PartStat[part.PartNumber-1] = true
}

// dump dumps the CP to the checkpoint file
func (cp *copyCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate the MD5 over the CP content with the MD5 field zeroed
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}

// todoParts returns the unfinished parts
func (cp copyCheckpoint) todoParts() []copyPart {
	dps := []copyPart{}
	for i, ps := range cp.PartStat {
		if !ps {
			dps = append(dps, cp.Parts[i])
		}
	}
	return dps
}

// getCompletedBytes returns the count of completed bytes
func (cp copyCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for i, part := range cp.Parts {
		if cp.PartStat[i] {
			completedBytes += part.End - part.Start + 1
		}
	}
	return completedBytes
}

// prepare fills in the CP from the source object and initiates the multipart upload
func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
	partSize int64, options []Option) error {
	// CP
	cp.Magic = copyCpMagic
	cp.SrcBucketName = srcBucket.BucketName
	cp.SrcObjectKey = srcObjectKey
	cp.DestBucketName = destBucket.BucketName
	cp.DestObjectKey = destObjectKey

	// Object
	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return err
	}

	cp.ObjStat.Size = objectSize
	cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)

	// Parts
	cp.Parts, err = getCopyParts(srcBucket, srcObjectKey, partSize)
	if err != nil {
		return err
	}
	cp.PartStat = make([]bool, len(cp.Parts)) // all false: nothing copied yet
	cp.CopyParts = make([]UploadPart, len(cp.Parts))

	// Initiate the multipart upload that receives the copied parts
	imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
	if err != nil {
		return err
	}
	cp.CopyID = imur.UploadID

	return nil
}

// complete completes the multipart upload and deletes the checkpoint file
func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string) error {
	imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
		Key: cp.DestObjectKey, UploadID: cp.CopyID}
	_, err := bucket.CompleteMultipartUpload(imur, parts)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return nil
}

// copyFileWithCp copies the object concurrently, with checkpoint
func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
	partSize int64, options []Option, cpFilePath string, routines int) error {
	destBucket, err := bucket.Client.Bucket(destBucketName)
	if err != nil {
		return err
	}
	srcBucket, err := bucket.Client.Bucket(srcBucketName)
	if err != nil {
		return err
	}
	listener := getProgressListener(options)

	// Load CP data
	ccp := copyCheckpoint{}
	err = ccp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// The CP file either failed to load or is invalid; reinitialize
	valid, err := ccp.isValid(srcBucket, srcObjectKey)
	if err != nil || !valid {
		if err = ccp.prepare(srcBucket, srcObjectKey, destBucket, destObjectKey, partSize, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Unfinished parts
	parts := ccp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   destBucketName,
		Key:      destObjectKey,
		UploadID: ccp.CopyID}

	jobs := make(chan copyPart, len(parts))
	results := make(chan UploadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ccp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size)
	publishProgress(listener, event)

	// Start the worker goroutines
	arg := copyWorkerArg{destBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
	for w := 1; w <= routines; w++ {
		go copyWorker(w, arg, jobs, results, failed, die)
	}

	// Start the scheduler
	go copyScheduler(jobs, parts)

	// Wait for the parts to complete
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			ccp.update(part)
			ccp.dump(cpFilePath)
			// Index the full parts list by part number; the local parts slice
			// holds only the todo parts, so indexing it by number would be
			// wrong (and can go out of range) when resuming.
			completedBytes += ccp.Parts[part.PartNumber-1].End - ccp.Parts[part.PartNumber-1].Start + 1
			event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size)
			publishProgress(listener, event)
			return err
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size)
	publishProgress(listener, event)

	return ccp.complete(destBucket, ccp.CopyParts, cpFilePath)
}
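
// Resume sketch (illustrative, same assumed option helpers as above): because
// copyFileWithCp reloads and validates the checkpoint and then schedules only
// todoParts(), re-running the identical CopyFile call after an interruption
// resumes from the last dumped part:
//
//	// first attempt, interrupted partway through
//	_ = bucket.CopyFile("src-bucket", "src-object", "dest-object",
//		5*1024*1024, Routines(3), CheckpointDir(true, "/tmp/cp-dir"))
//	// second attempt with identical arguments copies only the remaining parts
//	err := bucket.CopyFile("src-bucket", "src-object", "dest-object",
//		5*1024*1024, Routines(3), CheckpointDir(true, "/tmp/cp-dir"))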