Pārlūkot izejas kodu

add RwBytes in ProgressEvent for conveniently statistic transfer speed

taowei.wtw 6 gadi atpakaļ
vecāks
revīzija
2f68876ebd
6 mainītis faili ar 159 papildinājumiem un 70 dzēšanām
  1. 6 6
      oss/conn.go
  2. 12 10
      oss/download.go
  3. 12 10
      oss/multicopy.go
  4. 5 3
      oss/progress.go
  5. 110 33
      oss/progress_test.go
  6. 14 8
      oss/upload.go

+ 6 - 6
oss/conn.go

@@ -123,7 +123,7 @@ func (conn Conn) DoURL(method HTTPMethod, signedURL string, headers map[string]s
 	}
 
 	// Transfer started
-	event := newProgressEvent(TransferStartedEvent, 0, req.ContentLength)
+	event := newProgressEvent(TransferStartedEvent, 0, req.ContentLength, 0)
 	publishProgress(listener, event)
 
 	if conn.config.LogLevel >= Debug {
@@ -133,7 +133,7 @@ func (conn Conn) DoURL(method HTTPMethod, signedURL string, headers map[string]s
 	resp, err := conn.client.Do(req)
 	if err != nil {
 		// Transfer failed
-		event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength)
+		event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength, 0)
 		publishProgress(listener, event)
 		return nil, err
 	}
@@ -144,7 +144,7 @@ func (conn Conn) DoURL(method HTTPMethod, signedURL string, headers map[string]s
 	}
 
 	// Transfer completed
-	event = newProgressEvent(TransferCompletedEvent, tracker.completedBytes, req.ContentLength)
+	event = newProgressEvent(TransferCompletedEvent, tracker.completedBytes, req.ContentLength, 0)
 	publishProgress(listener, event)
 
 	return conn.handleResponse(resp, crc)
@@ -252,7 +252,7 @@ func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource st
 	conn.signHeader(req, canonicalizedResource)
 
 	// Transfer started
-	event := newProgressEvent(TransferStartedEvent, 0, req.ContentLength)
+	event := newProgressEvent(TransferStartedEvent, 0, req.ContentLength, 0)
 	publishProgress(listener, event)
 
 	if conn.config.LogLevel >= Debug {
@@ -263,7 +263,7 @@ func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource st
 
 	if err != nil {
 		// Transfer failed
-		event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength)
+		event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength, 0)
 		publishProgress(listener, event)
 		return nil, err
 	}
@@ -274,7 +274,7 @@ func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource st
 	}
 
 	// Transfer completed
-	event = newProgressEvent(TransferCompletedEvent, tracker.completedBytes, req.ContentLength)
+	event = newProgressEvent(TransferCompletedEvent, tracker.completedBytes, req.ContentLength, 0)
 	publishProgress(listener, event)
 
 	return conn.handleResponse(resp, crc)

+ 12 - 10
oss/download.go

@@ -263,7 +263,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 
 	var completedBytes int64
 	totalBytes := getObjectBytes(parts)
-	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
+	event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
 	publishProgress(listener, event)
 
 	// Start the download workers
@@ -281,13 +281,14 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 		select {
 		case part := <-results:
 			completed++
-			completedBytes += (part.End - part.Start + 1)
+			downBytes := (part.End - part.Start + 1)
+			completedBytes += downBytes
 			parts[part.Index].CRC64 = part.CRC64
-			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
+			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, downBytes)
 			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
-			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
 			publishProgress(listener, event)
 			return err
 		}
@@ -297,7 +298,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 		}
 	}
 
-	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
 	publishProgress(listener, event)
 
 	if enableCRC {
@@ -510,7 +511,7 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 	die := make(chan bool)
 
 	completedBytes := dcp.getCompletedBytes()
-	event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
+	event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size, 0)
 	publishProgress(listener, event)
 
 	// Start the download workers routine
@@ -531,12 +532,13 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 			dcp.PartStat[part.Index] = true
 			dcp.Parts[part.Index].CRC64 = part.CRC64
 			dcp.dump(cpFilePath)
-			completedBytes += (part.End - part.Start + 1)
-			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
+			downBytes := (part.End - part.Start + 1)
+			completedBytes += downBytes
+			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size, downBytes)
 			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
-			event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size, 0)
 			publishProgress(listener, event)
 			return err
 		}
@@ -546,7 +548,7 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 		}
 	}
 
-	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size, 0)
 	publishProgress(listener, event)
 
 	if dcp.enableCRC {

+ 12 - 10
oss/multicopy.go

@@ -169,7 +169,7 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 
 	var completedBytes int64
 	totalBytes := getSrcObjectBytes(parts)
-	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
+	event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
 	publishProgress(listener, event)
 
 	// Start to copy workers
@@ -189,13 +189,14 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 		case part := <-results:
 			completed++
 			ups[part.PartNumber-1] = part
-			completedBytes += (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
-			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
+			copyBytes := (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
+			completedBytes += copyBytes
+			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, copyBytes)
 			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
 			descBucket.AbortMultipartUpload(imur, options...)
-			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
 			publishProgress(listener, event)
 			return err
 		}
@@ -205,7 +206,7 @@ func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destO
 		}
 	}
 
-	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
 	publishProgress(listener, event)
 
 	// Complete the multipart upload
@@ -418,7 +419,7 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 	die := make(chan bool)
 
 	completedBytes := ccp.getCompletedBytes()
-	event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size)
+	event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size, 0)
 	publishProgress(listener, event)
 
 	// Start the worker coroutines
@@ -438,12 +439,13 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 			completed++
 			ccp.update(part)
 			ccp.dump(cpFilePath)
-			completedBytes += (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
-			event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size)
+			copyBytes := (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
+			completedBytes += copyBytes
+			event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size, copyBytes)
 			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
-			event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size, 0)
 			publishProgress(listener, event)
 			return err
 		}
@@ -453,7 +455,7 @@ func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName,
 		}
 	}
 
-	event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size)
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size,0)
 	publishProgress(listener, event)
 
 	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, options)

+ 5 - 3
oss/progress.go

@@ -20,6 +20,7 @@ const (
 type ProgressEvent struct {
 	ConsumedBytes int64
 	TotalBytes    int64
+	RwBytes       int64
 	EventType     ProgressEventType
 }
 
@@ -30,10 +31,11 @@ type ProgressListener interface {
 
 // -------------------- Private --------------------
 
-func newProgressEvent(eventType ProgressEventType, consumed, total int64) *ProgressEvent {
+func newProgressEvent(eventType ProgressEventType, consumed, total int64, rwBytes int64) *ProgressEvent {
 	return &ProgressEvent{
 		ConsumedBytes: consumed,
 		TotalBytes:    total,
+		RwBytes:       rwBytes,
 		EventType:     eventType}
 }
 
@@ -78,7 +80,7 @@ func (t *teeReader) Read(p []byte) (n int, err error) {
 
 	// Read encountered error
 	if err != nil && err != io.EOF {
-		event := newProgressEvent(TransferFailedEvent, t.consumedBytes, t.totalBytes)
+		event := newProgressEvent(TransferFailedEvent, t.consumedBytes, t.totalBytes, 0)
 		publishProgress(t.listener, event)
 	}
 
@@ -92,7 +94,7 @@ func (t *teeReader) Read(p []byte) (n int, err error) {
 		}
 		// Progress
 		if t.listener != nil {
-			event := newProgressEvent(TransferDataEvent, t.consumedBytes, t.totalBytes)
+			event := newProgressEvent(TransferDataEvent, t.consumedBytes, t.totalBytes, int64(n))
 			publishProgress(t.listener, event)
 		}
 		// Track

+ 110 - 33
oss/progress_test.go

@@ -102,6 +102,7 @@ func (s *OssProgressSuite) TearDownTest(c *C) {
 
 // OssProgressListener is the progress listener
 type OssProgressListener struct {
+	TotalRwBytes int64
 }
 
 // ProgressChanged handles progress event
@@ -111,6 +112,7 @@ func (listener *OssProgressListener) ProgressChanged(event *ProgressEvent) {
 		testLogger.Printf("Transfer Started, ConsumedBytes: %d, TotalBytes %d.\n",
 			event.ConsumedBytes, event.TotalBytes)
 	case TransferDataEvent:
+		listener.TotalRwBytes += event.RwBytes
 		testLogger.Printf("Transfer Data, ConsumedBytes: %d, TotalBytes %d, %d%%.\n",
 			event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes)
 	case TransferCompletedEvent:
@@ -128,17 +130,24 @@ func (s *OssProgressSuite) TestPutObject(c *C) {
 	objectName := randStr(8) + ".jpg"
 	localFile := "../sample/The Go Programming Language.html"
 
+	fileInfo, err := os.Stat(localFile)
+	c.Assert(err, IsNil)
+
 	// PutObject
 	fd, err := os.Open(localFile)
 	c.Assert(err, IsNil)
 	defer fd.Close()
 
-	err = s.bucket.PutObject(objectName, fd, Progress(&OssProgressListener{}))
+	progressListener := OssProgressListener{}
+	err = s.bucket.PutObject(objectName, fd, Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	// PutObjectFromFile
-	err = s.bucket.PutObjectFromFile(objectName, localFile, Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.PutObjectFromFile(objectName, localFile, Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	// DoPutObject
 	fd, err = os.Open(localFile)
@@ -150,13 +159,17 @@ func (s *OssProgressSuite) TestPutObject(c *C) {
 		Reader:    fd,
 	}
 
-	options := []Option{Progress(&OssProgressListener{})}
+	progressListener.TotalRwBytes = 0
+	options := []Option{Progress(&progressListener)}
 	_, err = s.bucket.DoPutObject(request, options)
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	// PutObject size is 0
-	err = s.bucket.PutObject(objectName, strings.NewReader(""), Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.PutObject(objectName, strings.NewReader(""), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, int64(0))
 
 	testLogger.Println("OssProgressSuite.TestPutObject")
 }
@@ -169,7 +182,8 @@ func (s *OssProgressSuite) TestSignURL(c *C) {
 	createFile(filePath, content, c)
 
 	// Sign URL for put
-	str, err := s.bucket.SignURL(objectName, HTTPPut, 60, Progress(&OssProgressListener{}))
+	progressListener := OssProgressListener{}
+	str, err := s.bucket.SignURL(objectName, HTTPPut, 60, Progress(&progressListener))
 	c.Assert(err, IsNil)
 	c.Assert(strings.Contains(str, HTTPParamExpires+"="), Equals, true)
 	c.Assert(strings.Contains(str, HTTPParamAccessKeyID+"="), Equals, true)
@@ -180,42 +194,52 @@ func (s *OssProgressSuite) TestSignURL(c *C) {
 	c.Assert(err, IsNil)
 	defer fd.Close()
 
-	err = s.bucket.PutObjectWithURL(str, fd, Progress(&OssProgressListener{}))
+	err = s.bucket.PutObjectWithURL(str, fd, Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, int64(len(content)))
 
 	// Put object from file with URL
-	err = s.bucket.PutObjectFromFileWithURL(str, filePath, Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.PutObjectFromFileWithURL(str, filePath, Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, int64(len(content)))
 
 	// DoPutObject
 	fd, err = os.Open(filePath)
 	c.Assert(err, IsNil)
 	defer fd.Close()
 
-	options := []Option{Progress(&OssProgressListener{})}
+	progressListener.TotalRwBytes = 0
+	options := []Option{Progress(&progressListener)}
 	_, err = s.bucket.DoPutObjectWithURL(str, fd, options)
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, int64(len(content)))
 
 	// Sign URL for get
-	str, err = s.bucket.SignURL(objectName, HTTPGet, 60, Progress(&OssProgressListener{}))
+	str, err = s.bucket.SignURL(objectName, HTTPGet, 60, Progress(&progressListener))
 	c.Assert(err, IsNil)
 	c.Assert(strings.Contains(str, HTTPParamExpires+"="), Equals, true)
 	c.Assert(strings.Contains(str, HTTPParamAccessKeyID+"="), Equals, true)
 	c.Assert(strings.Contains(str, HTTPParamSignature+"="), Equals, true)
 
 	// Get object with URL
-	body, err := s.bucket.GetObjectWithURL(str, Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	body, err := s.bucket.GetObjectWithURL(str, Progress(&progressListener))
 	c.Assert(err, IsNil)
 	str, err = readBody(body)
 	c.Assert(err, IsNil)
 	c.Assert(str, Equals, content)
+	c.Assert(progressListener.TotalRwBytes, Equals, int64(len(content)))
 
 	// Get object to file with URL
-	str, err = s.bucket.SignURL(objectName, HTTPGet, 10, Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	str, err = s.bucket.SignURL(objectName, HTTPGet, 10, Progress(&progressListener))
 	c.Assert(err, IsNil)
 
 	newFile := randStr(10)
-	err = s.bucket.GetObjectToFileWithURL(str, newFile, Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.GetObjectToFileWithURL(str, newFile, Progress(&progressListener))
+	c.Assert(progressListener.TotalRwBytes, Equals, int64(len(content)))
 	c.Assert(err, IsNil)
 	eq, err := compareFiles(filePath, newFile)
 	c.Assert(err, IsNil)
@@ -251,14 +275,16 @@ func (s *OssProgressSuite) TestPutObjectNegative(c *C) {
 // TestAppendObject
 func (s *OssProgressSuite) TestAppendObject(c *C) {
 	objectName := objectNamePrefix + randStr(8)
-	objectValue := "昨夜雨疏风骤,浓睡不消残酒。试问卷帘人,却道海棠依旧。知否?知否?应是绿肥红瘦。"
+	objectValue := randStr(100)
 	var val = []byte(objectValue)
 	var nextPos int64
 	var midPos = 1 + rand.Intn(len(val)-1)
 
 	// AppendObject
-	nextPos, err := s.bucket.AppendObject(objectName, bytes.NewReader(val[0:midPos]), nextPos, Progress(&OssProgressListener{}))
+	progressListener := OssProgressListener{}
+	nextPos, err := s.bucket.AppendObject(objectName, bytes.NewReader(val[0:midPos]), nextPos, Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, nextPos)
 
 	// DoAppendObject
 	request := &AppendObjectRequest{
@@ -278,6 +304,9 @@ func (s *OssProgressSuite) TestMultipartUpload(c *C) {
 	objectName := objectNamePrefix + randStr(8)
 	var fileName = "../sample/BingWallpaper-2015-11-07.jpg"
 
+	fileInfo, err := os.Stat(fileName)
+	c.Assert(err, IsNil)
+
 	chunks, err := SplitFileByPartNum(fileName, 3)
 	c.Assert(err, IsNil)
 	testLogger.Println("chunks:", chunks)
@@ -287,6 +316,7 @@ func (s *OssProgressSuite) TestMultipartUpload(c *C) {
 	defer fd.Close()
 
 	// Initiate
+	progressListener := OssProgressListener{}
 	imur, err := s.bucket.InitiateMultipartUpload(objectName)
 	c.Assert(err, IsNil)
 
@@ -294,7 +324,7 @@ func (s *OssProgressSuite) TestMultipartUpload(c *C) {
 	var parts []UploadPart
 	for _, chunk := range chunks {
 		fd.Seek(chunk.Offset, os.SEEK_SET)
-		part, err := s.bucket.UploadPart(imur, fd, chunk.Size, chunk.Number, Progress(&OssProgressListener{}))
+		part, err := s.bucket.UploadPart(imur, fd, chunk.Size, chunk.Number, Progress(&progressListener))
 		c.Assert(err, IsNil)
 		parts = append(parts, part)
 	}
@@ -302,6 +332,7 @@ func (s *OssProgressSuite) TestMultipartUpload(c *C) {
 	// Complete
 	_, err = s.bucket.CompleteMultipartUpload(imur, parts)
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	err = s.bucket.DeleteObject(objectName)
 	c.Assert(err, IsNil)
@@ -313,6 +344,8 @@ func (s *OssProgressSuite) TestMultipartUpload(c *C) {
 func (s *OssProgressSuite) TestMultipartUploadFromFile(c *C) {
 	objectName := objectNamePrefix + randStr(8)
 	var fileName = "../sample/BingWallpaper-2015-11-07.jpg"
+	fileInfo, err := os.Stat(fileName)
+	c.Assert(err, IsNil)
 
 	chunks, err := SplitFileByPartNum(fileName, 3)
 	c.Assert(err, IsNil)
@@ -322,9 +355,10 @@ func (s *OssProgressSuite) TestMultipartUploadFromFile(c *C) {
 	c.Assert(err, IsNil)
 
 	// UploadPart
+	progressListener := OssProgressListener{}
 	var parts []UploadPart
 	for _, chunk := range chunks {
-		part, err := s.bucket.UploadPartFromFile(imur, fileName, chunk.Offset, chunk.Size, chunk.Number, Progress(&OssProgressListener{}))
+		part, err := s.bucket.UploadPartFromFile(imur, fileName, chunk.Offset, chunk.Size, chunk.Number, Progress(&progressListener))
 		c.Assert(err, IsNil)
 		parts = append(parts, part)
 	}
@@ -332,6 +366,7 @@ func (s *OssProgressSuite) TestMultipartUploadFromFile(c *C) {
 	// Complete
 	_, err = s.bucket.CompleteMultipartUpload(imur, parts)
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	err = s.bucket.DeleteObject(objectName)
 	c.Assert(err, IsNil)
@@ -345,47 +380,64 @@ func (s *OssProgressSuite) TestGetObject(c *C) {
 	localFile := "../sample/BingWallpaper-2015-11-07.jpg"
 	newFile := "newpic-progress-1.jpg"
 
+	fileInfo, err := os.Stat(localFile)
+	c.Assert(err, IsNil)
+
+	progressListener := OssProgressListener{}
 	// PutObject
-	err := s.bucket.PutObjectFromFile(objectName, localFile, Progress(&OssProgressListener{}))
+	err = s.bucket.PutObjectFromFile(objectName, localFile, Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	// GetObject
-	body, err := s.bucket.GetObject(objectName, Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	body, err := s.bucket.GetObject(objectName, Progress(&progressListener))
 	c.Assert(err, IsNil)
 	_, err = ioutil.ReadAll(body)
 	c.Assert(err, IsNil)
 	body.Close()
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	// GetObjectToFile
-	err = s.bucket.GetObjectToFile(objectName, newFile, Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.GetObjectToFile(objectName, newFile, Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	// DoGetObject
+	progressListener.TotalRwBytes = 0
 	request := &GetObjectRequest{objectName}
-	options := []Option{Progress(&OssProgressListener{})}
+	options := []Option{Progress(&progressListener)}
 	result, err := s.bucket.DoGetObject(request, options)
 	c.Assert(err, IsNil)
 	_, err = ioutil.ReadAll(result.Response.Body)
 	c.Assert(err, IsNil)
 	result.Response.Body.Close()
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	// GetObject with range
-	body, err = s.bucket.GetObject(objectName, Range(1024, 4*1024), Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	body, err = s.bucket.GetObject(objectName, Range(1024, 4*1024), Progress(&progressListener))
 	c.Assert(err, IsNil)
-	_, err = ioutil.ReadAll(body)
+	text, err := ioutil.ReadAll(body)
 	c.Assert(err, IsNil)
 	body.Close()
+	c.Assert(progressListener.TotalRwBytes, Equals, int64(len(text)))
 
 	// PutObject size is 0
-	err = s.bucket.PutObject(objectName, strings.NewReader(""), Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.PutObject(objectName, strings.NewReader(""), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, int64(0))
 
 	// GetObject size is 0
-	body, err = s.bucket.GetObject(objectName, Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	body, err = s.bucket.GetObject(objectName, Progress(&progressListener))
 	c.Assert(err, IsNil)
 	_, err = ioutil.ReadAll(body)
 	c.Assert(err, IsNil)
 	body.Close()
+	c.Assert(progressListener.TotalRwBytes, Equals, int64(0))
 
 	testLogger.Println("OssProgressSuite.TestGetObject")
 }
@@ -425,11 +477,18 @@ func (s *OssProgressSuite) TestUploadFile(c *C) {
 	objectName := objectNamePrefix + randStr(8)
 	fileName := "../sample/BingWallpaper-2015-11-07.jpg"
 
-	err := s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(5), Progress(&OssProgressListener{}))
+	fileInfo, err := os.Stat(fileName)
+	c.Assert(err, IsNil)
+
+	progressListener := OssProgressListener{}
+	err = s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(5), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
-	err = s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(3), Checkpoint(true, objectName+".cp"), Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(3), Checkpoint(true, objectName+".cp"), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	testLogger.Println("OssProgressSuite.TestUploadFile")
 }
@@ -440,18 +499,27 @@ func (s *OssProgressSuite) TestDownloadFile(c *C) {
 	fileName := "../sample/BingWallpaper-2015-11-07.jpg"
 	newFile := "down-new-file-progress-2.jpg"
 
+	fileInfo, err := os.Stat(fileName)
+	c.Assert(err, IsNil)
+
 	// Upload
-	err := s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(3))
+	err = s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(3))
 	c.Assert(err, IsNil)
 
-	err = s.bucket.DownloadFile(objectName, newFile, 100*1024, Routines(5), Progress(&OssProgressListener{}))
+	progressListener := OssProgressListener{}
+	err = s.bucket.DownloadFile(objectName, newFile, 100*1024, Routines(5), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
-	err = s.bucket.DownloadFile(objectName, newFile, 1024*1024, Routines(3), Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.DownloadFile(objectName, newFile, 1024*1024, Routines(3), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
-	err = s.bucket.DownloadFile(objectName, newFile, 50*1024, Routines(3), Checkpoint(true, ""), Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.DownloadFile(objectName, newFile, 50*1024, Routines(3), Checkpoint(true, ""), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	testLogger.Println("OssProgressSuite.TestDownloadFile")
 }
@@ -462,15 +530,24 @@ func (s *OssProgressSuite) TestCopyFile(c *C) {
 	destObjectName := srcObjectName + "-copy"
 	fileName := "../sample/BingWallpaper-2015-11-07.jpg"
 
+	fileInfo, err := os.Stat(fileName)
+	c.Assert(err, IsNil)
+
 	// Upload
-	err := s.bucket.UploadFile(srcObjectName, fileName, 100*1024, Routines(3))
+	progressListener := OssProgressListener{}
+	err = s.bucket.UploadFile(srcObjectName, fileName, 100*1024, Routines(3), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
-	err = s.bucket.CopyFile(bucketName, srcObjectName, destObjectName, 100*1024, Routines(5), Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.CopyFile(bucketName, srcObjectName, destObjectName, 100*1024, Routines(5), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
-	err = s.bucket.CopyFile(bucketName, srcObjectName, destObjectName, 1024*100, Routines(3), Checkpoint(true, ""), Progress(&OssProgressListener{}))
+	progressListener.TotalRwBytes = 0
+	err = s.bucket.CopyFile(bucketName, srcObjectName, destObjectName, 1024*100, Routines(3), Checkpoint(true, ""), Progress(&progressListener))
 	c.Assert(err, IsNil)
+	c.Assert(progressListener.TotalRwBytes, Equals, fileInfo.Size())
 
 	testLogger.Println("OssProgressSuite.TestCopyFile")
 }

+ 14 - 8
oss/upload.go

@@ -188,7 +188,7 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 
 	var completedBytes int64
 	totalBytes := getTotalBytes(chunks)
-	event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
+	event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
 	publishProgress(listener, event)
 
 	// Start the worker coroutine
@@ -209,11 +209,14 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 			completed++
 			parts[part.PartNumber-1] = part
 			completedBytes += chunks[part.PartNumber-1].Size
-			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
+
+			// why RwBytes in ProgressEvent is 0 ?
+			// because read or write event has been notified in teeReader.Read()
+			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, 0)
 			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
-			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
 			publishProgress(listener, event)
 			bucket.AbortMultipartUpload(imur, options...)
 			return err
@@ -224,7 +227,7 @@ func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, opti
 		}
 	}
 
-	event = newProgressEvent(TransferStartedEvent, completedBytes, totalBytes)
+	event = newProgressEvent(TransferStartedEvent, completedBytes, totalBytes, 0)
 	publishProgress(listener, event)
 
 	// Complete the multpart upload
@@ -470,7 +473,10 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
 	die := make(chan bool)
 
 	completedBytes := ucp.getCompletedBytes()
-	event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size)
+
+	// why RwBytes in ProgressEvent is 0 ?
+	// because read or write event has been notified in teeReader.Read()
+	event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size, 0)
 	publishProgress(listener, event)
 
 	// Start the workers
@@ -491,11 +497,11 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
 			ucp.updatePart(part)
 			ucp.dump(cpFilePath)
 			completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size
-			event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size)
+			event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size, 0)
 			publishProgress(listener, event)
 		case err := <-failed:
 			close(die)
-			event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size)
+			event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size, 0)
 			publishProgress(listener, event)
 			return err
 		}
@@ -505,7 +511,7 @@ func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64
 		}
 	}
 
-	event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size)
+	event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size, 0)
 	publishProgress(listener, event)
 
 	// Complete the multipart upload