multipart.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package oss
  2. import (
  3. "bytes"
  4. "encoding/xml"
  5. "io"
  6. "net/http"
  7. "os"
  8. "sort"
  9. "strconv"
  10. )
  11. // InitiateMultipartUpload initializes multipart upload
  12. //
  13. // objectKey object name
  14. // options the object constricts for upload. The valid options are CacheControl, ContentDisposition, ContentEncoding, Expires,
  15. // ServerSideEncryption, Meta, check out the following link:
  16. // https://help.aliyun.com/document_detail/oss/api-reference/multipart-upload/InitiateMultipartUpload.html
  17. //
  18. // InitiateMultipartUploadResult the return value of the InitiateMultipartUpload, which is used for calls later on such as UploadPartFromFile,UploadPartCopy.
  19. // error it's nil if the operation succeeds, otherwise it's an error object.
  20. //
  21. func (bucket Bucket) InitiateMultipartUpload(objectKey string, options ...Option) (InitiateMultipartUploadResult, error) {
  22. var imur InitiateMultipartUploadResult
  23. opts := addContentType(options, objectKey)
  24. params := map[string]interface{}{}
  25. params["uploads"] = nil
  26. resp, err := bucket.do("POST", objectKey, params, opts, nil, nil)
  27. if err != nil {
  28. return imur, err
  29. }
  30. defer resp.Body.Close()
  31. err = xmlUnmarshal(resp.Body, &imur)
  32. return imur, err
  33. }
  34. // UploadPart uploads parts
  35. //
  36. // After initializing a Multipart Upload, the upload Id and object key could be used for uploading the parts.
  37. // Each part has its part number (ranges from 1 to 10,000). And for each upload Id, the part number identifies the position of the part in the whole file.
  38. // And thus with the same part number and upload Id, another part upload will overwrite the data.
  39. // Except the last one, minimal part size is 100KB. There's no limit on the last part size.
  40. //
  41. // imur the returned value of InitiateMultipartUpload.
  42. // reader io.Reader the reader for the part's data.
  43. // size the part size.
  44. // partNumber the part number (ranges from 1 to 10,000). Invalid part number will lead to InvalidArgument error.
  45. //
  46. // UploadPart the return value of the upload part. It consists of PartNumber and ETag. It's valid when error is nil.
  47. // error it's nil if the operation succeeds, otherwise it's an error object.
  48. //
  49. func (bucket Bucket) UploadPart(imur InitiateMultipartUploadResult, reader io.Reader,
  50. partSize int64, partNumber int, options ...Option) (UploadPart, error) {
  51. request := &UploadPartRequest{
  52. InitResult: &imur,
  53. Reader: reader,
  54. PartSize: partSize,
  55. PartNumber: partNumber,
  56. }
  57. result, err := bucket.DoUploadPart(request, options)
  58. return result.Part, err
  59. }
  60. // UploadPartFromFile uploads part from the file.
  61. //
  62. // imur the return value of a successful InitiateMultipartUpload.
  63. // filePath the local file path to upload.
  64. // startPosition the start position in the local file.
  65. // partSize the part size.
  66. // partNumber the part number (from 1 to 10,000)
  67. //
  68. // UploadPart the return value consists of PartNumber and ETag.
  69. // error it's nil if the operation succeeds, otherwise it's an error object.
  70. //
  71. func (bucket Bucket) UploadPartFromFile(imur InitiateMultipartUploadResult, filePath string,
  72. startPosition, partSize int64, partNumber int, options ...Option) (UploadPart, error) {
  73. var part = UploadPart{}
  74. fd, err := os.Open(filePath)
  75. if err != nil {
  76. return part, err
  77. }
  78. defer fd.Close()
  79. fd.Seek(startPosition, os.SEEK_SET)
  80. request := &UploadPartRequest{
  81. InitResult: &imur,
  82. Reader: fd,
  83. PartSize: partSize,
  84. PartNumber: partNumber,
  85. }
  86. result, err := bucket.DoUploadPart(request, options)
  87. return result.Part, err
  88. }
  89. // DoUploadPart does the actual part upload.
  90. //
  91. // request part upload request
  92. //
  93. // UploadPartResult the result of uploading part.
  94. // error it's nil if the operation succeeds, otherwise it's an error object.
  95. //
  96. func (bucket Bucket) DoUploadPart(request *UploadPartRequest, options []Option) (*UploadPartResult, error) {
  97. listener := getProgressListener(options)
  98. opts := []Option{ContentLength(request.PartSize)}
  99. params := map[string]interface{}{}
  100. params["partNumber"] = strconv.Itoa(request.PartNumber)
  101. params["uploadId"] = request.InitResult.UploadID
  102. resp, err := bucket.do("PUT", request.InitResult.Key, params, opts,
  103. &io.LimitedReader{R: request.Reader, N: request.PartSize}, listener)
  104. if err != nil {
  105. return &UploadPartResult{}, err
  106. }
  107. defer resp.Body.Close()
  108. part := UploadPart{
  109. ETag: resp.Headers.Get(HTTPHeaderEtag),
  110. PartNumber: request.PartNumber,
  111. }
  112. if bucket.getConfig().IsEnableCRC {
  113. err = checkCRC(resp, "DoUploadPart")
  114. if err != nil {
  115. return &UploadPartResult{part}, err
  116. }
  117. }
  118. return &UploadPartResult{part}, nil
  119. }
  120. // UploadPartCopy uploads part copy
  121. //
  122. // imur the return value of InitiateMultipartUpload
  123. // copySrc source Object name
  124. // startPosition the part's start index in the source file
  125. // partSize the part size
  126. // partNumber the part number, ranges from 1 to 10,000. If it exceeds the range OSS returns InvalidArgument error.
  127. // options the constraints of source object for the copy. The copy happens only when these contraints are met. Otherwise it returns error.
  128. // CopySourceIfNoneMatch, CopySourceIfModifiedSince CopySourceIfUnmodifiedSince, check out the following link for the detail
  129. // https://help.aliyun.com/document_detail/oss/api-reference/multipart-upload/UploadPartCopy.html
  130. //
  131. // UploadPart the return value consists of PartNumber and ETag.
  132. // error it's nil if the operation succeeds, otherwise it's an error object.
  133. //
  134. func (bucket Bucket) UploadPartCopy(imur InitiateMultipartUploadResult, srcBucketName, srcObjectKey string,
  135. startPosition, partSize int64, partNumber int, options ...Option) (UploadPart, error) {
  136. var out UploadPartCopyResult
  137. var part UploadPart
  138. opts := []Option{CopySource(srcBucketName, srcObjectKey),
  139. CopySourceRange(startPosition, partSize)}
  140. opts = append(opts, options...)
  141. params := map[string]interface{}{}
  142. params["partNumber"] = strconv.Itoa(partNumber)
  143. params["uploadId"] = imur.UploadID
  144. resp, err := bucket.do("PUT", imur.Key, params, opts, nil, nil)
  145. if err != nil {
  146. return part, err
  147. }
  148. defer resp.Body.Close()
  149. err = xmlUnmarshal(resp.Body, &out)
  150. if err != nil {
  151. return part, err
  152. }
  153. part.ETag = out.ETag
  154. part.PartNumber = partNumber
  155. return part, nil
  156. }
  157. // CompleteMultipartUpload completes the multipart upload.
  158. //
  159. // imur the return value of InitiateMultipartUpload.
  160. // parts the array of return value of UploadPart/UploadPartFromFile/UploadPartCopy.
  161. //
  162. // CompleteMultipartUploadResponse the return value when the call succeeds. Only valid when the error is nil.
  163. // error it's nil if the operation succeeds, otherwise it's an error object.
  164. //
  165. func (bucket Bucket) CompleteMultipartUpload(imur InitiateMultipartUploadResult,
  166. parts []UploadPart) (CompleteMultipartUploadResult, error) {
  167. var out CompleteMultipartUploadResult
  168. sort.Sort(uploadParts(parts))
  169. cxml := completeMultipartUploadXML{}
  170. cxml.Part = parts
  171. bs, err := xml.Marshal(cxml)
  172. if err != nil {
  173. return out, err
  174. }
  175. buffer := new(bytes.Buffer)
  176. buffer.Write(bs)
  177. params := map[string]interface{}{}
  178. params["uploadId"] = imur.UploadID
  179. resp, err := bucket.do("POST", imur.Key, params, nil, buffer, nil)
  180. if err != nil {
  181. return out, err
  182. }
  183. defer resp.Body.Close()
  184. err = xmlUnmarshal(resp.Body, &out)
  185. return out, err
  186. }
  187. // AbortMultipartUpload aborts the multipart upload.
  188. //
  189. // imur the return value of InitiateMultipartUpload.
  190. //
  191. // error it's nil if the operation succeeds, otherwise it's an error object.
  192. //
  193. func (bucket Bucket) AbortMultipartUpload(imur InitiateMultipartUploadResult) error {
  194. params := map[string]interface{}{}
  195. params["uploadId"] = imur.UploadID
  196. resp, err := bucket.do("DELETE", imur.Key, params, nil, nil, nil)
  197. if err != nil {
  198. return err
  199. }
  200. defer resp.Body.Close()
  201. return checkRespCode(resp.StatusCode, []int{http.StatusNoContent})
  202. }
  203. // ListUploadedParts lists the uploaded parts.
  204. //
  205. // imur the return value of InitiateMultipartUpload.
  206. //
  207. // ListUploadedPartsResponse the return value if it succeeds, only valid when error is nil.
  208. // error it's nil if the operation succeeds, otherwise it's an error object.
  209. //
  210. func (bucket Bucket) ListUploadedParts(imur InitiateMultipartUploadResult) (ListUploadedPartsResult, error) {
  211. var out ListUploadedPartsResult
  212. params := map[string]interface{}{}
  213. params["uploadId"] = imur.UploadID
  214. resp, err := bucket.do("GET", imur.Key, params, nil, nil, nil)
  215. if err != nil {
  216. return out, err
  217. }
  218. defer resp.Body.Close()
  219. err = xmlUnmarshal(resp.Body, &out)
  220. return out, err
  221. }
  222. // ListMultipartUploads lists all ongoing multipart upload tasks
  223. //
  224. // options listObject's filter. Prefix specifies the returned object's prefix; KeyMarker specifies the returned object's start point in lexicographic order;
  225. // MaxKeys specifies the max entries to return; Delimiter is the character for grouping object keys.
  226. //
  227. // ListMultipartUploadResponse the return value if it succeeds, only valid when error is nil.
  228. // error it's nil if the operation succeeds, otherwise it's an error object.
  229. //
  230. func (bucket Bucket) ListMultipartUploads(options ...Option) (ListMultipartUploadResult, error) {
  231. var out ListMultipartUploadResult
  232. options = append(options, EncodingType("url"))
  233. params, err := getRawParams(options)
  234. if err != nil {
  235. return out, err
  236. }
  237. params["uploads"] = nil
  238. resp, err := bucket.do("GET", "", params, nil, nil, nil)
  239. if err != nil {
  240. return out, err
  241. }
  242. defer resp.Body.Close()
  243. err = xmlUnmarshal(resp.Body, &out)
  244. if err != nil {
  245. return out, err
  246. }
  247. err = decodeListMultipartUploadResult(&out)
  248. return out, err
  249. }