reader.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package lz4
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "hash"
  7. "io"
  8. "io/ioutil"
  9. "runtime"
  10. "sync"
  11. "sync/atomic"
  12. )
  13. // ErrInvalid is returned when the data being read is not an LZ4 archive
  14. // (LZ4 magic number detection failed).
  15. var ErrInvalid = errors.New("invalid lz4 data")
  16. // errEndOfBlock is returned by readBlock when it has reached the last block of the frame.
  17. // It is not an error.
  18. var errEndOfBlock = errors.New("end of block")
// Reader implements the LZ4 frame decoder.
// The Header is set after the first call to Read().
// The Header may change between Read() calls in case of concatenated frames.
//
// The unexported fields hold the decoding state: the source being read, the
// running frame checksum, data left over from a previous Read and, for frames
// with dependent blocks, the sliding window of previously decompressed bytes.
type Reader struct {
	// Pos is the position within the source. It is updated with
	// atomic.AddInt64 by readBlock and decompressBlock.
	// NOTE(review): sync/atomic needs 64-bit alignment on 32-bit platforms,
	// which is only guaranteed for the first word of an allocated struct -
	// keep Pos as the first field.
	Pos int64 // position within the source
	// Header is embedded; it is filled in by readHeader and cleared by Reset.
	Header
	src      io.Reader      // underlying source of compressed data
	checksum hash.Hash32    // frame hash
	wg       sync.WaitGroup // decompressing go routine wait group
	data     []byte         // buffered decompressed data
	window   []byte         // 64Kb decompressed data window
}
  31. // NewReader returns a new LZ4 frame decoder.
  32. // No access to the underlying io.Reader is performed.
  33. func NewReader(src io.Reader) *Reader {
  34. return &Reader{
  35. src: src,
  36. checksum: hashPool.Get(),
  37. }
  38. }
  39. // readHeader checks the frame magic number and parses the frame descriptoz.
  40. // Skippable frames are supported even as a first frame although the LZ4
  41. // specifications recommends skippable frames not to be used as first frames.
  42. func (z *Reader) readHeader(first bool) error {
  43. defer z.checksum.Reset()
  44. for {
  45. var magic uint32
  46. if err := binary.Read(z.src, binary.LittleEndian, &magic); err != nil {
  47. if !first && err == io.ErrUnexpectedEOF {
  48. return io.EOF
  49. }
  50. return err
  51. }
  52. z.Pos += 4
  53. if magic>>8 == frameSkipMagic>>8 {
  54. var skipSize uint32
  55. if err := binary.Read(z.src, binary.LittleEndian, &skipSize); err != nil {
  56. return err
  57. }
  58. z.Pos += 4
  59. m, err := io.CopyN(ioutil.Discard, z.src, int64(skipSize))
  60. z.Pos += m
  61. if err != nil {
  62. return err
  63. }
  64. continue
  65. }
  66. if magic != frameMagic {
  67. return ErrInvalid
  68. }
  69. break
  70. }
  71. // header
  72. var buf [8]byte
  73. if _, err := io.ReadFull(z.src, buf[:2]); err != nil {
  74. return err
  75. }
  76. z.Pos += 2
  77. b := buf[0]
  78. if b>>6 != Version {
  79. return fmt.Errorf("lz4.Read: invalid version: got %d expected %d", b>>6, Version)
  80. }
  81. z.BlockDependency = b>>5&1 == 0
  82. z.BlockChecksum = b>>4&1 > 0
  83. frameSize := b>>3&1 > 0
  84. z.NoChecksum = b>>2&1 == 0
  85. // z.Dict = b&1 > 0
  86. bmsID := buf[1] >> 4 & 0x7
  87. bSize, ok := bsMapID[bmsID]
  88. if !ok {
  89. return fmt.Errorf("lz4.Read: invalid block max size: %d", bmsID)
  90. }
  91. z.BlockMaxSize = bSize
  92. z.checksum.Write(buf[0:2])
  93. if frameSize {
  94. if err := binary.Read(z.src, binary.LittleEndian, &z.Size); err != nil {
  95. return err
  96. }
  97. z.Pos += 8
  98. binary.LittleEndian.PutUint64(buf[:], z.Size)
  99. z.checksum.Write(buf[0:8])
  100. }
  101. // if z.Dict {
  102. // if err := binary.Read(z.src, binary.LittleEndian, &z.DictID); err != nil {
  103. // return err
  104. // }
  105. // z.Pos += 4
  106. // binary.LittleEndian.PutUint32(buf[:], z.DictID)
  107. // z.checksum.Write(buf[0:4])
  108. // }
  109. // header checksum
  110. if _, err := io.ReadFull(z.src, buf[:1]); err != nil {
  111. return err
  112. }
  113. z.Pos++
  114. if h := byte(z.checksum.Sum32() >> 8 & 0xFF); h != buf[0] {
  115. return fmt.Errorf("lz4.Read: invalid header checksum: got %v expected %v", buf[0], h)
  116. }
  117. z.Header.done = true
  118. return nil
  119. }
  120. // Read decompresses data from the underlying source into the supplied buffer.
  121. //
  122. // Since there can be multiple streams concatenated, Header values may
  123. // change between calls to Read(). If that is the case, no data is actually read from
  124. // the underlying io.Reader, to allow for potential input buffer resizing.
  125. //
  126. // Data is buffered if the input buffer is too small, and exhausted upon successive calls.
  127. //
  128. // If the buffer is large enough (typically in multiples of BlockMaxSize) and there is
  129. // no block dependency, then the data will be decompressed concurrently based on the GOMAXPROCS value.
  130. func (z *Reader) Read(buf []byte) (n int, err error) {
  131. if !z.Header.done {
  132. if err = z.readHeader(true); err != nil {
  133. return
  134. }
  135. }
  136. if len(buf) == 0 {
  137. return
  138. }
  139. // exhaust remaining data from previous Read()
  140. if len(z.data) > 0 {
  141. n = copy(buf, z.data)
  142. z.data = z.data[n:]
  143. if len(z.data) == 0 {
  144. z.data = nil
  145. }
  146. return
  147. }
  148. // Break up the input buffer into BlockMaxSize blocks with at least one block.
  149. // Then decompress into each of them concurrently if possible (no dependency).
  150. // In case of dependency, the first block will be missing the window (except on the
  151. // very first call), the rest will have it already since it comes from the previous block.
  152. wbuf := buf
  153. zn := (len(wbuf) + z.BlockMaxSize - 1) / z.BlockMaxSize
  154. zblocks := make([]block, zn)
  155. for zi, abort := 0, uint32(0); zi < zn && atomic.LoadUint32(&abort) == 0; zi++ {
  156. zb := &zblocks[zi]
  157. // last block may be too small
  158. if len(wbuf) < z.BlockMaxSize+len(z.window) {
  159. wbuf = make([]byte, z.BlockMaxSize+len(z.window))
  160. }
  161. copy(wbuf, z.window)
  162. if zb.err = z.readBlock(wbuf, zb); zb.err != nil {
  163. break
  164. }
  165. wbuf = wbuf[z.BlockMaxSize:]
  166. if !z.BlockDependency {
  167. z.wg.Add(1)
  168. go z.decompressBlock(zb, &abort)
  169. continue
  170. }
  171. // cannot decompress concurrently when dealing with block dependency
  172. z.decompressBlock(zb, nil)
  173. // the last block may not contain enough data
  174. if len(z.window) == 0 {
  175. z.window = make([]byte, winSize)
  176. }
  177. if len(zb.data) >= winSize {
  178. copy(z.window, zb.data[len(zb.data)-winSize:])
  179. } else {
  180. copy(z.window, z.window[len(zb.data):])
  181. copy(z.window[len(zb.data)+1:], zb.data)
  182. }
  183. }
  184. z.wg.Wait()
  185. // since a block size may be less then BlockMaxSize, trim the decompressed buffers
  186. for _, zb := range zblocks {
  187. if zb.err != nil {
  188. if zb.err == errEndOfBlock {
  189. return n, z.close()
  190. }
  191. return n, zb.err
  192. }
  193. bLen := len(zb.data)
  194. if !z.NoChecksum {
  195. z.checksum.Write(zb.data)
  196. }
  197. m := copy(buf[n:], zb.data)
  198. // buffer the remaining data (this is necessarily the last block)
  199. if m < bLen {
  200. z.data = zb.data[m:]
  201. }
  202. n += m
  203. }
  204. return
  205. }
  206. // readBlock reads an entire frame block from the frame.
  207. // The input buffer is the one that will receive the decompressed data.
  208. // If the end of the frame is detected, it returns the errEndOfBlock error.
  209. func (z *Reader) readBlock(buf []byte, b *block) error {
  210. var bLen uint32
  211. if err := binary.Read(z.src, binary.LittleEndian, &bLen); err != nil {
  212. return err
  213. }
  214. atomic.AddInt64(&z.Pos, 4)
  215. switch {
  216. case bLen == 0:
  217. return errEndOfBlock
  218. case bLen&(1<<31) == 0:
  219. b.compressed = true
  220. b.data = buf
  221. b.zdata = make([]byte, bLen)
  222. default:
  223. bLen = bLen & (1<<31 - 1)
  224. if int(bLen) > len(buf) {
  225. return fmt.Errorf("lz4.Read: invalid block size: %d", bLen)
  226. }
  227. b.data = buf[:bLen]
  228. b.zdata = buf[:bLen]
  229. }
  230. if _, err := io.ReadFull(z.src, b.zdata); err != nil {
  231. return err
  232. }
  233. if z.BlockChecksum {
  234. if err := binary.Read(z.src, binary.LittleEndian, &b.checksum); err != nil {
  235. return err
  236. }
  237. xxh := hashPool.Get()
  238. defer hashPool.Put(xxh)
  239. xxh.Write(b.zdata)
  240. if h := xxh.Sum32(); h != b.checksum {
  241. return fmt.Errorf("lz4.Read: invalid block checksum: got %x expected %x", h, b.checksum)
  242. }
  243. }
  244. return nil
  245. }
  246. // decompressBlock decompresses a frame block.
  247. // In case of an error, the block err is set with it and abort is set to 1.
  248. func (z *Reader) decompressBlock(b *block, abort *uint32) {
  249. if abort != nil {
  250. defer z.wg.Done()
  251. }
  252. if b.compressed {
  253. n := len(z.window)
  254. m, err := UncompressBlock(b.zdata, b.data, n)
  255. if err != nil {
  256. if abort != nil {
  257. atomic.StoreUint32(abort, 1)
  258. }
  259. b.err = err
  260. return
  261. }
  262. b.data = b.data[n : n+m]
  263. }
  264. atomic.AddInt64(&z.Pos, int64(len(b.data)))
  265. }
  266. // close validates the frame checksum (if any) and checks the next frame (if any).
  267. func (z *Reader) close() error {
  268. if !z.NoChecksum {
  269. var checksum uint32
  270. if err := binary.Read(z.src, binary.LittleEndian, &checksum); err != nil {
  271. return err
  272. }
  273. if checksum != z.checksum.Sum32() {
  274. return fmt.Errorf("lz4.Read: invalid frame checksum: got %x expected %x", z.checksum.Sum32(), checksum)
  275. }
  276. }
  277. // get ready for the next concatenated frame, but do not change the position
  278. pos := z.Pos
  279. z.Reset(z.src)
  280. z.Pos = pos
  281. // since multiple frames can be concatenated, check for another one
  282. return z.readHeader(false)
  283. }
  284. // Reset discards the Reader's state and makes it equivalent to the
  285. // result of its original state from NewReader, but reading from r instead.
  286. // This permits reusing a Reader rather than allocating a new one.
  287. func (z *Reader) Reset(r io.Reader) {
  288. z.Header = Header{}
  289. z.Pos = 0
  290. z.src = r
  291. z.checksum.Reset()
  292. z.data = nil
  293. z.window = nil
  294. }
  295. // WriteTo decompresses the data from the underlying io.Reader and writes it to the io.Writer.
  296. // Returns the number of bytes written.
  297. func (z *Reader) WriteTo(w io.Writer) (n int64, err error) {
  298. cpus := runtime.GOMAXPROCS(0)
  299. var buf []byte
  300. // The initial buffer being nil, the first Read will be only read the compressed frame options.
  301. // The buffer can then be sized appropriately to support maximum concurrency decompression.
  302. // If multiple frames are concatenated, Read() will return with no data decompressed but with
  303. // potentially changed options. The buffer will be resized accordingly, always trying to
  304. // maximize concurrency.
  305. for {
  306. nsize := 0
  307. // the block max size can change if multiple streams are concatenated.
  308. // Check it after every Read().
  309. if z.BlockDependency {
  310. // in case of dependency, we cannot decompress concurrently,
  311. // so allocate the minimum buffer + window size
  312. nsize = len(z.window) + z.BlockMaxSize
  313. } else {
  314. // if no dependency, allocate a buffer large enough for concurrent decompression
  315. nsize = cpus * z.BlockMaxSize
  316. }
  317. if nsize != len(buf) {
  318. buf = make([]byte, nsize)
  319. }
  320. m, er := z.Read(buf)
  321. if er != nil && er != io.EOF {
  322. return n, er
  323. }
  324. m, err = w.Write(buf[:m])
  325. n += int64(m)
  326. if err != nil || er == io.EOF {
  327. return
  328. }
  329. }
  330. }