progress.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package oss
  2. import "io"
  3. // ProgressEventType defines transfer progress event type
  4. type ProgressEventType int
  5. const (
  6. // TransferStartedEvent transfer started, set TotalBytes
  7. TransferStartedEvent ProgressEventType = 1 + iota
  8. // TransferDataEvent transfer data, set ConsumedBytes anmd TotalBytes
  9. TransferDataEvent
  10. // TransferCompletedEvent transfer completed
  11. TransferCompletedEvent
  12. // TransferFailedEvent transfer encounters an error
  13. TransferFailedEvent
  14. )
  15. // ProgressEvent defines progress event
  16. type ProgressEvent struct {
  17. ConsumedBytes int64
  18. TotalBytes int64
  19. EventType ProgressEventType
  20. }
  21. // ProgressListener listens progress change
  22. type ProgressListener interface {
  23. ProgressChanged(event *ProgressEvent)
  24. }
  25. // -------------------- Private --------------------
  26. func newProgressEvent(eventType ProgressEventType, consumed, total int64) *ProgressEvent {
  27. return &ProgressEvent{
  28. ConsumedBytes: consumed,
  29. TotalBytes: total,
  30. EventType: eventType}
  31. }
  32. // publishProgress
  33. func publishProgress(listener ProgressListener, event *ProgressEvent) {
  34. if listener != nil && event != nil {
  35. listener.ProgressChanged(event)
  36. }
  37. }
  38. type readerTracker struct {
  39. completedBytes int64
  40. }
  41. type teeReader struct {
  42. reader io.Reader
  43. writer io.Writer
  44. listener ProgressListener
  45. consumedBytes int64
  46. totalBytes int64
  47. tracker *readerTracker
  48. }
  49. // TeeReader returns a Reader that writes to w what it reads from r.
  50. // All reads from r performed through it are matched with
  51. // corresponding writes to w. There is no internal buffering -
  52. // the write must complete before the read completes.
  53. // Any error encountered while writing is reported as a read error.
  54. func TeeReader(reader io.Reader, writer io.Writer, totalBytes int64, listener ProgressListener, tracker *readerTracker) io.ReadCloser {
  55. return &teeReader{
  56. reader: reader,
  57. writer: writer,
  58. listener: listener,
  59. consumedBytes: 0,
  60. totalBytes: totalBytes,
  61. tracker: tracker,
  62. }
  63. }
  64. func (t *teeReader) Read(p []byte) (n int, err error) {
  65. n, err = t.reader.Read(p)
  66. // Read encountered error
  67. if err != nil && err != io.EOF {
  68. event := newProgressEvent(TransferFailedEvent, t.consumedBytes, t.totalBytes)
  69. publishProgress(t.listener, event)
  70. }
  71. if n > 0 {
  72. t.consumedBytes += int64(n)
  73. // CRC
  74. if t.writer != nil {
  75. if n, err := t.writer.Write(p[:n]); err != nil {
  76. return n, err
  77. }
  78. }
  79. // Progress
  80. if t.listener != nil {
  81. event := newProgressEvent(TransferDataEvent, t.consumedBytes, t.totalBytes)
  82. publishProgress(t.listener, event)
  83. }
  84. // Track
  85. if t.tracker != nil {
  86. t.tracker.completedBytes = t.consumedBytes
  87. }
  88. }
  89. return
  90. }
  91. func (t *teeReader) Close() error {
  92. if rc, ok := t.reader.(io.ReadCloser); ok {
  93. return rc.Close()
  94. }
  95. return nil
  96. }