// multicopy.go implements concurrent multipart object copy, with and
// without checkpoint-based resume support.
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "io/ioutil"
  8. "os"
  9. "path/filepath"
  10. "strconv"
  11. )
  12. //
  13. // CopyFile 分片复制文件
  14. //
  15. // objectKey object key。
  16. // filePath 本地文件。objectKey下载到文件。
  17. // partSize 本次上传文件片的大小,字节数。比如100 * 1024为每片100KB。
  18. // options Object的属性限制项。详见InitiateMultipartUpload。
  19. //
  20. // error 操作成功error为nil,非nil为错误信息。
  21. //
  22. func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
  23. partSize int64, options ...Option) error {
  24. if partSize < MinPartSize || partSize > MaxPartSize {
  25. return errors.New("oss: part size invalid range (1024KB, 5GB]")
  26. }
  27. cpConf, err := getCpConfig(options, filepath.Base(destObjectKey))
  28. if err != nil {
  29. return err
  30. }
  31. routines := getRoutines(options)
  32. if cpConf.IsEnable {
  33. return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
  34. partSize, options, cpConf.FilePath, routines)
  35. }
  36. return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
  37. partSize, options, routines)
  38. }
// ----- concurrent copy without checkpoint -----

// copyWorkerArg bundles everything a copy worker goroutine needs.
type copyWorkerArg struct {
	bucket        *Bucket                       // destination bucket
	imur          InitiateMultipartUploadResult // multipart upload the parts belong to
	srcBucketName string                        // source bucket name
	srcObjectKey  string                        // source object key
	options       []Option                      // request options forwarded to UploadPartCopy
	hook          copyPartHook                  // test hook invoked before each part copy
}
// copyPartHook is a hook used by tests; it runs before each part copy
// and a non-nil error aborts the worker.
type copyPartHook func(part copyPart) error

// copyPartHooker is the active hook; tests may replace it.
var copyPartHooker copyPartHook = defaultCopyPartHook

// defaultCopyPartHook is the no-op default hook.
func defaultCopyPartHook(part copyPart) error {
	return nil
}
  55. // 工作协程
  56. func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <- chan bool) {
  57. for chunk := range jobs {
  58. if err := arg.hook(chunk); err != nil {
  59. failed <- err
  60. break
  61. }
  62. chunkSize := chunk.End - chunk.Start + 1
  63. part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
  64. chunk.Start, chunkSize, chunk.Number, arg.options...)
  65. if err != nil {
  66. failed <- err
  67. break
  68. }
  69. select {
  70. case <-die:
  71. return
  72. default:
  73. }
  74. results <- part
  75. }
  76. }
  77. // 调度协程
  78. func copyScheduler(jobs chan copyPart, parts []copyPart) {
  79. for _, part := range parts {
  80. jobs <- part
  81. }
  82. close(jobs)
  83. }
// copyPart describes one part of a multipart copy.
type copyPart struct {
	Number int   // part number, in [1, 10000]
	Start  int64 // inclusive start offset within the source object
	End    int64 // inclusive end offset within the source object
}
  90. // 文件分片
  91. func getCopyParts(bucket *Bucket, objectKey string, partSize int64) ([]copyPart, error) {
  92. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  93. if err != nil {
  94. return nil, err
  95. }
  96. parts := []copyPart{}
  97. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  98. if err != nil {
  99. return nil, err
  100. }
  101. part := copyPart{}
  102. i := 0
  103. for offset := int64(0); offset < objectSize; offset += partSize {
  104. part.Number = i + 1
  105. part.Start = offset
  106. part.End = GetPartEnd(offset, objectSize, partSize)
  107. parts = append(parts, part)
  108. i++
  109. }
  110. return parts, nil
  111. }
  112. // 并发无断点续传的下载
  113. func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
  114. partSize int64, options []Option, routines int) error {
  115. descBucket, err := bucket.Client.Bucket(destBucketName)
  116. srcBucket, err := bucket.Client.Bucket(srcBucketName)
  117. // 分割文件
  118. parts, err := getCopyParts(srcBucket, srcObjectKey, partSize)
  119. if err != nil {
  120. return err
  121. }
  122. // 初始化上传任务
  123. imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
  124. if err != nil {
  125. return err
  126. }
  127. jobs := make(chan copyPart, len(parts))
  128. results := make(chan UploadPart, len(parts))
  129. failed := make(chan error)
  130. die := make(chan bool)
  131. // 启动工作协程
  132. arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
  133. for w := 1; w <= routines; w++ {
  134. go copyWorker(w, arg, jobs, results, failed, die)
  135. }
  136. // 并发上传分片
  137. go copyScheduler(jobs, parts)
  138. // 等待分片下载完成
  139. completed := 0
  140. ups := make([]UploadPart, len(parts))
  141. for completed < len(parts) {
  142. select {
  143. case part := <-results:
  144. completed++
  145. ups[part.PartNumber-1] = part
  146. case err := <-failed:
  147. close(die)
  148. descBucket.AbortMultipartUpload(imur)
  149. return err
  150. }
  151. if completed >= len(parts) {
  152. break
  153. }
  154. }
  155. // 提交任务
  156. _, err = descBucket.CompleteMultipartUpload(imur, ups)
  157. if err != nil {
  158. bucket.AbortMultipartUpload(imur)
  159. return err
  160. }
  161. return nil
  162. }
// ----- concurrent copy with checkpoint (resumable) -----

// copyCpMagic identifies a copy checkpoint file.
const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"

// copyCheckpoint is the serialized state of a resumable copy.
type copyCheckpoint struct {
	Magic          string       // magic marker (copyCpMagic)
	MD5            string       // MD5 of the checkpoint content
	SrcBucketName  string       // source bucket name
	SrcObjectKey   string       // source object key
	DestBucketName string       // destination bucket name
	DestObjectKey  string       // destination object key (original comment wrongly said "bucket")
	CopyID         string       // copy id (the multipart upload id)
	ObjStat        objectStat   // snapshot of the source object's status
	Parts          []copyPart   // all parts
	CopyParts      []UploadPart // results of the parts copied so far
	PartStat       []bool       // per-part completion flag
}
  178. // CP数据是否有效,CP有效且Object没有更新时有效
  179. func (cp copyCheckpoint) isValid(bucket *Bucket, objectKey string) (bool, error) {
  180. // 比较CP的Magic及MD5
  181. cpb := cp
  182. cpb.MD5 = ""
  183. js, _ := json.Marshal(cpb)
  184. sum := md5.Sum(js)
  185. b64 := base64.StdEncoding.EncodeToString(sum[:])
  186. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  187. return false, nil
  188. }
  189. // 确认object没有更新
  190. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  191. if err != nil {
  192. return false, err
  193. }
  194. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  195. if err != nil {
  196. return false, err
  197. }
  198. // 比较Object的大小/最后修改时间/etag
  199. if cp.ObjStat.Size != objectSize ||
  200. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  201. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  202. return false, nil
  203. }
  204. return true, nil
  205. }
  206. // 从文件中load
  207. func (cp *copyCheckpoint) load(filePath string) error {
  208. contents, err := ioutil.ReadFile(filePath)
  209. if err != nil {
  210. return err
  211. }
  212. err = json.Unmarshal(contents, cp)
  213. return err
  214. }
  215. // 更新分片状态
  216. func (cp *copyCheckpoint) update(part UploadPart) {
  217. cp.CopyParts[part.PartNumber - 1] = part
  218. cp.PartStat[part.PartNumber - 1] = true
  219. }
  220. // dump到文件
  221. func (cp *copyCheckpoint) dump(filePath string) error {
  222. bcp := *cp
  223. // 计算MD5
  224. bcp.MD5 = ""
  225. js, err := json.Marshal(bcp)
  226. if err != nil {
  227. return err
  228. }
  229. sum := md5.Sum(js)
  230. b64 := base64.StdEncoding.EncodeToString(sum[:])
  231. bcp.MD5 = b64
  232. // 序列化
  233. js, err = json.Marshal(bcp)
  234. if err != nil {
  235. return err
  236. }
  237. // dump
  238. return ioutil.WriteFile(filePath, js, 0644)
  239. }
  240. // 未完成的分片
  241. func (cp copyCheckpoint) todoParts() []copyPart {
  242. dps := []copyPart{}
  243. for i, ps := range cp.PartStat {
  244. if !ps {
  245. dps = append(dps, cp.Parts[i])
  246. }
  247. }
  248. return dps
  249. }
  250. // 初始化下载任务
  251. func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
  252. partSize int64, options []Option) error {
  253. // cp
  254. cp.Magic = copyCpMagic
  255. cp.SrcBucketName = srcBucket.BucketName
  256. cp.SrcObjectKey = srcObjectKey
  257. cp.DestBucketName = destBucket.BucketName
  258. cp.DestObjectKey = destObjectKey
  259. // object
  260. meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey)
  261. if err != nil {
  262. return err
  263. }
  264. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  265. if err != nil {
  266. return err
  267. }
  268. cp.ObjStat.Size = objectSize
  269. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  270. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  271. // parts
  272. cp.Parts, err = getCopyParts(srcBucket, srcObjectKey, partSize)
  273. if err != nil {
  274. return err
  275. }
  276. cp.PartStat = make([]bool, len(cp.Parts))
  277. for i := range cp.PartStat {
  278. cp.PartStat[i] = false
  279. }
  280. cp.CopyParts = make([]UploadPart, len(cp.Parts))
  281. // init copy
  282. imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
  283. if err != nil {
  284. return err
  285. }
  286. cp.CopyID = imur.UploadID
  287. return nil
  288. }
  289. func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string) error {
  290. imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
  291. Key: cp.DestObjectKey, UploadID: cp.CopyID}
  292. _, err := bucket.CompleteMultipartUpload(imur, parts)
  293. if err != nil {
  294. return err
  295. }
  296. os.Remove(cpFilePath)
  297. return err
  298. }
  299. // 并发带断点的下载
  300. func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
  301. partSize int64, options []Option, cpFilePath string, routines int) error {
  302. descBucket, err := bucket.Client.Bucket(destBucketName)
  303. srcBucket, err := bucket.Client.Bucket(srcBucketName)
  304. // LOAD CP数据
  305. ccp := copyCheckpoint{}
  306. err = ccp.load(cpFilePath)
  307. if err != nil {
  308. os.Remove(cpFilePath)
  309. }
  310. // LOAD出错或数据无效重新初始化下载
  311. valid, err := ccp.isValid(srcBucket, srcObjectKey)
  312. if err != nil || !valid {
  313. if err = ccp.prepare(srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
  314. return err
  315. }
  316. os.Remove(cpFilePath)
  317. }
  318. // 未完成的分片
  319. parts := ccp.todoParts()
  320. imur := InitiateMultipartUploadResult{
  321. Bucket: destBucketName,
  322. Key: destObjectKey,
  323. UploadID: ccp.CopyID}
  324. jobs := make(chan copyPart, len(parts))
  325. results := make(chan UploadPart, len(parts))
  326. failed := make(chan error)
  327. die := make(chan bool)
  328. // 启动工作协程
  329. arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
  330. for w := 1; w <= routines; w++ {
  331. go copyWorker(w, arg, jobs, results, failed, die)
  332. }
  333. // 并发下载分片
  334. go copyScheduler(jobs, parts)
  335. // 等待分片下载完成
  336. completed := 0
  337. for completed < len(parts) {
  338. select {
  339. case part := <-results:
  340. completed++
  341. ccp.update(part);
  342. ccp.dump(cpFilePath)
  343. case err := <-failed:
  344. close(die)
  345. return err
  346. }
  347. if completed >= len(parts) {
  348. break
  349. }
  350. }
  351. return ccp.complete(descBucket, ccp.CopyParts, cpFilePath)
  352. }