reader.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // Copyright (c) 2015 Klaus Post, released under MIT License. See LICENSE file.
  2. // The readahead package will do asynchronous read-ahead from an input io.Reader
  3. // and make the data available as an io.Reader.
  4. //
  5. // This should be fully transparent, except that once an error
  6. // has been returned from the Reader, it will not recover.
  7. //
  8. // The readahead object also fulfills the io.WriterTo interface, which
  9. // is likely to speed up copies.
  10. //
  11. // Package home: https://github.com/klauspost/readahead
  12. //
  13. package readahead
  14. import (
  15. "errors"
  16. "fmt"
  17. "io"
  18. )
  19. type seekable struct {
  20. *reader
  21. }
  // ReadSeekCloser is the interface that groups the basic Read, Close and Seek
// methods. Readers returned for seekable inputs satisfy it.
type ReadSeekCloser interface {
	io.Reader
	io.Closer
	io.Seeker
}
  26. type reader struct {
  27. in io.Reader // Input reader
  28. closer io.Closer // Optional closer
  29. ready chan *buffer // Buffers ready to be handed to the reader
  30. reuse chan *buffer // Buffers to reuse for input reading
  31. exit chan struct{} // Closes when finished
  32. buffers int // Number of buffers
  33. size int // Size of each buffer
  34. err error // If an error has occurred it is here
  35. cur *buffer // Current buffer being served
  36. exited chan struct{} // Channel is closed been the async reader shuts down
  37. }
  38. // NewReaderSize returns a reader with a custom number of buffers and size.
  39. // buffers is the number of queued buffers and size is the size of each
  40. // buffer in bytes.
  41. func NewReaderSize(rd io.Reader, buffers, size int) (res io.ReadCloser, err error) {
  42. if size <= 0 {
  43. return nil, fmt.Errorf("buffer size too small")
  44. }
  45. if buffers <= 0 {
  46. return nil, fmt.Errorf("number of buffers too small")
  47. }
  48. if rd == nil {
  49. return nil, fmt.Errorf("nil input reader supplied")
  50. }
  51. a := &reader{}
  52. if _, ok := rd.(io.Seeker); ok {
  53. res = &seekable{a}
  54. } else {
  55. res = a
  56. }
  57. a.init(rd, buffers, size)
  58. return
  59. }
  60. // initialize the reader
  61. func (a *reader) init(rd io.Reader, buffers, size int) {
  62. a.in = rd
  63. a.ready = make(chan *buffer, buffers)
  64. a.reuse = make(chan *buffer, buffers)
  65. a.exit = make(chan struct{}, 0)
  66. a.exited = make(chan struct{}, 0)
  67. a.buffers = buffers
  68. a.size = size
  69. a.cur = nil
  70. a.err = nil
  71. // Create buffers
  72. for i := 0; i < buffers; i++ {
  73. a.reuse <- newBuffer(size)
  74. }
  75. // Start async reader
  76. go func() {
  77. // Ensure that when we exit this is signalled.
  78. defer close(a.exited)
  79. defer close(a.ready)
  80. for {
  81. select {
  82. case b := <-a.reuse:
  83. err := b.read(a.in)
  84. a.ready <- b
  85. if err != nil {
  86. return
  87. }
  88. case <-a.exit:
  89. return
  90. }
  91. }
  92. }()
  93. }
  94. // fill will check if the current buffer is empty and fill it if it is.
  95. // If an error was returned at the end of the current buffer it is returned.
  96. func (a *reader) fill() (err error) {
  97. if a.cur.isEmpty() {
  98. if a.cur != nil {
  99. a.reuse <- a.cur
  100. a.cur = nil
  101. }
  102. b, ok := <-a.ready
  103. if !ok {
  104. if a.err == nil {
  105. a.err = errors.New("readahead: read after Close")
  106. }
  107. return a.err
  108. }
  109. a.cur = b
  110. }
  111. return nil
  112. }
  113. // Read will return the next available data.
  114. func (a *reader) Read(p []byte) (n int, err error) {
  115. if a.err != nil {
  116. return 0, a.err
  117. }
  118. // Swap buffer and maybe return error
  119. err = a.fill()
  120. if err != nil {
  121. return 0, err
  122. }
  123. // Copy what we can
  124. n = copy(p, a.cur.buffer())
  125. a.cur.inc(n)
  126. // If at end of buffer, return any error, if present
  127. if a.cur.isEmpty() {
  128. a.err = a.cur.err
  129. return n, a.err
  130. }
  131. return n, nil
  132. }
  133. func (a *seekable) Seek(offset int64, whence int) (res int64, err error) {
  134. //Not checking the result as seekable receiver guarantees it to be assertable
  135. seeker, _ := a.in.(io.Seeker)
  136. //Make sure the async routine is closed
  137. select {
  138. case <-a.exited:
  139. case a.exit <- struct{}{}:
  140. <-a.exited
  141. }
  142. if whence == io.SeekCurrent {
  143. //If need to seek based on current position, take into consideration the bytes we read but the consumer
  144. //doesn't know about
  145. err = nil
  146. for a.cur != nil {
  147. if err = a.fill(); err == nil && a.cur != nil {
  148. offset -= int64(len(a.cur.buffer()))
  149. a.cur.offset = len(a.cur.buf)
  150. }
  151. }
  152. }
  153. //Seek the actual Seeker
  154. if res, err = seeker.Seek(offset, whence); err == nil {
  155. //If the seek was successful, reinitialize ourselves (with the new position).
  156. a.init(a.in, a.buffers, a.size)
  157. }
  158. return
  159. }
  160. // WriteTo writes data to w until there's no more data to write or when an error occurs.
  161. // The return value n is the number of bytes written.
  162. // Any error encountered during the write is also returned.
  163. func (a *reader) WriteTo(w io.Writer) (n int64, err error) {
  164. if a.err != nil {
  165. return 0, a.err
  166. }
  167. n = 0
  168. for {
  169. err = a.fill()
  170. if err != nil {
  171. return n, err
  172. }
  173. n2, err := w.Write(a.cur.buffer())
  174. a.cur.inc(n2)
  175. n += int64(n2)
  176. if err != nil {
  177. return n, err
  178. }
  179. if a.cur.err != nil {
  180. // io.Writer should return nil if we are at EOF.
  181. if a.cur.err == io.EOF {
  182. a.err = a.cur.err
  183. return n, nil
  184. }
  185. a.err = a.cur.err
  186. return n, a.cur.err
  187. }
  188. }
  189. }
  190. // Close will ensure that the underlying async reader is shut down.
  191. // It will also close the input supplied on newAsyncReader.
  192. func (a *reader) Close() (err error) {
  193. select {
  194. case <-a.exited:
  195. case a.exit <- struct{}{}:
  196. <-a.exited
  197. }
  198. if a.closer != nil {
  199. // Only call once
  200. c := a.closer
  201. a.closer = nil
  202. return c.Close()
  203. }
  204. a.err = errors.New("readahead: read after Close")
  205. return nil
  206. }
  // buffer holds the result of a single Read from the input. The unread data
// is buf[offset:]. Once all of it has been served, err (if non-nil) must be
// reported to the consumer.
type buffer struct {
	buf    []byte // data from the last read
	err    error  // error returned by that read, if any
	offset int    // number of bytes of buf already served
	size   int    // number of bytes requested per read
}
  216. func newBuffer(size int) *buffer {
  217. return &buffer{buf: make([]byte, size), err: nil, size: size}
  218. }
  219. // isEmpty returns true is offset is at end of
  220. // buffer, or if the buffer is nil
  221. func (b *buffer) isEmpty() bool {
  222. if b == nil {
  223. return true
  224. }
  225. if len(b.buf)-b.offset <= 0 {
  226. return true
  227. }
  228. return false
  229. }
  230. // read into start of the buffer from the supplied reader,
  231. // resets the offset and updates the size of the buffer.
  232. // Any error encountered during the read is returned.
  233. func (b *buffer) read(rd io.Reader) (err error) {
  234. defer func() {
  235. if r := recover(); r != nil {
  236. err = fmt.Errorf("panic reading: %v", r)
  237. b.err = err
  238. }
  239. }()
  240. var n int
  241. n, b.err = rd.Read(b.buf[0:b.size])
  242. b.buf = b.buf[0:n]
  243. b.offset = 0
  244. return b.err
  245. }
  246. // Return the buffer at current offset
  247. func (b *buffer) buffer() []byte {
  248. return b.buf[b.offset:]
  249. }
  250. // inc will increment the read offset
  251. func (b *buffer) inc(n int) {
  252. b.offset += n
  253. }