limit_reader_1_7.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. // +build go1.7
  2. package oss
  3. import (
  4. "fmt"
  5. "io"
  6. "math"
  7. "time"
  8. "golang.org/x/time/rate"
  9. )
  10. const (
  11. perTokenBandwidthSize int = 1024
  12. )
  13. // OssLimiter wrapper rate.Limiter
  14. type OssLimiter struct {
  15. limiter *rate.Limiter
  16. }
  17. // GetOssLimiter create OssLimiter
  18. // uploadSpeed KB/s
  19. func GetOssLimiter(uploadSpeed int) (ossLimiter *OssLimiter, err error) {
  20. limiter := rate.NewLimiter(rate.Limit(uploadSpeed), uploadSpeed)
  21. // first consume the initial full token,the limiter will behave more accurately
  22. limiter.AllowN(time.Now(), uploadSpeed)
  23. return &OssLimiter{
  24. limiter: limiter,
  25. }, nil
  26. }
  27. // LimitSpeedReader for limit bandwidth upload
  28. type LimitSpeedReader struct {
  29. io.ReadCloser
  30. reader io.Reader
  31. ossLimiter *OssLimiter
  32. }
  33. // Read
  34. func (r *LimitSpeedReader) Read(p []byte) (n int, err error) {
  35. n = 0
  36. err = nil
  37. start := 0
  38. burst := r.ossLimiter.limiter.Burst()
  39. var end int
  40. var tmpN int
  41. var tc int
  42. for start < len(p) {
  43. if start+burst*perTokenBandwidthSize < len(p) {
  44. end = start + burst*perTokenBandwidthSize
  45. } else {
  46. end = len(p)
  47. }
  48. tmpN, err = r.reader.Read(p[start:end])
  49. if tmpN > 0 {
  50. n += tmpN
  51. start = n
  52. }
  53. if err != nil {
  54. return
  55. }
  56. tc = int(math.Ceil(float64(tmpN) / float64(perTokenBandwidthSize)))
  57. now := time.Now()
  58. re := r.ossLimiter.limiter.ReserveN(now, tc)
  59. if !re.OK() {
  60. err = fmt.Errorf("LimitSpeedReader.Read() failure,ReserveN error,start:%d,end:%d,burst:%d,perTokenBandwidthSize:%d",
  61. start, end, burst, perTokenBandwidthSize)
  62. return
  63. }
  64. timeDelay := re.Delay()
  65. time.Sleep(timeDelay)
  66. }
  67. return
  68. }
  69. // Close ...
  70. func (r *LimitSpeedReader) Close() error {
  71. rc, ok := r.reader.(io.ReadCloser)
  72. if ok {
  73. return rc.Close()
  74. }
  75. return nil
  76. }