鸣镝 9 년 전
부모
커밋
0b4fbeec3f
15개의 변경된 파일, 784개의 추가 그리고 85개의 삭제
  1. 1 1
      README.md
  2. 39 21
      oss/bucket.go
  3. 1 1
      oss/client.go
  4. 4 4
      oss/client_test.go
  5. 44 30
      oss/conn.go
  6. 1 1
      oss/const.go
  7. 2 2
      oss/crc_test.go
  8. 58 2
      oss/download.go
  9. 4 0
      oss/model.go
  10. 47 0
      oss/multicopy.go
  11. 16 12
      oss/multipart.go
  12. 15 9
      oss/option.go
  13. 116 0
      oss/progress.go
  14. 377 0
      oss/progress_test.go
  15. 59 2
      oss/upload.go

+ 1 - 1
README.md

@@ -9,7 +9,7 @@
 > - 使用此SDK,用户可以方便地在任何应用、任何时间、任何地点上传,下载和管理数据。
 
 ## 版本
-> - 当前版本:1.2.3
+> - 当前版本:1.3.0
 
 ## 运行环境
 > - 推荐使用Go 1.4及以上。

+ 39 - 21
oss/bucket.go

@@ -5,6 +5,7 @@ import (
 	"crypto/md5"
 	"encoding/base64"
 	"encoding/xml"
+	"hash"
 	"hash/crc64"
 	"io"
 	"io/ioutil"
@@ -93,7 +94,11 @@ func (bucket Bucket) DoPutObject(request *PutObjectRequest, options []Option) (*
 		options = addContentType(options, request.ObjectKey)
 	}
 
-	resp, err := bucket.do("PUT", request.ObjectKey, "", "", options, request.Reader)
+	if request.Listener == nil {
+		request.Listener = getProgressListener(options)
+	}
+
+	resp, err := bucket.do("PUT", request.ObjectKey, "", "", options, request.Reader, request.Listener)
 	if err != nil {
 		return nil, err
 	}
@@ -122,7 +127,7 @@ func (bucket Bucket) DoPutObject(request *PutObjectRequest, options []Option) (*
 // error  操作无错误为nil,非nil为错误信息。
 //
 func (bucket Bucket) GetObject(objectKey string, options ...Option) (io.ReadCloser, error) {
-	result, err := bucket.DoGetObject(&GetObjectRequest{objectKey}, options)
+	result, err := bucket.DoGetObject(&GetObjectRequest{objectKey, nil}, options)
 	if err != nil {
 		return nil, err
 	}
@@ -142,7 +147,7 @@ func (bucket Bucket) GetObjectToFile(objectKey, filePath string, options ...Opti
 	tempFilePath := filePath + TempFileSuffix
 
 	// 读取Object内容
-	result, err := bucket.DoGetObject(&GetObjectRequest{objectKey}, options)
+	result, err := bucket.DoGetObject(&GetObjectRequest{objectKey, nil}, options)
 	if err != nil {
 		return err
 	}
@@ -185,7 +190,7 @@ func (bucket Bucket) GetObjectToFile(objectKey, filePath string, options ...Opti
 // error  操作无错误为nil,非nil为错误信息。
 //
 func (bucket Bucket) DoGetObject(request *GetObjectRequest, options []Option) (*GetObjectResult, error) {
-	resp, err := bucket.do("GET", request.ObjectKey, "", "", options, nil)
+	resp, err := bucket.do("GET", request.ObjectKey, "", "", options, nil, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -194,14 +199,22 @@ func (bucket Bucket) DoGetObject(request *GetObjectRequest, options []Option) (*
 		Response: resp,
 	}
 
+	// crc
+	var crcCalc hash.Hash64
 	hasRange, _, _ := isOptionSet(options, HTTPHeaderRange)
 	if bucket.getConfig().IsEnableCRC && !hasRange {
-		crcCalc := crc64.New(crcTable())
-		resp.Body = ioutil.NopCloser(io.TeeReader(resp.Body, crcCalc))
+		crcCalc = crc64.New(crcTable())
 		result.ServerCRC = resp.ServerCRC
 		result.ClientCRC = crcCalc
 	}
 
+	// progress
+	if request.Listener == nil {
+		request.Listener = getProgressListener(options)
+	}
+	contentLen, _ := strconv.ParseInt(resp.Headers.Get(HTTPHeaderContentLength), 10, 64)
+	resp.Body = ioutil.NopCloser(TeeReader(resp.Body, crcCalc, contentLen, request.Listener, nil))
+
 	return result, nil
 }
 
@@ -221,7 +234,7 @@ func (bucket Bucket) DoGetObject(request *GetObjectRequest, options []Option) (*
 func (bucket Bucket) CopyObject(srcObjectKey, destObjectKey string, options ...Option) (CopyObjectResult, error) {
 	var out CopyObjectResult
 	options = append(options, CopySource(bucket.BucketName, url.QueryEscape(srcObjectKey)))
-	resp, err := bucket.do("PUT", destObjectKey, "", "", options, nil)
+	resp, err := bucket.do("PUT", destObjectKey, "", "", options, nil, nil)
 	if err != nil {
 		return out, err
 	}
@@ -274,7 +287,7 @@ func (bucket Bucket) copy(srcObjectKey, destBucketName, destObjectKey string, op
 	if err != nil {
 		return out, err
 	}
-	resp, err := bucket.Client.Conn.Do("PUT", destBucketName, destObjectKey, "", "", headers, nil, 0)
+	resp, err := bucket.Client.Conn.Do("PUT", destBucketName, destObjectKey, "", "", headers, nil, 0, nil)
 	if err != nil {
 		return out, err
 	}
@@ -332,11 +345,16 @@ func (bucket Bucket) DoAppendObject(request *AppendObjectRequest, options []Opti
 	var initCRC uint64
 	isCRCSet, initCRCStr, _ := isOptionSet(options, initCRC64)
 	if isCRCSet {
-		initCRC, _ = strconv.ParseUint(initCRCStr, 10, 64)
+		initCRC, _ = strconv.ParseUint(initCRCStr.(string), 10, 64)
+	}
+
+	if request.Listener == nil {
+		request.Listener = getProgressListener(options)
 	}
 
 	handleOptions(headers, opts)
-	resp, err := bucket.Client.Conn.Do("POST", bucket.BucketName, request.ObjectKey, params, params, headers, request.Reader, initCRC)
+	resp, err := bucket.Client.Conn.Do("POST", bucket.BucketName, request.ObjectKey, params, params, headers,
+		request.Reader, initCRC, request.Listener)
 	if err != nil {
 		return nil, err
 	}
@@ -366,7 +384,7 @@ func (bucket Bucket) DoAppendObject(request *AppendObjectRequest, options []Opti
 // error 操作无错误为nil,非nil为错误信息。
 //
 func (bucket Bucket) DeleteObject(objectKey string) error {
-	resp, err := bucket.do("DELETE", objectKey, "", "", nil, nil)
+	resp, err := bucket.do("DELETE", objectKey, "", "", nil, nil, nil)
 	if err != nil {
 		return err
 	}
@@ -390,7 +408,7 @@ func (bucket Bucket) DeleteObjects(objectKeys []string, options ...Option) (Dele
 		dxml.Objects = append(dxml.Objects, DeleteObject{Key: key})
 	}
 	isQuietStr, _ := findOption(options, deleteObjectsQuiet, "FALSE")
-	isQuiet, _ := strconv.ParseBool(isQuietStr)
+	isQuiet, _ := strconv.ParseBool(isQuietStr.(string))
 	dxml.Quiet = isQuiet
 	encode := "&encoding-type=url"
 
@@ -406,7 +424,7 @@ func (bucket Bucket) DeleteObjects(objectKeys []string, options ...Option) (Dele
 	sum := md5.Sum(bs)
 	b64 := base64.StdEncoding.EncodeToString(sum[:])
 	options = append(options, ContentMD5(b64))
-	resp, err := bucket.do("POST", "", "delete"+encode, "delete", options, buffer)
+	resp, err := bucket.do("POST", "", "delete"+encode, "delete", options, buffer, nil)
 	if err != nil {
 		return out, err
 	}
@@ -467,7 +485,7 @@ func (bucket Bucket) ListObjects(options ...Option) (ListObjectsResult, error) {
 		return out, err
 	}
 
-	resp, err := bucket.do("GET", "", params, "", nil, nil)
+	resp, err := bucket.do("GET", "", params, "", nil, nil, nil)
 	if err != nil {
 		return out, err
 	}
@@ -508,7 +526,7 @@ func (bucket Bucket) SetObjectMeta(objectKey string, options ...Option) error {
 // error  操作无错误为nil,非nil为错误信息。
 //
 func (bucket Bucket) GetObjectDetailedMeta(objectKey string, options ...Option) (http.Header, error) {
-	resp, err := bucket.do("HEAD", objectKey, "", "", options, nil)
+	resp, err := bucket.do("HEAD", objectKey, "", "", options, nil, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -529,7 +547,7 @@ func (bucket Bucket) GetObjectDetailedMeta(objectKey string, options ...Option)
 // error 操作无错误为nil,非nil为错误信息。
 //
 func (bucket Bucket) GetObjectMeta(objectKey string) (http.Header, error) {
-	resp, err := bucket.do("GET", objectKey, "?objectMeta", "", nil, nil)
+	resp, err := bucket.do("GET", objectKey, "?objectMeta", "", nil, nil, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -557,7 +575,7 @@ func (bucket Bucket) GetObjectMeta(objectKey string) (http.Header, error) {
 //
 func (bucket Bucket) SetObjectACL(objectKey string, objectACL ACLType) error {
 	options := []Option{ObjectACL(objectACL)}
-	resp, err := bucket.do("PUT", objectKey, "acl", "acl", options, nil)
+	resp, err := bucket.do("PUT", objectKey, "acl", "acl", options, nil, nil)
 	if err != nil {
 		return err
 	}
@@ -575,7 +593,7 @@ func (bucket Bucket) SetObjectACL(objectKey string, objectACL ACLType) error {
 //
 func (bucket Bucket) GetObjectACL(objectKey string) (GetObjectACLResult, error) {
 	var out GetObjectACLResult
-	resp, err := bucket.do("GET", objectKey, "acl", "acl", nil, nil)
+	resp, err := bucket.do("GET", objectKey, "acl", "acl", nil, nil, nil)
 	if err != nil {
 		return out, err
 	}
@@ -586,15 +604,15 @@ func (bucket Bucket) GetObjectACL(objectKey string) (GetObjectACLResult, error)
 }
 
 // Private
-func (bucket Bucket) do(method, objectName, urlParams, subResource string,
-	options []Option, data io.Reader) (*Response, error) {
+func (bucket Bucket) do(method, objectName, urlParams, subResource string, options []Option,
+	data io.Reader, listener ProgressListener) (*Response, error) {
 	headers := make(map[string]string)
 	err := handleOptions(headers, options)
 	if err != nil {
 		return nil, err
 	}
 	return bucket.Client.Conn.Do(method, bucket.BucketName, objectName,
-		urlParams, subResource, headers, data, 0)
+		urlParams, subResource, headers, data, 0, listener)
 }
 
 func (bucket Bucket) getConfig() *Config {

+ 1 - 1
oss/client.go

@@ -744,5 +744,5 @@ func AuthProxy(proxyHost, proxyUser, proxyPassword string) ClientOption {
 func (client Client) do(method, bucketName, urlParams, subResource string,
 	headers map[string]string, data io.Reader) (*Response, error) {
 	return client.Conn.Do(method, bucketName, "", urlParams,
-		subResource, headers, data, 0)
+		subResource, headers, data, 0, nil)
 }

+ 4 - 4
oss/client_test.go

@@ -1,4 +1,4 @@
-// client test
+// client test
 // use gocheck, install gocheck to execute "go get gopkg.in/check.v1",
 // see https://labix.org/gocheck
 
@@ -42,11 +42,11 @@ var (
 
 const (
 	// prefix of bucket name for bucket ops test
-	bucketNamePrefix = "go-sdk-test-bucket-xyz-"
+	bucketNamePrefix = "go-sdk-test-bucket-xyza-"
 	// bucket name for object ops test
-	bucketName = "go-sdk-test-bucket-xyz-for-object"
+	bucketName = "go-sdk-test-bucket-xyza-for-object"
 	// object name for object ops test
-	objectNamePrefix = "go-sdk-test-object-"
+	objectNamePrefix = "go-sdk-test-object-xyza-"
 
 	stsRegion = "cn-hangzhou"
 )

+ 44 - 30
oss/conn.go

@@ -59,15 +59,15 @@ func (conn *Conn) init(config *Config, urlMaker *urlMaker) error {
 }
 
 // Do 处理请求,返回响应结果。
-func (conn Conn) Do(method, bucketName, objectName, urlParams, subResource string,
-	headers map[string]string, data io.Reader, initCRC uint64) (*Response, error) {
+func (conn Conn) Do(method, bucketName, objectName, urlParams, subResource string, headers map[string]string,
+	data io.Reader, initCRC uint64, listener ProgressListener) (*Response, error) {
 	uri := conn.url.getURL(bucketName, objectName, urlParams)
 	resource := conn.url.getResource(bucketName, objectName, subResource)
-	return conn.doRequest(method, uri, resource, headers, data, initCRC)
+	return conn.doRequest(method, uri, resource, headers, data, initCRC, listener)
 }
 
-func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource string,
-	headers map[string]string, data io.Reader, initCRC uint64) (*Response, error) {
+func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource string, headers map[string]string,
+	data io.Reader, initCRC uint64, listener ProgressListener) (*Response, error) {
 	method = strings.ToUpper(method)
 	if !conn.config.IsUseProxy {
 		uri.Opaque = uri.Path
@@ -82,7 +82,8 @@ func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource st
 		Host:       uri.Host,
 	}
 
-	fd, crc := conn.handleBody(req, data, initCRC)
+	tracker := &readerTracker{completedBytes: 0}
+	fd, crc := conn.handleBody(req, data, initCRC, listener, tracker)
 	if fd != nil {
 		defer func() {
 			fd.Close()
@@ -114,6 +115,9 @@ func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource st
 
 	resp, err := conn.client.Do(req)
 	if err != nil {
+		// fail transfer
+		event := newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength)
+		publishProgress(listener, event)
 		return nil, err
 	}
 
@@ -121,7 +125,8 @@ func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource st
 }
 
 // handle request body
-func (conn Conn) handleBody(req *http.Request, body io.Reader, initCRC uint64) (*os.File, hash.Hash64) {
+func (conn Conn) handleBody(req *http.Request, body io.Reader, initCRC uint64,
+	listener ProgressListener, tracker *readerTracker) (*os.File, hash.Hash64) {
 	var file *os.File
 	var crc hash.Hash64
 	reader := body
@@ -136,40 +141,25 @@ func (conn Conn) handleBody(req *http.Request, body io.Reader, initCRC uint64) (
 		req.ContentLength = int64(v.Len())
 	case *os.File:
 		req.ContentLength = tryGetFileSize(v)
+	case *io.LimitedReader:
+		req.ContentLength = int64(v.N)
 	}
 	req.Header.Set(HTTPHeaderContentLength, strconv.FormatInt(req.ContentLength, 10))
 
 	// md5
 	if body != nil && conn.config.IsEnableMD5 && req.Header.Get(HTTPHeaderContentMD5) == "" {
-		if req.ContentLength == 0 || req.ContentLength > conn.config.MD5Threshold {
-			// huge body, use temporary file
-			file, _ = ioutil.TempFile(os.TempDir(), TempFilePrefix)
-			if file != nil {
-				io.Copy(file, body)
-				file.Seek(0, os.SEEK_SET)
-				md5 := md5.New()
-				io.Copy(md5, file)
-				sum := md5.Sum(nil)
-				b64 := base64.StdEncoding.EncodeToString(sum[:])
-				req.Header.Set(HTTPHeaderContentMD5, b64)
-				file.Seek(0, os.SEEK_SET)
-				reader = file
-			}
-		} else {
-			// small body, use memory
-			buf, _ := ioutil.ReadAll(body)
-			sum := md5.Sum(buf)
-			b64 := base64.StdEncoding.EncodeToString(sum[:])
-			req.Header.Set(HTTPHeaderContentMD5, b64)
-			reader = bytes.NewReader(buf)
-		}
+		md5 := ""
+		reader, md5, file, _ = calcMD5(body, req.ContentLength, conn.config.MD5Threshold)
+		req.Header.Set(HTTPHeaderContentMD5, md5)
 	}
 
+	// crc
 	if reader != nil && conn.config.IsEnableCRC {
 		crc = NewCRC(crcTable(), initCRC)
-		reader = io.TeeReader(reader, crc)
+		reader = TeeReader(reader, crc, req.ContentLength, listener, tracker)
 	}
 
+	// http body
 	rc, ok := reader.(io.ReadCloser)
 	if !ok && reader != nil {
 		rc = ioutil.NopCloser(reader)
@@ -240,6 +230,30 @@ func (conn Conn) handleResponse(resp *http.Response, crc hash.Hash64) (*Response
 	}, nil
 }
 
+func calcMD5(body io.Reader, contentLen, md5Threshold int64) (reader io.Reader, b64 string, tempFile *os.File, err error) {
+	if contentLen == 0 || contentLen > md5Threshold {
+		// huge body, use temporary file
+		tempFile, err = ioutil.TempFile(os.TempDir(), TempFilePrefix)
+		if tempFile != nil {
+			io.Copy(tempFile, body)
+			tempFile.Seek(0, os.SEEK_SET)
+			md5 := md5.New()
+			io.Copy(md5, tempFile)
+			sum := md5.Sum(nil)
+			b64 = base64.StdEncoding.EncodeToString(sum[:])
+			tempFile.Seek(0, os.SEEK_SET)
+			reader = tempFile
+		}
+	} else {
+		// small body, use memory
+		buf, _ := ioutil.ReadAll(body)
+		sum := md5.Sum(buf)
+		b64 = base64.StdEncoding.EncodeToString(sum[:])
+		reader = bytes.NewReader(buf)
+	}
+	return
+}
+
 func readResponseBody(resp *http.Response) ([]byte, error) {
 	defer resp.Body.Close()
 	out, err := ioutil.ReadAll(resp.Body)

+ 1 - 1
oss/const.go

@@ -85,5 +85,5 @@ const (
 
 	CheckpointFileSuffix = ".cp" // Checkpoint文件后缀
 
-	Version = "1.2.3" // Go sdk版本
+	Version = "1.3.0" // Go sdk版本
 )

+ 2 - 2
oss/crc_test.go

@@ -150,7 +150,7 @@ func (s *OssCrcSuite) TestEnableCRCAndMD5(c *C) {
 	body.Close()
 
 	// GetObjectWithCRC
-	getResult, err := bucket.DoGetObject(&GetObjectRequest{objectName}, nil)
+	getResult, err := bucket.DoGetObject(&GetObjectRequest{objectName, nil}, nil)
 	c.Assert(err, IsNil)
 	str, err := readBody(getResult.Response.Body)
 	c.Assert(err, IsNil)
@@ -247,7 +247,7 @@ func (s *OssCrcSuite) TestDisableCRCAndMD5(c *C) {
 	body.Close()
 
 	// GetObjectWithCRC
-	getResult, err := bucket.DoGetObject(&GetObjectRequest{objectName}, nil)
+	getResult, err := bucket.DoGetObject(&GetObjectRequest{objectName, nil}, nil)
 	c.Assert(err, IsNil)
 	str, err := readBody(getResult.Response.Body)
 	c.Assert(err, IsNil)

+ 58 - 2
oss/download.go

@@ -60,6 +60,14 @@ func defaultDownloadPartHook(part downloadPart) error {
 	return nil
 }
 
+// 默认ProgressListener,屏蔽GetObject的Options中ProgressListener
+type defaultDownloadProgressListener struct {
+}
+
+// ProgressChanged 静默处理
+func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
+}
+
 // 工作协程
 func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
 	for part := range jobs {
@@ -68,8 +76,9 @@ func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, res
 			break
 		}
 
-		opt := Range(part.Start, part.End)
-		opts := append(arg.options, opt)
+		r := Range(part.Start, part.End)
+		p := Progress(&defaultDownloadProgressListener{})
+		opts := append(arg.options, r, p)
 		rd, err := arg.bucket.GetObject(arg.key, opts...)
 		if err != nil {
 			failed <- err
@@ -146,9 +155,19 @@ func getDownloadParts(bucket *Bucket, objectKey string, partSize int64) ([]downl
 	return parts, nil
 }
 
+// 文件大小
+func getObjectBytes(parts []downloadPart) int64 {
+	var ob int64
+	for _, part := range parts {
+		ob += (part.End - part.Start + 1)
+	}
+	return ob
+}
+
 // 并发无断点续传的下载
 func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
 	tempFilePath := filePath + TempFileSuffix
+	listener := getProgressListener(options)
 
 	// 如果文件不存在则创建,存在不清空,下载分片会重写文件内容
 	fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
@@ -168,6 +187,11 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 	failed := make(chan error)
 	die := make(chan bool)
 
+	var completedBytes int64
+	totalBytes := getObjectBytes(parts)
+	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
+	publishProgress(listener, event)
+
 	// 启动工作协程
 	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
 	for w := 1; w <= routines; w++ {
@@ -185,8 +209,13 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 		case part := <-results:
 			completed++
 			ps[part.Index] = part
+			completedBytes += (part.End - part.Start + 1)
+			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
+			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
+			publishProgress(listener, event)
 			return err
 		}
 
@@ -195,6 +224,9 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 		}
 	}
 
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
+	publishProgress(listener, event)
+
 	return os.Rename(tempFilePath, filePath)
 }
 
@@ -298,6 +330,17 @@ func (cp downloadCheckpoint) todoParts() []downloadPart {
 	return dps
 }
 
+// 完成的字节数
+func (cp downloadCheckpoint) getCompletedBytes() int64 {
+	var completedBytes int64
+	for i, part := range cp.Parts {
+		if cp.PartStat[i] {
+			completedBytes += (part.End - part.Start + 1)
+		}
+	}
+	return completedBytes
+}
+
 // 初始化下载任务
 func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64) error {
 	// cp
@@ -341,6 +384,7 @@ func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
 // 并发带断点的下载
 func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
 	tempFilePath := filePath + TempFileSuffix
+	listener := getProgressListener(options)
 
 	// LOAD CP数据
 	dcp := downloadCheckpoint{}
@@ -372,6 +416,10 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 	failed := make(chan error)
 	die := make(chan bool)
 
+	completedBytes := dcp.getCompletedBytes()
+	event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
+	publishProgress(listener, event)
+
 	// 启动工作协程
 	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
 	for w := 1; w <= routines; w++ {
@@ -389,8 +437,13 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 			completed++
 			dcp.PartStat[part.Index] = true
 			dcp.dump(cpFilePath)
+			completedBytes += (part.End - part.Start + 1)
+			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
+			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
+			publishProgress(listener, event)
 			return err
 		}
 
@@ -399,5 +452,8 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 		}
 	}
 
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
+	publishProgress(listener, event)
+
 	return dcp.complete(cpFilePath, tempFilePath)
 }

+ 4 - 0
oss/model.go

@@ -19,11 +19,13 @@ type Response struct {
 type PutObjectRequest struct {
 	ObjectKey string
 	Reader    io.Reader
+	Listener  ProgressListener
 }
 
 // GetObjectRequest The request of DoGetObject
 type GetObjectRequest struct {
 	ObjectKey string
+	Listener  ProgressListener
 }
 
 // GetObjectResult The result of DoGetObject
@@ -38,6 +40,7 @@ type AppendObjectRequest struct {
 	ObjectKey string
 	Reader    io.Reader
 	Position  int64
+	Listener  ProgressListener
 }
 
 // AppendObjectResult The result of DoAppendObject
@@ -52,6 +55,7 @@ type UploadPartRequest struct {
 	Reader     io.Reader
 	PartSize   int64
 	PartNumber int
+	Listener   ProgressListener
 }
 
 // UploadPartResult The result of DoUploadPart

+ 47 - 0
oss/multicopy.go

@@ -128,11 +128,21 @@ func getCopyParts(bucket *Bucket, objectKey string, partSize int64) ([]copyPart,
 	return parts, nil
 }
 
+// 获取源文件大小
+func getSrcObjectBytes(parts []copyPart) int64 {
+	var ob int64
+	for _, part := range parts {
+		ob += (part.End - part.Start + 1)
+	}
+	return ob
+}
+
 // 并发无断点续传的下载
 func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
 	partSize int64, options []Option, routines int) error {
 	descBucket, err := bucket.Client.Bucket(destBucketName)
 	srcBucket, err := bucket.Client.Bucket(srcBucketName)
+	listener := getProgressListener(options)
 
 	// 分割文件
 	parts, err := getCopyParts(srcBucket, srcObjectKey, partSize)
@@ -151,6 +161,11 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 	failed := make(chan error)
 	die := make(chan bool)
 
+	var completedBytes int64
+	totalBytes := getSrcObjectBytes(parts)
+	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
+	publishProgress(listener, event)
+
 	// 启动工作协程
 	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
 	for w := 1; w <= routines; w++ {
@@ -168,9 +183,14 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 		case part := <-results:
 			completed++
 			ups[part.PartNumber-1] = part
+			completedBytes += (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
+			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
+			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
 			descBucket.AbortMultipartUpload(imur)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
+			publishProgress(listener, event)
 			return err
 		}
 
@@ -179,6 +199,9 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 		}
 	}
 
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
+	publishProgress(listener, event)
+
 	// 提交任务
 	_, err = descBucket.CompleteMultipartUpload(imur, ups)
 	if err != nil {
@@ -292,6 +315,17 @@ func (cp copyCheckpoint) todoParts() []copyPart {
 	return dps
 }
 
+// 完成的字节数
+func (cp copyCheckpoint) getCompletedBytes() int64 {
+	var completedBytes int64
+	for i, part := range cp.Parts {
+		if cp.PartStat[i] {
+			completedBytes += (part.End - part.Start + 1)
+		}
+	}
+	return completedBytes
+}
+
 // 初始化下载任务
 func (cp *copyCheckpoint) prepare(srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
 	partSize int64, options []Option) error {
@@ -354,6 +388,7 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 	partSize int64, options []Option, cpFilePath string, routines int) error {
 	descBucket, err := bucket.Client.Bucket(destBucketName)
 	srcBucket, err := bucket.Client.Bucket(srcBucketName)
+	listener := getProgressListener(options)
 
 	// LOAD CP数据
 	ccp := copyCheckpoint{}
@@ -383,6 +418,10 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 	failed := make(chan error)
 	die := make(chan bool)
 
+	completedBytes := ccp.getCompletedBytes()
+	event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size)
+	publishProgress(listener, event)
+
 	// 启动工作协程
 	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, options, copyPartHooker}
 	for w := 1; w <= routines; w++ {
@@ -400,8 +439,13 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 			completed++
 			ccp.update(part)
 			ccp.dump(cpFilePath)
+			completedBytes += (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
+			event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size)
+			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size)
+			publishProgress(listener, event)
 			return err
 		}
 
@@ -410,5 +454,8 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 		}
 	}
 
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size)
+	publishProgress(listener, event)
+
 	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath)
 }

+ 16 - 12
oss/multipart.go

@@ -24,7 +24,7 @@ import (
 func (bucket Bucket) InitiateMultipartUpload(objectKey string, options ...Option) (InitiateMultipartUploadResult, error) {
 	var imur InitiateMultipartUploadResult
 	opts := addContentType(options, objectKey)
-	resp, err := bucket.do("POST", objectKey, "uploads", "uploads", opts, nil)
+	resp, err := bucket.do("POST", objectKey, "uploads", "uploads", opts, nil, nil)
 	if err != nil {
 		return imur, err
 	}
@@ -53,7 +53,7 @@ func (bucket Bucket) InitiateMultipartUpload(objectKey string, options ...Option
 // error 操作成功error为nil,非nil为错误信息。
 //
 func (bucket Bucket) UploadPart(imur InitiateMultipartUploadResult, reader io.Reader,
-	partSize int64, partNumber int) (UploadPart, error) {
+	partSize int64, partNumber int, options ...Option) (UploadPart, error) {
 	request := &UploadPartRequest{
 		InitResult: &imur,
 		Reader:     reader,
@@ -61,7 +61,7 @@ func (bucket Bucket) UploadPart(imur InitiateMultipartUploadResult, reader io.Re
 		PartNumber: partNumber,
 	}
 
-	result, err := bucket.DoUploadPart(request)
+	result, err := bucket.DoUploadPart(request, options)
 
 	return result.Part, err
 }
@@ -80,7 +80,7 @@ func (bucket Bucket) UploadPart(imur InitiateMultipartUploadResult, reader io.Re
 // error 操作成功error为nil,非nil为错误信息。
 //
 func (bucket Bucket) UploadPartFromFile(imur InitiateMultipartUploadResult, filePath string,
-	startPosition, partSize int64, partNumber int) (UploadPart, error) {
+	startPosition, partSize int64, partNumber int, options ...Option) (UploadPart, error) {
 	var part = UploadPart{}
 	fd, err := os.Open(filePath)
 	if err != nil {
@@ -96,7 +96,7 @@ func (bucket Bucket) UploadPartFromFile(imur InitiateMultipartUploadResult, file
 		PartNumber: partNumber,
 	}
 
-	result, err := bucket.DoUploadPart(request)
+	result, err := bucket.DoUploadPart(request, options)
 
 	return result.Part, err
 }
@@ -109,11 +109,15 @@ func (bucket Bucket) UploadPartFromFile(imur InitiateMultipartUploadResult, file
 // UploadPartResult 上传分片请求返回值。
 // error  操作无错误为nil,非nil为错误信息。
 //
-func (bucket Bucket) DoUploadPart(request *UploadPartRequest) (*UploadPartResult, error) {
+func (bucket Bucket) DoUploadPart(request *UploadPartRequest, options []Option) (*UploadPartResult, error) {
+	if request.Listener == nil {
+		request.Listener = getProgressListener(options)
+	}
+
 	params := "partNumber=" + strconv.Itoa(request.PartNumber) + "&uploadId=" + request.InitResult.UploadID
 	opts := []Option{ContentLength(request.PartSize)}
 	resp, err := bucket.do("PUT", request.InitResult.Key, params, params, opts,
-		&io.LimitedReader{R: request.Reader, N: request.PartSize})
+		&io.LimitedReader{R: request.Reader, N: request.PartSize}, request.Listener)
 	if err != nil {
 		return &UploadPartResult{}, err
 	}
@@ -159,7 +163,7 @@ func (bucket Bucket) UploadPartCopy(imur InitiateMultipartUploadResult, srcBucke
 		CopySourceRange(startPosition, partSize)}
 	opts = append(opts, options...)
 	params := "partNumber=" + strconv.Itoa(partNumber) + "&uploadId=" + imur.UploadID
-	resp, err := bucket.do("PUT", imur.Key, params, params, opts, nil)
+	resp, err := bucket.do("PUT", imur.Key, params, params, opts, nil, nil)
 	if err != nil {
 		return part, err
 	}
@@ -199,7 +203,7 @@ func (bucket Bucket) CompleteMultipartUpload(imur InitiateMultipartUploadResult,
 	buffer.Write(bs)
 
 	params := "uploadId=" + imur.UploadID
-	resp, err := bucket.do("POST", imur.Key, params, params, nil, buffer)
+	resp, err := bucket.do("POST", imur.Key, params, params, nil, buffer, nil)
 	if err != nil {
 		return out, err
 	}
@@ -218,7 +222,7 @@ func (bucket Bucket) CompleteMultipartUpload(imur InitiateMultipartUploadResult,
 //
 func (bucket Bucket) AbortMultipartUpload(imur InitiateMultipartUploadResult) error {
 	params := "uploadId=" + imur.UploadID
-	resp, err := bucket.do("DELETE", imur.Key, params, params, nil, nil)
+	resp, err := bucket.do("DELETE", imur.Key, params, params, nil, nil, nil)
 	if err != nil {
 		return err
 	}
@@ -237,7 +241,7 @@ func (bucket Bucket) AbortMultipartUpload(imur InitiateMultipartUploadResult) er
 func (bucket Bucket) ListUploadedParts(imur InitiateMultipartUploadResult) (ListUploadedPartsResult, error) {
 	var out ListUploadedPartsResult
 	params := "uploadId=" + imur.UploadID
-	resp, err := bucket.do("GET", imur.Key, params, params, nil, nil)
+	resp, err := bucket.do("GET", imur.Key, params, params, nil, nil, nil)
 	if err != nil {
 		return out, err
 	}
@@ -265,7 +269,7 @@ func (bucket Bucket) ListMultipartUploads(options ...Option) (ListMultipartUploa
 		return out, err
 	}
 
-	resp, err := bucket.do("GET", "", "uploads&"+params, "uploads", nil, nil)
+	resp, err := bucket.do("GET", "", "uploads&"+params, "uploads", nil, nil, nil)
 	if err != nil {
 		return out, err
 	}

+ 15 - 9
oss/option.go

@@ -24,11 +24,12 @@ const (
 	routineNum         = "x-routine-num"
 	checkpointConfig   = "x-cp-config"
 	initCRC64          = "init-crc64"
+	progressListener   = "x-progress-listener"
 )
 
 type (
 	optionValue struct {
-		Value string
+		Value interface{}
 		Type  optionType
 	}
 
@@ -229,7 +230,12 @@ func InitCRC(initCRC uint64) Option {
 	return addArg(initCRC64, strconv.FormatUint(initCRC, 10))
 }
 
-func setHeader(key, value string) Option {
+// Progress set progress listener
+func Progress(listener ProgressListener) Option {
+	return addArg(progressListener, listener)
+}
+
+func setHeader(key string, value interface{}) Option {
 	return func(params map[string]optionValue) error {
 		if value == "" {
 			return nil
@@ -239,7 +245,7 @@ func setHeader(key, value string) Option {
 	}
 }
 
-func addParam(key, value string) Option {
+func addParam(key string, value interface{}) Option {
 	return func(params map[string]optionValue) error {
 		if value == "" {
 			return nil
@@ -249,7 +255,7 @@ func addParam(key, value string) Option {
 	}
 }
 
-func addArg(key, value string) Option {
+func addArg(key string, value interface{}) Option {
 	return func(params map[string]optionValue) error {
 		if value == "" {
 			return nil
@@ -271,7 +277,7 @@ func handleOptions(headers map[string]string, options []Option) error {
 
 	for k, v := range params {
 		if v.Type == optionHTTP {
-			headers[k] = v.Value
+			headers[k] = v.Value.(string)
 		}
 	}
 	return nil
@@ -307,13 +313,13 @@ func handleParams(options []Option) (string, error) {
 			buf.WriteByte('&')
 		}
 		buf.WriteString(prefix)
-		buf.WriteString(url.QueryEscape(vs.Value))
+		buf.WriteString(url.QueryEscape(vs.Value.(string)))
 	}
 
 	return buf.String(), nil
 }
 
-func findOption(options []Option, param, defaultVal string) (string, error) {
+func findOption(options []Option, param string, defaultVal interface{}) (interface{}, error) {
 	params := map[string]optionValue{}
 	for _, option := range options {
 		if option != nil {
@@ -329,7 +335,7 @@ func findOption(options []Option, param, defaultVal string) (string, error) {
 	return defaultVal, nil
 }
 
-func isOptionSet(options []Option, option string) (bool, string, error) {
+func isOptionSet(options []Option, option string) (bool, interface{}, error) {
 	params := map[string]optionValue{}
 	for _, option := range options {
 		if option != nil {
@@ -342,5 +348,5 @@ func isOptionSet(options []Option, option string) (bool, string, error) {
 	if val, ok := params[option]; ok {
 		return true, val.Value, nil
 	}
-	return false, "", nil
+	return false, nil, nil
 }

+ 116 - 0
oss/progress.go

@@ -0,0 +1,116 @@
+package oss
+
+import "io"
+
+// ProgressEventType transfer progress event type
+type ProgressEventType int
+
+const (
+	// TransferStartedEvent transfer started, set TotalBytes
+	TransferStartedEvent ProgressEventType = 1 + iota
+	// TransferDataEvent transfer data, set ConsumedBytes and TotalBytes
+	TransferDataEvent
+	// TransferCompletedEvent transfer completed
+	TransferCompletedEvent
+	// TransferFailedEvent transfer encounters an error
+	TransferFailedEvent
+)
+
+// ProgressEvent progress event
+type ProgressEvent struct {
+	ConsumedBytes int64
+	TotalBytes    int64
+	EventType     ProgressEventType
+}
+
+// ProgressListener listen progress change
+type ProgressListener interface {
+	ProgressChanged(event *ProgressEvent)
+}
+
+// -------------------- private --------------------
+
+func newProgressEvent(eventType ProgressEventType, consumed, total int64) *ProgressEvent {
+	return &ProgressEvent{
+		ConsumedBytes: consumed,
+		TotalBytes:    total,
+		EventType:     eventType}
+}
+
+// publishProgress
+func publishProgress(listener ProgressListener, event *ProgressEvent) {
+	if listener != nil && event != nil {
+		listener.ProgressChanged(event)
+	}
+}
+
+type readerTracker struct {
+	completedBytes int64
+}
+
+type teeReader struct {
+	reader        io.Reader
+	writer        io.Writer
+	listener      ProgressListener
+	consumedBytes int64
+	totalBytes    int64
+	tracker       *readerTracker
+}
+
+// TeeReader returns a Reader that writes to w what it reads from r.
+// All reads from r performed through it are matched with
+// corresponding writes to w.  There is no internal buffering -
+// the write must complete before the read completes.
+// Any error encountered while writing is reported as a read error.
+func TeeReader(reader io.Reader, writer io.Writer, totalBytes int64, listener ProgressListener, tracker *readerTracker) io.Reader {
+	t := &teeReader{
+		reader:        reader,
+		writer:        writer,
+		listener:      listener,
+		consumedBytes: 0,
+		totalBytes:    totalBytes,
+		tracker:       tracker,
+	}
+
+	// start transfer
+	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
+	publishProgress(listener, event)
+
+	return t
+}
+
+func (t *teeReader) Read(p []byte) (n int, err error) {
+	n, err = t.reader.Read(p)
+
+	// read encountered error
+	if err != nil && err != io.EOF {
+		event := newProgressEvent(TransferFailedEvent, t.consumedBytes, t.totalBytes)
+		publishProgress(t.listener, event)
+	}
+
+	if n > 0 {
+		t.consumedBytes += int64(n)
+		// crc
+		if t.writer != nil {
+			if n, err := t.writer.Write(p[:n]); err != nil {
+				return n, err
+			}
+		}
+		// progress
+		if t.listener != nil {
+			event := newProgressEvent(TransferDataEvent, t.consumedBytes, t.totalBytes)
+			publishProgress(t.listener, event)
+		}
+		// tracker
+		if t.tracker != nil {
+			t.tracker.completedBytes = t.consumedBytes
+		}
+	}
+
+	// read completed
+	if err == io.EOF {
+		event := newProgressEvent(TransferCompletedEvent, t.consumedBytes, t.totalBytes)
+		publishProgress(t.listener, event)
+	}
+	return
+}

+ 377 - 0
oss/progress_test.go

@@ -0,0 +1,377 @@
+// progress test
+
+package oss
+
+import (
+	"bytes"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"strings"
+	"time"
+
+	. "gopkg.in/check.v1"
+)
+
+type OssProgressSuite struct {
+	client *Client
+	bucket *Bucket
+}
+
+var _ = Suite(&OssProgressSuite{})
+
+// Run once when the suite starts running
+func (s *OssProgressSuite) SetUpSuite(c *C) {
+	client, err := New(endpoint, accessID, accessKey)
+	c.Assert(err, IsNil)
+	s.client = client
+
+	s.client.CreateBucket(bucketName)
+	time.Sleep(5 * time.Second)
+
+	bucket, err := s.client.Bucket(bucketName)
+	c.Assert(err, IsNil)
+	s.bucket = bucket
+
+	testLogger.Println("test progress started")
+}
+
+// Run once after all tests or benchmarks have finished running
+func (s *OssProgressSuite) TearDownSuite(c *C) {
+	// Delete Multipart
+	lmu, err := s.bucket.ListMultipartUploads()
+	c.Assert(err, IsNil)
+
+	for _, upload := range lmu.Uploads {
+		imur := InitiateMultipartUploadResult{Bucket: bucketName, Key: upload.Key, UploadID: upload.UploadID}
+		err = s.bucket.AbortMultipartUpload(imur)
+		c.Assert(err, IsNil)
+	}
+
+	// Delete Objects
+	lor, err := s.bucket.ListObjects()
+	c.Assert(err, IsNil)
+
+	for _, object := range lor.Objects {
+		err = s.bucket.DeleteObject(object.Key)
+		c.Assert(err, IsNil)
+	}
+
+	testLogger.Println("test progress completed")
+}
+
+// Run before each test or benchmark starts running
+func (s *OssProgressSuite) SetUpTest(c *C) {
+	err := removeTempFiles("../oss", ".jpg")
+	c.Assert(err, IsNil)
+
+	err = removeTempFiles("../oss", ".txt")
+	c.Assert(err, IsNil)
+
+	err = removeTempFiles("../oss", ".html")
+	c.Assert(err, IsNil)
+}
+
+// Run after each test or benchmark runs
+func (s *OssProgressSuite) TearDownTest(c *C) {
+	err := removeTempFiles("../oss", ".jpg")
+	c.Assert(err, IsNil)
+
+	err = removeTempFiles("../oss", ".txt")
+	c.Assert(err, IsNil)
+
+	err = removeTempFiles("../oss", ".html")
+	c.Assert(err, IsNil)
+}
+
+// OssProgressListener progress listener
+type OssProgressListener struct {
+}
+
+// ProgressChanged handle progress event
+func (listener *OssProgressListener) ProgressChanged(event *ProgressEvent) {
+	switch event.EventType {
+	case TransferStartedEvent:
+		testLogger.Printf("Transfer Started, ConsumedBytes: %d, TotalBytes %d.\n",
+			event.ConsumedBytes, event.TotalBytes)
+	case TransferDataEvent:
+		testLogger.Printf("Transfer Data, ConsumedBytes: %d, TotalBytes %d.\n",
+			event.ConsumedBytes, event.TotalBytes)
+	case TransferCompletedEvent:
+		testLogger.Printf("Transfer Completed, ConsumedBytes: %d, TotalBytes %d.\n",
+			event.ConsumedBytes, event.TotalBytes)
+	case TransferFailedEvent:
+		testLogger.Printf("Transfer Failed, ConsumedBytes: %d, TotalBytes %d.\n",
+			event.ConsumedBytes, event.TotalBytes)
+	default:
+	}
+}
+
+// TestPutObject
+func (s *OssProgressSuite) TestPutObject(c *C) {
+	objectName := objectNamePrefix + "tpo.html"
+	localFile := "../sample/The Go Programming Language.html"
+
+	// PutObject
+	fd, err := os.Open(localFile)
+	c.Assert(err, IsNil)
+	defer fd.Close()
+
+	err = s.bucket.PutObject(objectName, fd, Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	// PutObjectFromFile
+	err = s.bucket.PutObjectFromFile(objectName, localFile, Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	// DoPutObject
+	fd, err = os.Open(localFile)
+	c.Assert(err, IsNil)
+	defer fd.Close()
+
+	request := &PutObjectRequest{
+		ObjectKey: objectName,
+		Reader:    fd,
+		Listener:  &OssProgressListener{},
+	}
+
+	options := []Option{}
+	_, err = s.bucket.DoPutObject(request, options)
+	c.Assert(err, IsNil)
+
+	// PutObject size is 0
+	err = s.bucket.PutObject(objectName, strings.NewReader(""), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	testLogger.Println("OssProgressSuite.TestPutObject")
+}
+
+// TestAppendObject
+func (s *OssProgressSuite) TestAppendObject(c *C) {
+	objectName := objectNamePrefix + "tao"
+	objectValue := "昨夜雨疏风骤,浓睡不消残酒。试问卷帘人,却道海棠依旧。知否?知否?应是绿肥红瘦。"
+	var val = []byte(objectValue)
+	var nextPos int64
+	var midPos = 1 + rand.Intn(len(val)-1)
+
+	// AppendObject
+	nextPos, err := s.bucket.AppendObject(objectName, bytes.NewReader(val[0:midPos]), nextPos, Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	// DoAppendObject
+	request := &AppendObjectRequest{
+		ObjectKey: objectName,
+		Reader:    bytes.NewReader(val[midPos:]),
+		Position:  nextPos,
+	}
+	options := []Option{Progress(&OssProgressListener{})}
+	_, err = s.bucket.DoAppendObject(request, options)
+	c.Assert(err, IsNil)
+
+	testLogger.Println("OssProgressSuite.TestAppendObject")
+}
+
+// TestMultipartUpload
+func (s *OssProgressSuite) TestMultipartUpload(c *C) {
+	objectName := objectNamePrefix + "tmu.jpg"
+	var fileName = "../sample/BingWallpaper-2015-11-07.jpg"
+
+	chunks, err := SplitFileByPartNum(fileName, 3)
+	c.Assert(err, IsNil)
+	testLogger.Println("chunks:", chunks)
+
+	fd, err := os.Open(fileName)
+	c.Assert(err, IsNil)
+	defer fd.Close()
+
+	// Initiate
+	imur, err := s.bucket.InitiateMultipartUpload(objectName)
+	c.Assert(err, IsNil)
+
+	// UploadPart
+	var parts []UploadPart
+	for _, chunk := range chunks {
+		fd.Seek(chunk.Offset, os.SEEK_SET)
+		part, err := s.bucket.UploadPart(imur, fd, chunk.Size, chunk.Number, Progress(&OssProgressListener{}))
+		c.Assert(err, IsNil)
+		parts = append(parts, part)
+	}
+
+	// Complete
+	_, err = s.bucket.CompleteMultipartUpload(imur, parts)
+	c.Assert(err, IsNil)
+
+	err = s.bucket.DeleteObject(objectName)
+	c.Assert(err, IsNil)
+
+	testLogger.Println("OssProgressSuite.TestMultipartUpload")
+}
+
+// TestMultipartUploadFromFile
+func (s *OssProgressSuite) TestMultipartUploadFromFile(c *C) {
+	objectName := objectNamePrefix + "tmuff.jpg"
+	var fileName = "../sample/BingWallpaper-2015-11-07.jpg"
+
+	chunks, err := SplitFileByPartNum(fileName, 3)
+	c.Assert(err, IsNil)
+
+	// Initiate
+	imur, err := s.bucket.InitiateMultipartUpload(objectName)
+	c.Assert(err, IsNil)
+
+	// UploadPart
+	var parts []UploadPart
+	for _, chunk := range chunks {
+		part, err := s.bucket.UploadPartFromFile(imur, fileName, chunk.Offset, chunk.Size, chunk.Number, Progress(&OssProgressListener{}))
+		c.Assert(err, IsNil)
+		parts = append(parts, part)
+	}
+
+	// Complete
+	_, err = s.bucket.CompleteMultipartUpload(imur, parts)
+	c.Assert(err, IsNil)
+
+	err = s.bucket.DeleteObject(objectName)
+	c.Assert(err, IsNil)
+
+	testLogger.Println("OssProgressSuite.TestMultipartUploadFromFile")
+}
+
+// TestGetObject
+func (s *OssProgressSuite) TestGetObject(c *C) {
+	objectName := objectNamePrefix + "tgo.jpg"
+	localFile := "../sample/BingWallpaper-2015-11-07.jpg"
+	newFile := "newpic-progress-1.jpg"
+
+	// PutObject
+	err := s.bucket.PutObjectFromFile(objectName, localFile, Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	// GetObject
+	body, err := s.bucket.GetObject(objectName, Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+	_, err = ioutil.ReadAll(body)
+	c.Assert(err, IsNil)
+	body.Close()
+
+	// GetObjectToFile
+	err = s.bucket.GetObjectToFile(objectName, newFile, Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	// DoGetObject
+	request := &GetObjectRequest{
+		objectName,
+		&OssProgressListener{},
+	}
+	options := []Option{}
+	result, err := s.bucket.DoGetObject(request, options)
+	c.Assert(err, IsNil)
+	_, err = ioutil.ReadAll(result.Response.Body)
+	c.Assert(err, IsNil)
+	result.Response.Body.Close()
+
+	// GetObject with range
+	body, err = s.bucket.GetObject(objectName, Range(1024, 4*1024), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+	_, err = ioutil.ReadAll(body)
+	c.Assert(err, IsNil)
+	body.Close()
+
+	// PutObject size is 0
+	err = s.bucket.PutObject(objectName, strings.NewReader(""), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	// GetObject size is 0
+	body, err = s.bucket.GetObject(objectName, Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+	_, err = ioutil.ReadAll(body)
+	c.Assert(err, IsNil)
+	body.Close()
+
+	testLogger.Println("OssProgressSuite.TestGetObject")
+}
+
+// TestGetObjectNegative
+func (s *OssProgressSuite) TestGetObjectNegative(c *C) {
+	objectName := objectNamePrefix + "tgon.jpg"
+	localFile := "../sample/BingWallpaper-2015-11-07.jpg"
+
+	// PutObject
+	err := s.bucket.PutObjectFromFile(objectName, localFile)
+	c.Assert(err, IsNil)
+
+	// GetObject
+	body, err := s.bucket.GetObject(objectName, Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	buf := make([]byte, 4*1024)
+	n, err := body.Read(buf)
+	c.Assert(err, IsNil)
+
+	//time.Sleep(70 * time.Second) TODO
+
+	// read should fail
+	for err == nil {
+		n, err = body.Read(buf)
+	}
+	_ = n // last read size is irrelevant; the loop only drives err
+	c.Assert(err, NotNil)
+	body.Close()
+
+	testLogger.Println("OssProgressSuite.TestGetObjectNegative")
+}
+
+// TestUploadFile
+func (s *OssProgressSuite) TestUploadFile(c *C) {
+	objectName := objectNamePrefix + "tuf.jpg"
+	fileName := "../sample/BingWallpaper-2015-11-07.jpg"
+
+	err := s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(5), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	err = s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(3), Checkpoint(true, objectName+".cp"), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	testLogger.Println("OssProgressSuite.TestUploadFile")
+}
+
+// TestDownloadFile
+func (s *OssProgressSuite) TestDownloadFile(c *C) {
+	objectName := objectNamePrefix + "tdf.jpg"
+	fileName := "../sample/BingWallpaper-2015-11-07.jpg"
+	newFile := "down-new-file-progress-2.jpg"
+
+	// upload
+	err := s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(3))
+	c.Assert(err, IsNil)
+
+	err = s.bucket.DownloadFile(objectName, newFile, 100*1024, Routines(5), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	err = s.bucket.DownloadFile(objectName, newFile, 1024*1024, Routines(3), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	err = s.bucket.DownloadFile(objectName, newFile, 50*1024, Routines(3), Checkpoint(true, ""), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	testLogger.Println("OssProgressSuite.TestDownloadFile")
+}
+
+// TestCopyFile
+func (s *OssProgressSuite) TestCopyFile(c *C) {
+	srcObjectName := objectNamePrefix + "tcf.jpg"
+	destObjectName := srcObjectName + "-copy"
+	fileName := "../sample/BingWallpaper-2015-11-07.jpg"
+
+	// upload
+	err := s.bucket.UploadFile(srcObjectName, fileName, 100*1024, Routines(3))
+	c.Assert(err, IsNil)
+
+	err = s.bucket.CopyFile(bucketName, srcObjectName, destObjectName, 100*1024, Routines(5), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	err = s.bucket.CopyFile(bucketName, srcObjectName, destObjectName, 1024*100, Routines(3), Checkpoint(true, ""), Progress(&OssProgressListener{}))
+	c.Assert(err, IsNil)
+
+	testLogger.Println("OssProgressSuite.TestCopyFile")
+}

+ 59 - 2
oss/upload.go

@@ -51,7 +51,7 @@ func getCpConfig(options []Option, filePath string) (*cpConfig, error) {
 	}
 
 	if cpStr != "" {
-		if err = json.Unmarshal([]byte(cpStr), &cpc); err != nil {
+		if err = json.Unmarshal([]byte(cpStr.(string)), &cpc); err != nil {
 			return nil, err
 		}
 	}
@@ -70,7 +70,7 @@ func getRoutines(options []Option) int {
 		return 1
 	}
 
-	rs, err := strconv.Atoi(rStr)
+	rs, err := strconv.Atoi(rStr.(string))
 	if err != nil {
 		return 1
 	}
@@ -84,6 +84,15 @@ func getRoutines(options []Option) int {
 	return rs
 }
 
+// 获取进度回调
+func getProgressListener(options []Option) ProgressListener {
+	isSet, listener, _ := isOptionSet(options, progressListener)
+	if !isSet {
+		return nil
+	}
+	return listener.(ProgressListener)
+}
+
 // 测试使用
 type uploadPartHook func(id int, chunk FileChunk) error
 
@@ -130,8 +139,18 @@ func scheduler(jobs chan FileChunk, chunks []FileChunk) {
 	close(jobs)
 }
 
+func getTotalBytes(chunks []FileChunk) int64 {
+	var tb int64
+	for _, chunk := range chunks {
+		tb += chunk.Size
+	}
+	return tb
+}
+
 // 并发上传,不带断点续传功能
 func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
+	listener := getProgressListener(options)
+
 	chunks, err := SplitFileByPartSize(filePath, partSize)
 	if err != nil {
 		return err
@@ -148,6 +167,11 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 	failed := make(chan error)
 	die := make(chan bool)
 
+	var completedBytes int64
+	totalBytes := getTotalBytes(chunks)
+	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
+	publishProgress(listener, event)
+
 	// 启动工作协程
 	arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
 	for w := 1; w <= routines; w++ {
@@ -165,8 +189,13 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 		case part := <-results:
 			completed++
 			parts[part.PartNumber-1] = part
+			completedBytes += chunks[part.PartNumber-1].Size
+			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
+			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
+			publishProgress(listener, event)
 			bucket.AbortMultipartUpload(imur)
 			return err
 		}
@@ -176,6 +205,9 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 		}
 	}
 
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
+	publishProgress(listener, event)
+
 	// 提交任务
 	_, err = bucket.CompleteMultipartUpload(imur, parts)
 	if err != nil {
@@ -311,6 +343,17 @@ func (cp *uploadCheckpoint) allParts() []UploadPart {
 	return ps
 }
 
+// 完成的字节数
+func (cp *uploadCheckpoint) getCompletedBytes() int64 {
+	var completedBytes int64
+	for _, part := range cp.Parts {
+		if part.IsCompleted {
+			completedBytes += part.Chunk.Size
+		}
+	}
+	return completedBytes
+}
+
 // 计算文件文件MD5
 func calcFileMD5(filePath string) (string, error) {
 	return "", nil
@@ -378,6 +421,8 @@ func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePa
 
 // 并发带断点的上传
 func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
+	listener := getProgressListener(options)
+
 	// LOAD CP数据
 	ucp := uploadCheckpoint{}
 	err := ucp.load(cpFilePath)
@@ -405,6 +450,10 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
 	failed := make(chan error)
 	die := make(chan bool)
 
+	completedBytes := ucp.getCompletedBytes()
+	event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size)
+	publishProgress(listener, event)
+
 	// 启动工作协程
 	arg := workerArg{&bucket, filePath, imur, uploadPartHooker}
 	for w := 1; w <= routines; w++ {
@@ -422,8 +471,13 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
 			completed++
 			ucp.updatePart(part)
 			ucp.dump(cpFilePath)
+			completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size
+			event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size)
+			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size)
+			publishProgress(listener, event)
 			return err
 		}
 
@@ -432,6 +486,9 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
 		}
 	}
 
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size)
+	publishProgress(listener, event)
+
 	// 提交分片上传
 	err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath)
 	return err