123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- package oss
- import "io"
- // ProgressEventType defines transfer progress event type
- type ProgressEventType int
- const (
- // TransferStartedEvent transfer started, set TotalBytes
- TransferStartedEvent ProgressEventType = 1 + iota
- // TransferDataEvent transfer data, set ConsumedBytes anmd TotalBytes
- TransferDataEvent
- // TransferCompletedEvent transfer completed
- TransferCompletedEvent
- // TransferFailedEvent transfer encounters an error
- TransferFailedEvent
- )
- // ProgressEvent defines progress event
- type ProgressEvent struct {
- ConsumedBytes int64
- TotalBytes int64
- RwBytes int64
- EventType ProgressEventType
- }
- // ProgressListener listens progress change
- type ProgressListener interface {
- ProgressChanged(event *ProgressEvent)
- }
- // -------------------- Private --------------------
- func newProgressEvent(eventType ProgressEventType, consumed, total int64, rwBytes int64) *ProgressEvent {
- return &ProgressEvent{
- ConsumedBytes: consumed,
- TotalBytes: total,
- RwBytes: rwBytes,
- EventType: eventType}
- }
- // publishProgress
- func publishProgress(listener ProgressListener, event *ProgressEvent) {
- if listener != nil && event != nil {
- listener.ProgressChanged(event)
- }
- }
- type readerTracker struct {
- completedBytes int64
- }
- type teeReader struct {
- reader io.Reader
- writer io.Writer
- listener ProgressListener
- consumedBytes int64
- totalBytes int64
- tracker *readerTracker
- }
- // TeeReader returns a Reader that writes to w what it reads from r.
- // All reads from r performed through it are matched with
- // corresponding writes to w. There is no internal buffering -
- // the write must complete before the read completes.
- // Any error encountered while writing is reported as a read error.
- func TeeReader(reader io.Reader, writer io.Writer, totalBytes int64, listener ProgressListener, tracker *readerTracker) io.ReadCloser {
- return &teeReader{
- reader: reader,
- writer: writer,
- listener: listener,
- consumedBytes: 0,
- totalBytes: totalBytes,
- tracker: tracker,
- }
- }
- func (t *teeReader) Read(p []byte) (n int, err error) {
- n, err = t.reader.Read(p)
- // Read encountered error
- if err != nil && err != io.EOF {
- event := newProgressEvent(TransferFailedEvent, t.consumedBytes, t.totalBytes, 0)
- publishProgress(t.listener, event)
- }
- if n > 0 {
- t.consumedBytes += int64(n)
- // CRC
- if t.writer != nil {
- if n, err := t.writer.Write(p[:n]); err != nil {
- return n, err
- }
- }
- // Progress
- if t.listener != nil {
- event := newProgressEvent(TransferDataEvent, t.consumedBytes, t.totalBytes, int64(n))
- publishProgress(t.listener, event)
- }
- // Track
- if t.tracker != nil {
- t.tracker.completedBytes = t.consumedBytes
- }
- }
- return
- }
- func (t *teeReader) Close() error {
- if rc, ok := t.reader.(io.ReadCloser); ok {
- return rc.Close()
- }
- return nil
- }
|