upload.go

package oss

import (
    "crypto/md5"
    "encoding/base64"
    "encoding/json"
    "errors"
    "io/ioutil"
    "os"
    "strconv"
    "time"
)

//
// UploadFile uploads a file using multipart upload.
//
// objectKey    the name of the object.
// filePath     the local file to upload.
// partSize     the size of each upload part in bytes, e.g. 100 * 1024 for 100KB parts.
// options      properties to set on the object when uploading. See InitiateMultipartUpload for details.
//
// error        nil on success, otherwise the error that occurred.
//
func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error {
    if partSize < MinPartSize || partSize > MaxPartSize {
        return errors.New("oss: part size invalid range (1024KB, 5GB]")
    }

    cpConf, err := getCpConfig(options, filePath)
    if err != nil {
        return err
    }

    routines := getRoutines(options)

    if cpConf.IsEnable {
        return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines)
    }

    return bucket.uploadFile(objectKey, filePath, partSize, options, routines)
}
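
// A minimal usage sketch, assuming a Bucket value obtained from an already
// configured Client (the object key, file name and part size below are
// placeholders, not part of this file):
//
//     // Upload local-file.bin as "dir/object" in 100KB parts.
//     err := bucket.UploadFile("dir/object", "local-file.bin", 100*1024)
//     if err != nil {
//         // handle the error
//     }
//
// Concurrency and checkpointing are controlled through the options parameter
// (see getRoutines and getCpConfig below).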

// ----- Concurrent upload without checkpoint -----

// getCpConfig gets the checkpoint configuration from the options.
func getCpConfig(options []Option, filePath string) (*cpConfig, error) {
    cpc := cpConfig{}
    cpStr, err := findOption(options, checkpointConfig, "")
    if err != nil {
        return nil, err
    }

    if cpStr != "" {
        if err = json.Unmarshal([]byte(cpStr), &cpc); err != nil {
            return nil, err
        }
    }

    if cpc.IsEnable && cpc.FilePath == "" {
        cpc.FilePath = filePath + CheckpointFileSuffix
    }

    return &cpc, nil
}

// getRoutines gets the number of concurrent routines from the options; the default is 1.
func getRoutines(options []Option) int {
    rStr, err := findOption(options, routineNum, "")
    if err != nil || rStr == "" {
        return 1
    }

    rs, err := strconv.Atoi(rStr)
    if err != nil {
        return 1
    }

    if rs < 1 {
        rs = 1
    } else if rs > 100 {
        rs = 100
    }

    return rs
}

// uploadPartHook is for testing.
type uploadPartHook func(id int, chunk FileChunk) error

var uploadPartHooker uploadPartHook = defaultUploadPart

func defaultUploadPart(id int, chunk FileChunk) error {
    return nil
}
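
// The concurrent upload below uses a small worker pool: scheduler feeds chunks
// into the jobs channel and closes it, each worker uploads the chunks it
// receives, sends successful parts on results, reports the first error on
// failed, and stops when the die channel is closed.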

// workerArg holds the arguments passed to a worker goroutine.
type workerArg struct {
    bucket   *Bucket
    filePath string
    imur     InitiateMultipartUploadResult
    hook     uploadPartHook
}

// worker is the goroutine that uploads parts.
func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
    for chunk := range jobs {
        if err := arg.hook(id, chunk); err != nil {
            failed <- err
            break
        }

        part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number)
        if err != nil {
            failed <- err
            break
        }

        select {
        case <-die:
            return
        default:
        }

        results <- part
    }
}

// scheduler dispatches all chunks to the jobs channel and then closes it.
func scheduler(jobs chan FileChunk, chunks []FileChunk) {
    for _, chunk := range chunks {
        jobs <- chunk
    }
    close(jobs)
}

// uploadFile uploads the file concurrently, without checkpoint (resumable) support.
func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
    chunks, err := SplitFileByPartSize(filePath, partSize)
    if err != nil {
        return err
    }

    // Initiate the multipart upload.
    imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
    if err != nil {
        return err
    }

    jobs := make(chan FileChunk, len(chunks))
    results := make(chan UploadPart, len(chunks))
    failed := make(chan error)
    die := make(chan bool)

    // Start the worker goroutines.
    arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
    for w := 1; w <= routines; w++ {
        go worker(w, arg, jobs, results, failed, die)
    }

    // Schedule the parts for concurrent upload.
    go scheduler(jobs, chunks)

    // Wait for the scheduled parts to finish uploading.
    completed := 0
    parts := make([]UploadPart, len(chunks))
    for completed < len(chunks) {
        select {
        case part := <-results:
            completed++
            parts[part.PartNumber-1] = part
        case err := <-failed:
            close(die)
            bucket.AbortMultipartUpload(imur)
            return err
        }

        if completed >= len(chunks) {
            break
        }
    }

    // Complete the multipart upload.
    _, err = bucket.CompleteMultipartUpload(imur, parts)
    if err != nil {
        bucket.AbortMultipartUpload(imur)
        return err
    }

    return nil
}

// ----- Concurrent upload with checkpoint (resumable upload) -----
const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62"

type uploadCheckpoint struct {
    Magic     string   // magic number
    MD5       string   // MD5 of the checkpoint content
    FilePath  string   // local file path
    FileStat  cpStat   // state of the local file
    ObjectKey string   // object key
    UploadID  string   // upload ID
    Parts     []cpPart // all parts of the local file
}

type cpStat struct {
    Size         int64     // file size
    LastModified time.Time // last modified time of the local file
    MD5          string    // MD5 of the local file
}

type cpPart struct {
    Chunk       FileChunk  // file chunk
    Part        UploadPart // the uploaded part
    IsCompleted bool       // whether the upload of this part has completed
}
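
// The checkpoint is persisted as JSON in the checkpoint file (by default
// filePath + CheckpointFileSuffix, see getCpConfig). MD5 guards the checkpoint
// content itself, while FileStat detects changes to the local file between runs.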

// isValid reports whether the checkpoint data is valid: the checkpoint itself
// must be intact and the local file must not have changed since it was written.
func (cp uploadCheckpoint) isValid(filePath string) (bool, error) {
    // Compare the checkpoint's magic number and MD5.
    cpb := cp
    cpb.MD5 = ""
    js, _ := json.Marshal(cpb)
    sum := md5.Sum(js)
    b64 := base64.StdEncoding.EncodeToString(sum[:])

    if cp.Magic != uploadCpMagic || b64 != cp.MD5 {
        return false, nil
    }

    // Check whether the local file has been modified.
    fd, err := os.Open(filePath)
    if err != nil {
        return false, err
    }
    defer fd.Close()

    st, err := fd.Stat()
    if err != nil {
        return false, err
    }

    md, err := calcFileMD5(filePath)
    if err != nil {
        return false, err
    }

    // Compare the file size, last modified time and MD5.
    if cp.FileStat.Size != st.Size() ||
        cp.FileStat.LastModified != st.ModTime() ||
        cp.FileStat.MD5 != md {
        return false, nil
    }

    return true, nil
}

// load reads the checkpoint from the given file.
func (cp *uploadCheckpoint) load(filePath string) error {
    contents, err := ioutil.ReadFile(filePath)
    if err != nil {
        return err
    }

    err = json.Unmarshal(contents, cp)
    return err
}

// dump writes the checkpoint to the given file.
func (cp *uploadCheckpoint) dump(filePath string) error {
    bcp := *cp

    // Calculate the MD5 of the checkpoint content.
    bcp.MD5 = ""
    js, err := json.Marshal(bcp)
    if err != nil {
        return err
    }
    sum := md5.Sum(js)
    b64 := base64.StdEncoding.EncodeToString(sum[:])
    bcp.MD5 = b64

    // Serialize.
    js, err = json.Marshal(bcp)
    if err != nil {
        return err
    }

    // Write to the checkpoint file.
    return ioutil.WriteFile(filePath, js, FilePermMode)
}

// updatePart records an uploaded part and marks it as completed.
func (cp *uploadCheckpoint) updatePart(part UploadPart) {
    cp.Parts[part.PartNumber-1].Part = part
    cp.Parts[part.PartNumber-1].IsCompleted = true
}

// todoParts returns the chunks that have not been uploaded yet.
func (cp *uploadCheckpoint) todoParts() []FileChunk {
    fcs := []FileChunk{}
    for _, part := range cp.Parts {
        if !part.IsCompleted {
            fcs = append(fcs, part.Chunk)
        }
    }
    return fcs
}

// allParts returns all parts of the upload.
func (cp *uploadCheckpoint) allParts() []UploadPart {
    ps := []UploadPart{}
    for _, part := range cp.Parts {
        ps = append(ps, part.Part)
    }
    return ps
}

// calcFileMD5 calculates the MD5 of the local file.
// Currently a stub: it always returns an empty string, so the MD5 comparison in
// isValid is effectively a no-op.
func calcFileMD5(filePath string) (string, error) {
    return "", nil
}

// prepare initializes the checkpoint and the multipart upload.
func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error {
    // Checkpoint fields.
    cp.Magic = uploadCpMagic
    cp.FilePath = filePath
    cp.ObjectKey = objectKey

    // Local file state.
    fd, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer fd.Close()

    st, err := fd.Stat()
    if err != nil {
        return err
    }
    cp.FileStat.Size = st.Size()
    cp.FileStat.LastModified = st.ModTime()
    md, err := calcFileMD5(filePath)
    if err != nil {
        return err
    }
    cp.FileStat.MD5 = md

    // Split the file into chunks.
    parts, err := SplitFileByPartSize(filePath, partSize)
    if err != nil {
        return err
    }
    cp.Parts = make([]cpPart, len(parts))
    for i, part := range parts {
        cp.Parts[i].Chunk = part
        cp.Parts[i].IsCompleted = false
    }

    // Initiate the multipart upload.
    imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
    if err != nil {
        return err
    }
    cp.UploadID = imur.UploadID

    return nil
}
  309. // 提交分片上传,删除CP文件
  310. func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string) error {
  311. imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName,
  312. Key: cp.ObjectKey, UploadID: cp.UploadID}
  313. _, err := bucket.CompleteMultipartUpload(imur, parts)
  314. if err != nil {
  315. return err
  316. }
  317. os.Remove(cpFilePath)
  318. return err
  319. }

// uploadFileWithCp uploads the file concurrently with checkpoint (resumable) support.
func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
    // Load the checkpoint data.
    ucp := uploadCheckpoint{}
    err := ucp.load(cpFilePath)
    if err != nil {
        os.Remove(cpFilePath)
    }

    // If loading failed or the data is invalid, reinitialize the upload.
    valid, err := ucp.isValid(filePath)
    if err != nil || !valid {
        if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil {
            return err
        }
        os.Remove(cpFilePath)
    }

    chunks := ucp.todoParts()
    imur := InitiateMultipartUploadResult{
        Bucket:   bucket.BucketName,
        Key:      objectKey,
        UploadID: ucp.UploadID}

    jobs := make(chan FileChunk, len(chunks))
    results := make(chan UploadPart, len(chunks))
    failed := make(chan error)
    die := make(chan bool)

    // Start the worker goroutines.
    arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
    for w := 1; w <= routines; w++ {
        go worker(w, arg, jobs, results, failed, die)
    }

    // Schedule the parts for concurrent upload.
    go scheduler(jobs, chunks)

    // Wait for the scheduled parts to finish uploading.
    completed := 0
    for completed < len(chunks) {
        select {
        case part := <-results:
            completed++
            ucp.updatePart(part)
            ucp.dump(cpFilePath)
        case err := <-failed:
            close(die)
            return err
        }

        if completed >= len(chunks) {
            break
        }
    }

    // Complete the multipart upload.
    err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath)
    return err
}