multicopy.go

package oss

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"strconv"
)

// CopyFile performs a multipart copy of an object.
//
// srcBucketName    source bucket name
// srcObjectKey     source object name
// destObjectKey    target object name in the destination bucket (bucket.BucketName)
// partSize         the part size in bytes
// options          the object's constraints. Check out function InitiateMultipartUpload.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string, partSize int64, options ...Option) error {
	destBucketName := bucket.BucketName
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size invalid range (1024KB, 5GB]")
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	if cpConf != nil && cpConf.IsEnable {
		cpFilePath := getCopyCpFilePath(cpConf, srcBucketName, srcObjectKey, destBucketName, destObjectKey)
		if cpFilePath != "" {
			return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey, partSize, options, cpFilePath, routines)
		}
	}

	return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
		partSize, options, routines)
}
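
// copyFileExample is an illustrative usage sketch, not part of the original
// file: it shows one way CopyFile might be called. The endpoint, credentials,
// bucket and object names are placeholders; Routines and Checkpoint are the
// SDK's concurrency and resumable-copy options.
func copyFileExample() error {
	client, err := New("<endpoint>", "<accessKeyID>", "<accessKeySecret>")
	if err != nil {
		return err
	}
	bucket, err := client.Bucket("<dest-bucket>")
	if err != nil {
		return err
	}
	// Copy <src-object> from <src-bucket> into <dest-object> in the
	// destination bucket, using 10 MB parts, 3 concurrent workers, and a
	// checkpoint file so an interrupted copy can resume.
	return bucket.CopyFile("<src-bucket>", "<src-object>", "<dest-object>",
		10*1024*1024, Routines(3), Checkpoint(true, "<cp-dir>/copy.cp"))
}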
func getCopyCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destBucket, destObject string) string {
	if cpConf.FilePath == "" && cpConf.DirPath != "" {
		dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
		src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
		cpFileName := getCpFileName(src, dest)
		cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
	}
	return cpConf.FilePath
}
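
// getCpFileName is assumed to derive a deterministic checkpoint file name from
// the two oss:// URIs above; with DirPath "/tmp/cp", for example, the
// checkpoint would land at a path like /tmp/cp/<derived-name>.cp, so the same
// src/dest pair resumes from the same file.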

// ----- Concurrently copy without checkpoint ---------

// copyWorkerArg defines the copy worker arguments
type copyWorkerArg struct {
	bucket        *Bucket
	imur          InitiateMultipartUploadResult
	srcBucketName string
	srcObjectKey  string
	options       []Option
	hook          copyPartHook
}

// copyPartHook is the hook for testing purposes
type copyPartHook func(part copyPart) error

var copyPartHooker copyPartHook = defaultCopyPartHook

func defaultCopyPartHook(part copyPart) error {
	return nil
}

// copyWorker copies parts as a worker goroutine
func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(chunk); err != nil {
			failed <- err
			break
		}
		chunkSize := chunk.End - chunk.Start + 1
		part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
			chunk.Start, chunkSize, chunk.Number, arg.options...)
		if err != nil {
			failed <- err
			break
		}
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}

// copyScheduler feeds all parts to the jobs channel and closes it
func copyScheduler(jobs chan copyPart, parts []copyPart) {
	for _, part := range parts {
		jobs <- part
	}
	close(jobs)
}
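
// Design note: copyWorker and copyScheduler form a small worker pool. The
// scheduler fans parts out on the jobs channel and closes it; each worker
// sends one UploadPart per copied part on results and the first error on
// failed; closing die tells workers to exit before sending further results.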

// copyPart structure
type copyPart struct {
	Number int   // Part number (from 1 to 10,000)
	Start  int64 // The start index in the source file.
	End    int64 // The end index in the source file
}

// getCopyParts calculates copy parts
func getCopyParts(objectSize, partSize int64) []copyPart {
	parts := []copyPart{}
	part := copyPart{}
	i := 0
	for offset := int64(0); offset < objectSize; offset += partSize {
		part.Number = i + 1
		part.Start = offset
		part.End = GetPartEnd(offset, objectSize, partSize)
		parts = append(parts, part)
		i++
	}
	return parts
}
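
// Worked example (illustrative): for a 25 MB object and a 10 MB part size,
// getCopyParts yields three parts, with GetPartEnd assumed to clamp the last
// part at objectSize-1:
//
//	{Number: 1, Start: 0,        End: 10<<20 - 1}
//	{Number: 2, Start: 10 << 20, End: 20<<20 - 1}
//	{Number: 3, Start: 20 << 20, End: 25<<20 - 1}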

// getSrcObjectBytes gets the source file size
func getSrcObjectBytes(parts []copyPart) int64 {
	var ob int64
	for _, part := range parts {
		ob += (part.End - part.Start + 1)
	}
	return ob
}

// copyFile is a concurrent copy without checkpoint
func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
	partSize int64, options []Option, routines int) error {
	descBucket, err := bucket.Client.Bucket(destBucketName)
	if err != nil {
		return err
	}
	srcBucket, err := bucket.Client.Bucket(srcBucketName)
	if err != nil {
		return err
	}
	listener := getProgressListener(options)

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, payerOptions...)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return err
	}

	// Get copy parts
	parts := getCopyParts(objectSize, partSize)

	// Initialize the multipart upload
	imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan copyPart, len(parts))
	results := make(chan UploadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getSrcObjectBytes(parts)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
	publishProgress(listener, event)

	// Start the copy workers
	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, payerOptions, copyPartHooker}
	for w := 1; w <= routines; w++ {
		go copyWorker(w, arg, jobs, results, failed, die)
	}

	// Start the scheduler
	go copyScheduler(jobs, parts)

	// Wait for the parts to finish
	completed := 0
	ups := make([]UploadPart, len(parts))
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			ups[part.PartNumber-1] = part
			completedBytes += (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			descBucket.AbortMultipartUpload(imur, payerOptions...)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = descBucket.CompleteMultipartUpload(imur, ups, payerOptions...)
	if err != nil {
		bucket.AbortMultipartUpload(imur, payerOptions...)
		return err
	}
	return nil
}

// ----- Concurrently copy with checkpoint -----

const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"

type copyCheckpoint struct {
	Magic          string       // Magic
	MD5            string       // CP content MD5
	SrcBucketName  string       // Source bucket
	SrcObjectKey   string       // Source object
	DestBucketName string       // Target bucket
	DestObjectKey  string       // Target object
	CopyID         string       // Copy ID
	ObjStat        objectStat   // Object stat
	Parts          []copyPart   // Copy parts
	CopyParts      []UploadPart // The uploaded parts
	PartStat       []bool       // The part status
}
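
// A hedged sketch of how dump (below) serializes the checkpoint; the field
// names follow the struct, but the values are illustrative, not from a real
// run:
//
//	{
//	  "Magic": "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A",
//	  "MD5": "<base64 MD5 of this JSON with MD5 emptied>",
//	  "SrcBucketName": "src-bucket", "SrcObjectKey": "src-object",
//	  "DestBucketName": "dest-bucket", "DestObjectKey": "dest-object",
//	  "CopyID": "<upload ID from InitiateMultipartUpload>",
//	  "Parts": [...], "CopyParts": [...], "PartStat": [true, false, ...]
//	}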

// isValid checks if the data is valid, which means the CP is valid and the object is not updated.
func (cp copyCheckpoint) isValid(meta http.Header) (bool, error) {
	// Compare CP's magic number and the MD5.
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != copyCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return false, err
	}

	// Compare the object size, last modified time and etag.
	if cp.ObjStat.Size != objectSize ||
		cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
		cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
		return false, nil
	}

	return true, nil
}

// load loads from the checkpoint file
func (cp *copyCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	err = json.Unmarshal(contents, cp)
	return err
}

// update updates the parts status
func (cp *copyCheckpoint) update(part UploadPart) {
	cp.CopyParts[part.PartNumber-1] = part
	cp.PartStat[part.PartNumber-1] = true
}

// dump dumps the CP to the file
func (cp *copyCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate MD5
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialization
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}

// todoParts returns unfinished parts
func (cp copyCheckpoint) todoParts() []copyPart {
	dps := []copyPart{}
	for i, ps := range cp.PartStat {
		if !ps {
			dps = append(dps, cp.Parts[i])
		}
	}
	return dps
}

// getCompletedBytes returns the finished bytes count
func (cp copyCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for i, part := range cp.Parts {
		if cp.PartStat[i] {
			completedBytes += (part.End - part.Start + 1)
		}
	}
	return completedBytes
}

// prepare initializes the multipart upload
func (cp *copyCheckpoint) prepare(meta http.Header, srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
	partSize int64, options []Option) error {
	// CP
	cp.Magic = copyCpMagic
	cp.SrcBucketName = srcBucket.BucketName
	cp.SrcObjectKey = srcObjectKey
	cp.DestBucketName = destBucket.BucketName
	cp.DestObjectKey = destObjectKey

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
	if err != nil {
		return err
	}

	cp.ObjStat.Size = objectSize
	cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)

	// Parts
	cp.Parts = getCopyParts(objectSize, partSize)
	cp.PartStat = make([]bool, len(cp.Parts))
	for i := range cp.PartStat {
		cp.PartStat[i] = false
	}
	cp.CopyParts = make([]UploadPart, len(cp.Parts))

	// Init copy
	imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
	if err != nil {
		return err
	}
	cp.CopyID = imur.UploadID

	return nil
}

// complete completes the multipart upload and removes the checkpoint file
func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
	imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
		Key: cp.DestObjectKey, UploadID: cp.CopyID}
	_, err := bucket.CompleteMultipartUpload(imur, parts, options...)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return err
}

// copyFileWithCp is a concurrent copy with checkpoint
func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
	partSize int64, options []Option, cpFilePath string, routines int) error {
	descBucket, err := bucket.Client.Bucket(destBucketName)
	if err != nil {
		return err
	}
	srcBucket, err := bucket.Client.Bucket(srcBucketName)
	if err != nil {
		return err
	}
	listener := getProgressListener(options)

	payerOptions := []Option{}
	payer := getPayer(options)
	if payer != "" {
		payerOptions = append(payerOptions, RequestPayer(PayerType(payer)))
	}

	// Load CP data
	ccp := copyCheckpoint{}
	err = ccp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// Make sure the object is not updated.
	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, payerOptions...)
	if err != nil {
		return err
	}

	// Load error or the CP data is invalid -- reinitialize
	valid, err := ccp.isValid(meta)
	if err != nil || !valid {
		if err = ccp.prepare(meta, srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Unfinished parts
	parts := ccp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   destBucketName,
		Key:      destObjectKey,
		UploadID: ccp.CopyID}

	jobs := make(chan copyPart, len(parts))
	results := make(chan UploadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ccp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size)
	publishProgress(listener, event)

	// Start the worker coroutines
	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, payerOptions, copyPartHooker}
	for w := 1; w <= routines; w++ {
		go copyWorker(w, arg, jobs, results, failed, die)
	}

	// Start the scheduler
	go copyScheduler(jobs, parts)

	// Wait for the parts to complete
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			ccp.update(part)
			ccp.dump(cpFilePath)
			// Index the full part list by part number; the local parts slice
			// holds only the unfinished parts, so its positions don't match
			// part numbers when resuming.
			completedBytes += (ccp.Parts[part.PartNumber-1].End - ccp.Parts[part.PartNumber-1].Start + 1)
			event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size)
	publishProgress(listener, event)

	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, payerOptions)
}
  401. }