multipart.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. package oss
  2. import (
  3. "bytes"
  4. "encoding/xml"
  5. "io"
  6. "net/http"
  7. "net/url"
  8. "os"
  9. "sort"
  10. "strconv"
  11. )
  12. // InitiateMultipartUpload initializes multipart upload
  13. //
  14. // objectKey object name
  15. // options the object constricts for upload. The valid options are CacheControl, ContentDisposition, ContentEncoding, Expires,
  16. // ServerSideEncryption, Meta, check out the following link:
  17. // https://help.aliyun.com/document_detail/oss/api-reference/multipart-upload/InitiateMultipartUpload.html
  18. //
  19. // InitiateMultipartUploadResult the return value of the InitiateMultipartUpload, which is used for calls later on such as UploadPartFromFile,UploadPartCopy.
  20. // error it's nil if the operation succeeds, otherwise it's an error object.
  21. //
  22. func (bucket Bucket) InitiateMultipartUpload(objectKey string, options ...Option) (InitiateMultipartUploadResult, error) {
  23. var imur InitiateMultipartUploadResult
  24. opts := AddContentType(options, objectKey)
  25. params, _ := GetRawParams(options)
  26. _, ok := params["sequential"]
  27. if ok {
  28. // convert "" to nil
  29. params["sequential"] = nil
  30. }
  31. params["uploads"] = nil
  32. resp, err := bucket.do("POST", objectKey, params, opts, nil, nil)
  33. if err != nil {
  34. return imur, err
  35. }
  36. defer resp.Body.Close()
  37. err = xmlUnmarshal(resp.Body, &imur)
  38. return imur, err
  39. }
  40. // UploadPart uploads parts
  41. //
  42. // After initializing a Multipart Upload, the upload Id and object key could be used for uploading the parts.
  43. // Each part has its part number (ranges from 1 to 10,000). And for each upload Id, the part number identifies the position of the part in the whole file.
  44. // And thus with the same part number and upload Id, another part upload will overwrite the data.
  45. // Except the last one, minimal part size is 100KB. There's no limit on the last part size.
  46. //
  47. // imur the returned value of InitiateMultipartUpload.
  48. // reader io.Reader the reader for the part's data.
  49. // size the part size.
  50. // partNumber the part number (ranges from 1 to 10,000). Invalid part number will lead to InvalidArgument error.
  51. //
  52. // UploadPart the return value of the upload part. It consists of PartNumber and ETag. It's valid when error is nil.
  53. // error it's nil if the operation succeeds, otherwise it's an error object.
  54. //
  55. func (bucket Bucket) UploadPart(imur InitiateMultipartUploadResult, reader io.Reader,
  56. partSize int64, partNumber int, options ...Option) (UploadPart, error) {
  57. request := &UploadPartRequest{
  58. InitResult: &imur,
  59. Reader: reader,
  60. PartSize: partSize,
  61. PartNumber: partNumber,
  62. }
  63. result, err := bucket.DoUploadPart(request, options)
  64. return result.Part, err
  65. }
  66. // UploadPartFromFile uploads part from the file.
  67. //
  68. // imur the return value of a successful InitiateMultipartUpload.
  69. // filePath the local file path to upload.
  70. // startPosition the start position in the local file.
  71. // partSize the part size.
  72. // partNumber the part number (from 1 to 10,000)
  73. //
  74. // UploadPart the return value consists of PartNumber and ETag.
  75. // error it's nil if the operation succeeds, otherwise it's an error object.
  76. //
  77. func (bucket Bucket) UploadPartFromFile(imur InitiateMultipartUploadResult, filePath string,
  78. startPosition, partSize int64, partNumber int, options ...Option) (UploadPart, error) {
  79. var part = UploadPart{}
  80. fd, err := os.Open(filePath)
  81. if err != nil {
  82. return part, err
  83. }
  84. defer fd.Close()
  85. fd.Seek(startPosition, os.SEEK_SET)
  86. request := &UploadPartRequest{
  87. InitResult: &imur,
  88. Reader: fd,
  89. PartSize: partSize,
  90. PartNumber: partNumber,
  91. }
  92. result, err := bucket.DoUploadPart(request, options)
  93. return result.Part, err
  94. }
  95. // DoUploadPart does the actual part upload.
  96. //
  97. // request part upload request
  98. //
  99. // UploadPartResult the result of uploading part.
  100. // error it's nil if the operation succeeds, otherwise it's an error object.
  101. //
  102. func (bucket Bucket) DoUploadPart(request *UploadPartRequest, options []Option) (*UploadPartResult, error) {
  103. listener := GetProgressListener(options)
  104. options = append(options, ContentLength(request.PartSize))
  105. params := map[string]interface{}{}
  106. params["partNumber"] = strconv.Itoa(request.PartNumber)
  107. params["uploadId"] = request.InitResult.UploadID
  108. resp, err := bucket.do("PUT", request.InitResult.Key, params, options,
  109. &io.LimitedReader{R: request.Reader, N: request.PartSize}, listener)
  110. if err != nil {
  111. return &UploadPartResult{}, err
  112. }
  113. defer resp.Body.Close()
  114. part := UploadPart{
  115. ETag: resp.Headers.Get(HTTPHeaderEtag),
  116. PartNumber: request.PartNumber,
  117. }
  118. if bucket.GetConfig().IsEnableCRC {
  119. err = CheckCRC(resp, "DoUploadPart")
  120. if err != nil {
  121. return &UploadPartResult{part}, err
  122. }
  123. }
  124. return &UploadPartResult{part}, nil
  125. }
  126. // UploadPartCopy uploads part copy
  127. //
  128. // imur the return value of InitiateMultipartUpload
  129. // copySrc source Object name
  130. // startPosition the part's start index in the source file
  131. // partSize the part size
  132. // partNumber the part number, ranges from 1 to 10,000. If it exceeds the range OSS returns InvalidArgument error.
  133. // options the constraints of source object for the copy. The copy happens only when these contraints are met. Otherwise it returns error.
  134. // CopySourceIfNoneMatch, CopySourceIfModifiedSince CopySourceIfUnmodifiedSince, check out the following link for the detail
  135. // https://help.aliyun.com/document_detail/oss/api-reference/multipart-upload/UploadPartCopy.html
  136. //
  137. // UploadPart the return value consists of PartNumber and ETag.
  138. // error it's nil if the operation succeeds, otherwise it's an error object.
  139. //
  140. func (bucket Bucket) UploadPartCopy(imur InitiateMultipartUploadResult, srcBucketName, srcObjectKey string,
  141. startPosition, partSize int64, partNumber int, options ...Option) (UploadPart, error) {
  142. var out UploadPartCopyResult
  143. var part UploadPart
  144. var opts []Option
  145. //first find version id
  146. versionIdKey := "versionId"
  147. versionId, _ := FindOption(options, versionIdKey, nil)
  148. if versionId == nil {
  149. opts = []Option{CopySource(srcBucketName, url.QueryEscape(srcObjectKey)),
  150. CopySourceRange(startPosition, partSize)}
  151. } else {
  152. opts = []Option{CopySourceVersion(srcBucketName, url.QueryEscape(srcObjectKey), versionId.(string)),
  153. CopySourceRange(startPosition, partSize)}
  154. options = DeleteOption(options, versionIdKey)
  155. }
  156. opts = append(opts, options...)
  157. params := map[string]interface{}{}
  158. params["partNumber"] = strconv.Itoa(partNumber)
  159. params["uploadId"] = imur.UploadID
  160. resp, err := bucket.do("PUT", imur.Key, params, opts, nil, nil)
  161. if err != nil {
  162. return part, err
  163. }
  164. defer resp.Body.Close()
  165. err = xmlUnmarshal(resp.Body, &out)
  166. if err != nil {
  167. return part, err
  168. }
  169. part.ETag = out.ETag
  170. part.PartNumber = partNumber
  171. return part, nil
  172. }
  173. // CompleteMultipartUpload completes the multipart upload.
  174. //
  175. // imur the return value of InitiateMultipartUpload.
  176. // parts the array of return value of UploadPart/UploadPartFromFile/UploadPartCopy.
  177. //
  178. // CompleteMultipartUploadResponse the return value when the call succeeds. Only valid when the error is nil.
  179. // error it's nil if the operation succeeds, otherwise it's an error object.
  180. //
  181. func (bucket Bucket) CompleteMultipartUpload(imur InitiateMultipartUploadResult,
  182. parts []UploadPart, options ...Option) (CompleteMultipartUploadResult, error) {
  183. var out CompleteMultipartUploadResult
  184. sort.Sort(UploadParts(parts))
  185. cxml := completeMultipartUploadXML{}
  186. cxml.Part = parts
  187. bs, err := xml.Marshal(cxml)
  188. if err != nil {
  189. return out, err
  190. }
  191. buffer := new(bytes.Buffer)
  192. buffer.Write(bs)
  193. params := map[string]interface{}{}
  194. params["uploadId"] = imur.UploadID
  195. resp, err := bucket.do("POST", imur.Key, params, options, buffer, nil)
  196. if err != nil {
  197. return out, err
  198. }
  199. defer resp.Body.Close()
  200. err = xmlUnmarshal(resp.Body, &out)
  201. return out, err
  202. }
  203. // AbortMultipartUpload aborts the multipart upload.
  204. //
  205. // imur the return value of InitiateMultipartUpload.
  206. //
  207. // error it's nil if the operation succeeds, otherwise it's an error object.
  208. //
  209. func (bucket Bucket) AbortMultipartUpload(imur InitiateMultipartUploadResult, options ...Option) error {
  210. params := map[string]interface{}{}
  211. params["uploadId"] = imur.UploadID
  212. resp, err := bucket.do("DELETE", imur.Key, params, options, nil, nil)
  213. if err != nil {
  214. return err
  215. }
  216. defer resp.Body.Close()
  217. return CheckRespCode(resp.StatusCode, []int{http.StatusNoContent})
  218. }
  219. // ListUploadedParts lists the uploaded parts.
  220. //
  221. // imur the return value of InitiateMultipartUpload.
  222. //
  223. // ListUploadedPartsResponse the return value if it succeeds, only valid when error is nil.
  224. // error it's nil if the operation succeeds, otherwise it's an error object.
  225. //
  226. func (bucket Bucket) ListUploadedParts(imur InitiateMultipartUploadResult, options ...Option) (ListUploadedPartsResult, error) {
  227. var out ListUploadedPartsResult
  228. options = append(options, EncodingType("url"))
  229. params := map[string]interface{}{}
  230. params, err := GetRawParams(options)
  231. if err != nil {
  232. return out, err
  233. }
  234. params["uploadId"] = imur.UploadID
  235. resp, err := bucket.do("GET", imur.Key, params, options, nil, nil)
  236. if err != nil {
  237. return out, err
  238. }
  239. defer resp.Body.Close()
  240. err = xmlUnmarshal(resp.Body, &out)
  241. if err != nil {
  242. return out, err
  243. }
  244. err = decodeListUploadedPartsResult(&out)
  245. return out, err
  246. }
  247. // ListMultipartUploads lists all ongoing multipart upload tasks
  248. //
  249. // options listObject's filter. Prefix specifies the returned object's prefix; KeyMarker specifies the returned object's start point in lexicographic order;
  250. // MaxKeys specifies the max entries to return; Delimiter is the character for grouping object keys.
  251. //
  252. // ListMultipartUploadResponse the return value if it succeeds, only valid when error is nil.
  253. // error it's nil if the operation succeeds, otherwise it's an error object.
  254. //
  255. func (bucket Bucket) ListMultipartUploads(options ...Option) (ListMultipartUploadResult, error) {
  256. var out ListMultipartUploadResult
  257. options = append(options, EncodingType("url"))
  258. params, err := GetRawParams(options)
  259. if err != nil {
  260. return out, err
  261. }
  262. params["uploads"] = nil
  263. resp, err := bucket.do("GET", "", params, options, nil, nil)
  264. if err != nil {
  265. return out, err
  266. }
  267. defer resp.Body.Close()
  268. err = xmlUnmarshal(resp.Body, &out)
  269. if err != nil {
  270. return out, err
  271. }
  272. err = decodeListMultipartUploadResult(&out)
  273. return out, err
  274. }