multipart.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package oss
  2. import (
  3. "bytes"
  4. "encoding/xml"
  5. "io"
  6. "net/http"
  7. "net/url"
  8. "os"
  9. "sort"
  10. "strconv"
  11. )
  12. // InitiateMultipartUpload initializes multipart upload
  13. //
  14. // objectKey object name
  15. // options the object constricts for upload. The valid options are CacheControl, ContentDisposition, ContentEncoding, Expires,
  16. // ServerSideEncryption, Meta, check out the following link:
  17. // https://help.aliyun.com/document_detail/oss/api-reference/multipart-upload/InitiateMultipartUpload.html
  18. //
  19. // InitiateMultipartUploadResult the return value of the InitiateMultipartUpload, which is used for calls later on such as UploadPartFromFile,UploadPartCopy.
  20. // error it's nil if the operation succeeds, otherwise it's an error object.
  21. //
  22. func (bucket Bucket) InitiateMultipartUpload(objectKey string, options ...Option) (InitiateMultipartUploadResult, error) {
  23. var imur InitiateMultipartUploadResult
  24. opts := addContentType(options, objectKey)
  25. params := map[string]interface{}{}
  26. params["uploads"] = nil
  27. resp, err := bucket.do("POST", objectKey, params, opts, nil, nil)
  28. if err != nil {
  29. return imur, err
  30. }
  31. defer resp.Body.Close()
  32. err = xmlUnmarshal(resp.Body, &imur)
  33. return imur, err
  34. }
  35. // UploadPart uploads parts
  36. //
  37. // After initializing a Multipart Upload, the upload Id and object key could be used for uploading the parts.
  38. // Each part has its part number (ranges from 1 to 10,000). And for each upload Id, the part number identifies the position of the part in the whole file.
  39. // And thus with the same part number and upload Id, another part upload will overwrite the data.
  40. // Except the last one, minimal part size is 100KB. There's no limit on the last part size.
  41. //
  42. // imur the returned value of InitiateMultipartUpload.
  43. // reader io.Reader the reader for the part's data.
  44. // size the part size.
  45. // partNumber the part number (ranges from 1 to 10,000). Invalid part number will lead to InvalidArgument error.
  46. //
  47. // UploadPart the return value of the upload part. It consists of PartNumber and ETag. It's valid when error is nil.
  48. // error it's nil if the operation succeeds, otherwise it's an error object.
  49. //
  50. func (bucket Bucket) UploadPart(imur InitiateMultipartUploadResult, reader io.Reader,
  51. partSize int64, partNumber int, options ...Option) (UploadPart, error) {
  52. request := &UploadPartRequest{
  53. InitResult: &imur,
  54. Reader: reader,
  55. PartSize: partSize,
  56. PartNumber: partNumber,
  57. }
  58. result, err := bucket.DoUploadPart(request, options)
  59. return result.Part, err
  60. }
  61. // UploadPartFromFile uploads part from the file.
  62. //
  63. // imur the return value of a successful InitiateMultipartUpload.
  64. // filePath the local file path to upload.
  65. // startPosition the start position in the local file.
  66. // partSize the part size.
  67. // partNumber the part number (from 1 to 10,000)
  68. //
  69. // UploadPart the return value consists of PartNumber and ETag.
  70. // error it's nil if the operation succeeds, otherwise it's an error object.
  71. //
  72. func (bucket Bucket) UploadPartFromFile(imur InitiateMultipartUploadResult, filePath string,
  73. startPosition, partSize int64, partNumber int, options ...Option) (UploadPart, error) {
  74. var part = UploadPart{}
  75. fd, err := os.Open(filePath)
  76. if err != nil {
  77. return part, err
  78. }
  79. defer fd.Close()
  80. fd.Seek(startPosition, os.SEEK_SET)
  81. request := &UploadPartRequest{
  82. InitResult: &imur,
  83. Reader: fd,
  84. PartSize: partSize,
  85. PartNumber: partNumber,
  86. }
  87. result, err := bucket.DoUploadPart(request, options)
  88. return result.Part, err
  89. }
  90. // DoUploadPart does the actual part upload.
  91. //
  92. // request part upload request
  93. //
  94. // UploadPartResult the result of uploading part.
  95. // error it's nil if the operation succeeds, otherwise it's an error object.
  96. //
  97. func (bucket Bucket) DoUploadPart(request *UploadPartRequest, options []Option) (*UploadPartResult, error) {
  98. listener := getProgressListener(options)
  99. options = append(options, ContentLength(request.PartSize))
  100. params := map[string]interface{}{}
  101. params["partNumber"] = strconv.Itoa(request.PartNumber)
  102. params["uploadId"] = request.InitResult.UploadID
  103. resp, err := bucket.do("PUT", request.InitResult.Key, params, options,
  104. &io.LimitedReader{R: request.Reader, N: request.PartSize}, listener)
  105. if err != nil {
  106. return &UploadPartResult{}, err
  107. }
  108. defer resp.Body.Close()
  109. part := UploadPart{
  110. ETag: resp.Headers.Get(HTTPHeaderEtag),
  111. PartNumber: request.PartNumber,
  112. }
  113. if bucket.getConfig().IsEnableCRC {
  114. err = checkCRC(resp, "DoUploadPart")
  115. if err != nil {
  116. return &UploadPartResult{part}, err
  117. }
  118. }
  119. return &UploadPartResult{part}, nil
  120. }
  121. // UploadPartCopy uploads part copy
  122. //
  123. // imur the return value of InitiateMultipartUpload
  124. // copySrc source Object name
  125. // startPosition the part's start index in the source file
  126. // partSize the part size
  127. // partNumber the part number, ranges from 1 to 10,000. If it exceeds the range OSS returns InvalidArgument error.
  128. // options the constraints of source object for the copy. The copy happens only when these contraints are met. Otherwise it returns error.
  129. // CopySourceIfNoneMatch, CopySourceIfModifiedSince CopySourceIfUnmodifiedSince, check out the following link for the detail
  130. // https://help.aliyun.com/document_detail/oss/api-reference/multipart-upload/UploadPartCopy.html
  131. //
  132. // UploadPart the return value consists of PartNumber and ETag.
  133. // error it's nil if the operation succeeds, otherwise it's an error object.
  134. //
  135. func (bucket Bucket) UploadPartCopy(imur InitiateMultipartUploadResult, srcBucketName, srcObjectKey string,
  136. startPosition, partSize int64, partNumber int, options ...Option) (UploadPart, error) {
  137. var out UploadPartCopyResult
  138. var part UploadPart
  139. opts := []Option{CopySource(srcBucketName, url.QueryEscape(srcObjectKey)),
  140. CopySourceRange(startPosition, partSize)}
  141. opts = append(opts, options...)
  142. params := map[string]interface{}{}
  143. params["partNumber"] = strconv.Itoa(partNumber)
  144. params["uploadId"] = imur.UploadID
  145. resp, err := bucket.do("PUT", imur.Key, params, opts, nil, nil)
  146. if err != nil {
  147. return part, err
  148. }
  149. defer resp.Body.Close()
  150. err = xmlUnmarshal(resp.Body, &out)
  151. if err != nil {
  152. return part, err
  153. }
  154. part.ETag = out.ETag
  155. part.PartNumber = partNumber
  156. return part, nil
  157. }
  158. // CompleteMultipartUpload completes the multipart upload.
  159. //
  160. // imur the return value of InitiateMultipartUpload.
  161. // parts the array of return value of UploadPart/UploadPartFromFile/UploadPartCopy.
  162. //
  163. // CompleteMultipartUploadResponse the return value when the call succeeds. Only valid when the error is nil.
  164. // error it's nil if the operation succeeds, otherwise it's an error object.
  165. //
  166. func (bucket Bucket) CompleteMultipartUpload(imur InitiateMultipartUploadResult,
  167. parts []UploadPart, options ...Option) (CompleteMultipartUploadResult, error) {
  168. var out CompleteMultipartUploadResult
  169. sort.Sort(uploadParts(parts))
  170. cxml := completeMultipartUploadXML{}
  171. cxml.Part = parts
  172. bs, err := xml.Marshal(cxml)
  173. if err != nil {
  174. return out, err
  175. }
  176. buffer := new(bytes.Buffer)
  177. buffer.Write(bs)
  178. params := map[string]interface{}{}
  179. params["uploadId"] = imur.UploadID
  180. resp, err := bucket.do("POST", imur.Key, params, options, buffer, nil)
  181. if err != nil {
  182. return out, err
  183. }
  184. defer resp.Body.Close()
  185. err = xmlUnmarshal(resp.Body, &out)
  186. return out, err
  187. }
  188. // AbortMultipartUpload aborts the multipart upload.
  189. //
  190. // imur the return value of InitiateMultipartUpload.
  191. //
  192. // error it's nil if the operation succeeds, otherwise it's an error object.
  193. //
  194. func (bucket Bucket) AbortMultipartUpload(imur InitiateMultipartUploadResult, options ...Option) error {
  195. params := map[string]interface{}{}
  196. params["uploadId"] = imur.UploadID
  197. resp, err := bucket.do("DELETE", imur.Key, params, options, nil, nil)
  198. if err != nil {
  199. return err
  200. }
  201. defer resp.Body.Close()
  202. return checkRespCode(resp.StatusCode, []int{http.StatusNoContent})
  203. }
  204. // ListUploadedParts lists the uploaded parts.
  205. //
  206. // imur the return value of InitiateMultipartUpload.
  207. //
  208. // ListUploadedPartsResponse the return value if it succeeds, only valid when error is nil.
  209. // error it's nil if the operation succeeds, otherwise it's an error object.
  210. //
  211. func (bucket Bucket) ListUploadedParts(imur InitiateMultipartUploadResult, options ...Option) (ListUploadedPartsResult, error) {
  212. var out ListUploadedPartsResult
  213. options = append(options, EncodingType("url"))
  214. params := map[string]interface{}{}
  215. params, err := getRawParams(options)
  216. if err != nil {
  217. return out, err
  218. }
  219. params["uploadId"] = imur.UploadID
  220. resp, err := bucket.do("GET", imur.Key, params, nil, nil, nil)
  221. if err != nil {
  222. return out, err
  223. }
  224. defer resp.Body.Close()
  225. err = xmlUnmarshal(resp.Body, &out)
  226. if err != nil {
  227. return out, err
  228. }
  229. err = decodeListUploadedPartsResult(&out)
  230. return out, err
  231. }
  232. // ListMultipartUploads lists all ongoing multipart upload tasks
  233. //
  234. // options listObject's filter. Prefix specifies the returned object's prefix; KeyMarker specifies the returned object's start point in lexicographic order;
  235. // MaxKeys specifies the max entries to return; Delimiter is the character for grouping object keys.
  236. //
  237. // ListMultipartUploadResponse the return value if it succeeds, only valid when error is nil.
  238. // error it's nil if the operation succeeds, otherwise it's an error object.
  239. //
  240. func (bucket Bucket) ListMultipartUploads(options ...Option) (ListMultipartUploadResult, error) {
  241. var out ListMultipartUploadResult
  242. options = append(options, EncodingType("url"))
  243. params, err := getRawParams(options)
  244. if err != nil {
  245. return out, err
  246. }
  247. params["uploads"] = nil
  248. resp, err := bucket.do("GET", "", params, options, nil, nil)
  249. if err != nil {
  250. return out, err
  251. }
  252. defer resp.Body.Close()
  253. err = xmlUnmarshal(resp.Body, &out)
  254. if err != nil {
  255. return out, err
  256. }
  257. err = decodeListMultipartUploadResult(&out)
  258. return out, err
  259. }