decoder.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. // Copyright 2019+ Klaus Post. All rights reserved.
  2. // License information can be found in the LICENSE file.
  3. // Based on work by Yann Collet, released under BSD License.
  4. package zstd
  5. import (
  6. "bytes"
  7. "errors"
  8. "io"
  9. "sync"
  10. )
  11. // Decoder provides decoding of zstandard streams.
  12. // The decoder has been designed to operate without allocations after a warmup.
  13. // This means that you should store the decoder for best performance.
  14. // To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
  15. // A decoder can safely be re-used even if the previous stream failed.
  16. // To release the resources, you must call the Close() function on a decoder.
  17. type Decoder struct {
  18. o decoderOptions
  19. // Unreferenced decoders, ready for use.
  20. decoders chan *blockDec
  21. // Unreferenced decoders, ready for use.
  22. frames chan *frameDec
  23. // Streams ready to be decoded.
  24. stream chan decodeStream
  25. // Current read position used for Reader functionality.
  26. current decoderState
  27. // Custom dictionaries
  28. dicts map[uint32]struct{}
  29. // streamWg is the waitgroup for all streams
  30. streamWg sync.WaitGroup
  31. }
  32. // decoderState is used for maintaining state when the decoder
  33. // is used for streaming.
  34. type decoderState struct {
  35. // current block being written to stream.
  36. decodeOutput
  37. // output in order to be written to stream.
  38. output chan decodeOutput
  39. // cancel remaining output.
  40. cancel chan struct{}
  41. flushed bool
  42. }
  43. var (
  44. // Check the interfaces we want to support.
  45. _ = io.WriterTo(&Decoder{})
  46. _ = io.Reader(&Decoder{})
  47. )
  48. // NewReader creates a new decoder.
  49. // A nil Reader can be provided in which case Reset can be used to start a decode.
  50. //
  51. // A Decoder can be used in two modes:
  52. //
  53. // 1) As a stream, or
  54. // 2) For stateless decoding using DecodeAll.
  55. //
  56. // Only a single stream can be decoded concurrently, but the same decoder
  57. // can run multiple concurrent stateless decodes. It is even possible to
  58. // use stateless decodes while a stream is being decoded.
  59. //
  60. // The Reset function can be used to initiate a new stream, which is will considerably
  61. // reduce the allocations normally caused by NewReader.
  62. func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
  63. initPredefined()
  64. var d Decoder
  65. d.o.setDefault()
  66. for _, o := range opts {
  67. err := o(&d.o)
  68. if err != nil {
  69. return nil, err
  70. }
  71. }
  72. d.current.output = make(chan decodeOutput, d.o.concurrent)
  73. d.current.flushed = true
  74. // Create decoders
  75. d.decoders = make(chan *blockDec, d.o.concurrent)
  76. d.frames = make(chan *frameDec, d.o.concurrent)
  77. for i := 0; i < d.o.concurrent; i++ {
  78. d.frames <- newFrameDec(d.o)
  79. d.decoders <- newBlockDec(d.o.lowMem)
  80. }
  81. if r == nil {
  82. return &d, nil
  83. }
  84. return &d, d.Reset(r)
  85. }
  86. // Read bytes from the decompressed stream into p.
  87. // Returns the number of bytes written and any error that occurred.
  88. // When the stream is done, io.EOF will be returned.
  89. func (d *Decoder) Read(p []byte) (int, error) {
  90. if d.stream == nil {
  91. return 0, errors.New("no input has been initialized")
  92. }
  93. var n int
  94. for {
  95. if len(d.current.b) > 0 {
  96. filled := copy(p, d.current.b)
  97. p = p[filled:]
  98. d.current.b = d.current.b[filled:]
  99. n += filled
  100. }
  101. if len(p) == 0 {
  102. break
  103. }
  104. if len(d.current.b) == 0 {
  105. // We have an error and no more data
  106. if d.current.err != nil {
  107. break
  108. }
  109. if !d.nextBlock(n == 0) {
  110. return n, nil
  111. }
  112. }
  113. }
  114. if len(d.current.b) > 0 {
  115. if debug {
  116. println("returning", n, "still bytes left:", len(d.current.b))
  117. }
  118. // Only return error at end of block
  119. return n, nil
  120. }
  121. if d.current.err != nil {
  122. d.drainOutput()
  123. }
  124. if debug {
  125. println("returning", n, d.current.err, len(d.decoders))
  126. }
  127. return n, d.current.err
  128. }
  129. // Reset will reset the decoder the supplied stream after the current has finished processing.
  130. // Note that this functionality cannot be used after Close has been called.
  131. func (d *Decoder) Reset(r io.Reader) error {
  132. if d.current.err == ErrDecoderClosed {
  133. return d.current.err
  134. }
  135. if r == nil {
  136. return errors.New("nil Reader sent as input")
  137. }
  138. if d.stream == nil {
  139. d.stream = make(chan decodeStream, 1)
  140. d.streamWg.Add(1)
  141. go d.startStreamDecoder(d.stream)
  142. }
  143. d.drainOutput()
  144. // If bytes buffer and < 1MB, do sync decoding anyway.
  145. if bb, ok := r.(*bytes.Buffer); ok && bb.Len() < 1<<20 {
  146. if debug {
  147. println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
  148. }
  149. b := bb.Bytes()
  150. dst, err := d.DecodeAll(b, nil)
  151. if err == nil {
  152. err = io.EOF
  153. }
  154. d.current.b = dst
  155. d.current.err = err
  156. d.current.flushed = true
  157. if debug {
  158. println("sync decode to ", len(dst), "bytes, err:", err)
  159. }
  160. return nil
  161. }
  162. // Remove current block.
  163. d.current.decodeOutput = decodeOutput{}
  164. d.current.err = nil
  165. d.current.cancel = make(chan struct{})
  166. d.current.flushed = false
  167. d.current.d = nil
  168. d.stream <- decodeStream{
  169. r: r,
  170. output: d.current.output,
  171. cancel: d.current.cancel,
  172. }
  173. return nil
  174. }
  175. // drainOutput will drain the output until errEndOfStream is sent.
  176. func (d *Decoder) drainOutput() {
  177. if d.current.cancel != nil {
  178. println("cancelling current")
  179. close(d.current.cancel)
  180. d.current.cancel = nil
  181. }
  182. if d.current.d != nil {
  183. if debug {
  184. printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
  185. }
  186. d.decoders <- d.current.d
  187. d.current.d = nil
  188. d.current.b = nil
  189. }
  190. if d.current.output == nil || d.current.flushed {
  191. println("current already flushed")
  192. return
  193. }
  194. for {
  195. select {
  196. case v := <-d.current.output:
  197. if v.d != nil {
  198. if debug {
  199. printf("re-adding decoder %p", v.d)
  200. }
  201. d.decoders <- v.d
  202. }
  203. if v.err == errEndOfStream {
  204. println("current flushed")
  205. d.current.flushed = true
  206. return
  207. }
  208. }
  209. }
  210. }
  211. // WriteTo writes data to w until there's no more data to write or when an error occurs.
  212. // The return value n is the number of bytes written.
  213. // Any error encountered during the write is also returned.
  214. func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
  215. if d.stream == nil {
  216. return 0, errors.New("no input has been initialized")
  217. }
  218. var n int64
  219. for {
  220. if len(d.current.b) > 0 {
  221. n2, err2 := w.Write(d.current.b)
  222. n += int64(n2)
  223. if err2 != nil && d.current.err == nil {
  224. d.current.err = err2
  225. break
  226. }
  227. }
  228. if d.current.err != nil {
  229. break
  230. }
  231. d.nextBlock(true)
  232. }
  233. err := d.current.err
  234. if err != nil {
  235. d.drainOutput()
  236. }
  237. if err == io.EOF {
  238. err = nil
  239. }
  240. return n, err
  241. }
  242. // DecodeAll allows stateless decoding of a blob of bytes.
  243. // Output will be appended to dst, so if the destination size is known
  244. // you can pre-allocate the destination slice to avoid allocations.
  245. // DecodeAll can be used concurrently.
  246. // The Decoder concurrency limits will be respected.
  247. func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
  248. if d.current.err == ErrDecoderClosed {
  249. return dst, ErrDecoderClosed
  250. }
  251. // Grab a block decoder and frame decoder.
  252. block, frame := <-d.decoders, <-d.frames
  253. defer func() {
  254. if debug {
  255. printf("re-adding decoder: %p", block)
  256. }
  257. d.decoders <- block
  258. frame.rawInput = nil
  259. frame.bBuf = nil
  260. d.frames <- frame
  261. }()
  262. frame.bBuf = input
  263. for {
  264. err := frame.reset(&frame.bBuf)
  265. if err == io.EOF {
  266. return dst, nil
  267. }
  268. if err != nil {
  269. return dst, err
  270. }
  271. if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
  272. return dst, ErrDecoderSizeExceeded
  273. }
  274. if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
  275. // Never preallocate moe than 1 GB up front.
  276. if uint64(cap(dst)) < frame.FrameContentSize {
  277. dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
  278. copy(dst2, dst)
  279. dst = dst2
  280. }
  281. }
  282. if cap(dst) == 0 {
  283. // Allocate window size * 2 by default if nothing is provided and we didn't get frame content size.
  284. size := frame.WindowSize * 2
  285. // Cap to 1 MB.
  286. if size > 1<<20 {
  287. size = 1 << 20
  288. }
  289. dst = make([]byte, 0, size)
  290. }
  291. dst, err = frame.runDecoder(dst, block)
  292. if err != nil {
  293. return dst, err
  294. }
  295. if len(frame.bBuf) == 0 {
  296. break
  297. }
  298. }
  299. return dst, nil
  300. }
  301. // nextBlock returns the next block.
  302. // If an error occurs d.err will be set.
  303. // Optionally the function can block for new output.
  304. // If non-blocking mode is used the returned boolean will be false
  305. // if no data was available without blocking.
  306. func (d *Decoder) nextBlock(blocking bool) (ok bool) {
  307. if d.current.d != nil {
  308. if debug {
  309. printf("re-adding current decoder %p", d.current.d)
  310. }
  311. d.decoders <- d.current.d
  312. d.current.d = nil
  313. }
  314. if d.current.err != nil {
  315. // Keep error state.
  316. return blocking
  317. }
  318. if blocking {
  319. d.current.decodeOutput = <-d.current.output
  320. } else {
  321. select {
  322. case d.current.decodeOutput = <-d.current.output:
  323. default:
  324. return false
  325. }
  326. }
  327. if debug {
  328. println("got", len(d.current.b), "bytes, error:", d.current.err)
  329. }
  330. return true
  331. }
  332. // Close will release all resources.
  333. // It is NOT possible to reuse the decoder after this.
  334. func (d *Decoder) Close() {
  335. if d.current.err == ErrDecoderClosed {
  336. return
  337. }
  338. d.drainOutput()
  339. if d.stream != nil {
  340. close(d.stream)
  341. d.streamWg.Wait()
  342. d.stream = nil
  343. }
  344. if d.decoders != nil {
  345. close(d.decoders)
  346. for dec := range d.decoders {
  347. dec.Close()
  348. }
  349. d.decoders = nil
  350. }
  351. if d.current.d != nil {
  352. d.current.d.Close()
  353. d.current.d = nil
  354. }
  355. d.current.err = ErrDecoderClosed
  356. }
  357. // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
  358. // Any changes to the decoder will be reflected, so the returned ReadCloser
  359. // can be reused along with the decoder.
  360. // io.WriterTo is also supported by the returned ReadCloser.
  361. func (d *Decoder) IOReadCloser() io.ReadCloser {
  362. return closeWrapper{d: d}
  363. }
  364. // closeWrapper wraps a function call as a closer.
  365. type closeWrapper struct {
  366. d *Decoder
  367. }
  368. // WriteTo forwards WriteTo calls to the decoder.
  369. func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
  370. return c.d.WriteTo(w)
  371. }
  372. // Read forwards read calls to the decoder.
  373. func (c closeWrapper) Read(p []byte) (n int, err error) {
  374. return c.d.Read(p)
  375. }
  376. // Close closes the decoder.
  377. func (c closeWrapper) Close() error {
  378. c.d.Close()
  379. return nil
  380. }
  381. type decodeOutput struct {
  382. d *blockDec
  383. b []byte
  384. err error
  385. }
  386. type decodeStream struct {
  387. r io.Reader
  388. // Blocks ready to be written to output.
  389. output chan decodeOutput
  390. // cancel reading from the input
  391. cancel chan struct{}
  392. }
  // errEndOfStream is the sentinel sent as the last item on a stream's output
// channel, once everything from that stream has been read.
var errEndOfStream error = errors.New("end-of-stream")
  395. // Create Decoder:
  396. // Spawn n block decoders. These accept tasks to decode a block.
  397. // Create goroutine that handles stream processing, this will send history to decoders as they are available.
  398. // Decoders update the history as they decode.
  399. // When a block is returned:
  400. // a) history is sent to the next decoder,
  401. // b) content written to CRC.
  402. // c) return data to WRITER.
  403. // d) wait for next block to return data.
  404. // Once WRITTEN, the decoders reused by the writer frame decoder for re-use.
  405. func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
  406. defer d.streamWg.Done()
  407. frame := newFrameDec(d.o)
  408. for stream := range inStream {
  409. if debug {
  410. println("got new stream")
  411. }
  412. br := readerWrapper{r: stream.r}
  413. decodeStream:
  414. for {
  415. err := frame.reset(&br)
  416. if debug && err != nil {
  417. println("Frame decoder returned", err)
  418. }
  419. if err != nil {
  420. stream.output <- decodeOutput{
  421. err: err,
  422. }
  423. break
  424. }
  425. if debug {
  426. println("starting frame decoder")
  427. }
  428. // This goroutine will forward history between frames.
  429. frame.frameDone.Add(1)
  430. frame.initAsync()
  431. go frame.startDecoder(stream.output)
  432. decodeFrame:
  433. // Go through all blocks of the frame.
  434. for {
  435. dec := <-d.decoders
  436. select {
  437. case <-stream.cancel:
  438. if !frame.sendErr(dec, io.EOF) {
  439. // To not let the decoder dangle, send it back.
  440. stream.output <- decodeOutput{d: dec}
  441. }
  442. break decodeStream
  443. default:
  444. }
  445. err := frame.next(dec)
  446. switch err {
  447. case io.EOF:
  448. // End of current frame, no error
  449. println("EOF on next block")
  450. break decodeFrame
  451. case nil:
  452. continue
  453. default:
  454. println("block decoder returned", err)
  455. break decodeStream
  456. }
  457. }
  458. // All blocks have started decoding, check if there are more frames.
  459. println("waiting for done")
  460. frame.frameDone.Wait()
  461. println("done waiting...")
  462. }
  463. frame.frameDone.Wait()
  464. println("Sending EOS")
  465. stream.output <- decodeOutput{err: errEndOfStream}
  466. }
  467. }