progress.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package oss
  2. import "io"
  // ProgressEventType identifies which stage of a transfer a ProgressEvent reports.
  type ProgressEventType int

  // Transfer progress event types. Numbering starts at 1, so the zero value of
  // ProgressEventType matches none of them.
  const (
  	// TransferStartedEvent signals that the transfer started; TotalBytes is set.
  	TransferStartedEvent ProgressEventType = iota + 1
  	// TransferDataEvent signals that data was transferred; ConsumedBytes and
  	// TotalBytes are set.
  	TransferDataEvent
  	// TransferCompletedEvent signals that the transfer completed.
  	TransferCompletedEvent
  	// TransferFailedEvent signals that the transfer encountered an error.
  	TransferFailedEvent
  )

  // ProgressEvent is the payload delivered to a ProgressListener.
  type ProgressEvent struct {
  	// ConsumedBytes is the number of bytes transferred so far.
  	ConsumedBytes int64
  	// TotalBytes is the total number of bytes the transfer is expected to move.
  	TotalBytes int64
  	// RwBytes is the number of bytes moved by the operation that produced this
  	// event (teeReader reports the size of the latest read for data events and
  	// 0 for failure events).
  	RwBytes int64
  	// EventType tells which stage of the transfer the event describes.
  	EventType ProgressEventType
  }

  // ProgressListener receives progress notifications for a transfer.
  type ProgressListener interface {
  	// ProgressChanged is invoked with each published event.
  	ProgressChanged(event *ProgressEvent)
  }
  26. // -------------------- Private --------------------
  27. func newProgressEvent(eventType ProgressEventType, consumed, total int64, rwBytes int64) *ProgressEvent {
  28. return &ProgressEvent{
  29. ConsumedBytes: consumed,
  30. TotalBytes: total,
  31. RwBytes: rwBytes,
  32. EventType: eventType}
  33. }
  34. // publishProgress
  35. func publishProgress(listener ProgressListener, event *ProgressEvent) {
  36. if listener != nil && event != nil {
  37. listener.ProgressChanged(event)
  38. }
  39. }
  40. type readerTracker struct {
  41. completedBytes int64
  42. }
  43. type teeReader struct {
  44. reader io.Reader
  45. writer io.Writer
  46. listener ProgressListener
  47. consumedBytes int64
  48. totalBytes int64
  49. tracker *readerTracker
  50. }
  51. // TeeReader returns a Reader that writes to w what it reads from r.
  52. // All reads from r performed through it are matched with
  53. // corresponding writes to w. There is no internal buffering -
  54. // the write must complete before the read completes.
  55. // Any error encountered while writing is reported as a read error.
  56. func TeeReader(reader io.Reader, writer io.Writer, totalBytes int64, listener ProgressListener, tracker *readerTracker) io.ReadCloser {
  57. return &teeReader{
  58. reader: reader,
  59. writer: writer,
  60. listener: listener,
  61. consumedBytes: 0,
  62. totalBytes: totalBytes,
  63. tracker: tracker,
  64. }
  65. }
  66. func (t *teeReader) Read(p []byte) (n int, err error) {
  67. n, err = t.reader.Read(p)
  68. // Read encountered error
  69. if err != nil && err != io.EOF {
  70. event := newProgressEvent(TransferFailedEvent, t.consumedBytes, t.totalBytes, 0)
  71. publishProgress(t.listener, event)
  72. }
  73. if n > 0 {
  74. t.consumedBytes += int64(n)
  75. // CRC
  76. if t.writer != nil {
  77. if n, err := t.writer.Write(p[:n]); err != nil {
  78. return n, err
  79. }
  80. }
  81. // Progress
  82. if t.listener != nil {
  83. event := newProgressEvent(TransferDataEvent, t.consumedBytes, t.totalBytes, int64(n))
  84. publishProgress(t.listener, event)
  85. }
  86. // Track
  87. if t.tracker != nil {
  88. t.tracker.completedBytes = t.consumedBytes
  89. }
  90. }
  91. return
  92. }
  93. func (t *teeReader) Close() error {
  94. if rc, ok := t.reader.(io.ReadCloser); ok {
  95. return rc.Close()
  96. }
  97. return nil
  98. }