| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- package lz4
- import "io"
- var writerStates = []aState{
- noState: newState,
- newState: headerState,
- headerState: writeState,
- writeState: closedState,
- closedState: newState,
- errorState: newState,
- }
- // NewWriter returns a new LZ4 frame encoder.
- func NewWriter(w io.Writer, options ...Option) (io.WriteCloser, error) {
- zw := new(_Writer)
- _ = defaultBlockSizeOption(zw)
- _ = defaultChecksumOption(zw)
- _ = defaultConcurrency(zw)
- if err := zw.Reset(w, options...); err != nil {
- return nil, err
- }
- return zw, nil
- }
- type _Writer struct {
- state _State
- buf [11]byte // frame descriptor needs at most 4+8+1=11 bytes
- src io.Writer // destination writer
- level CompressionLevel // how hard to try
- num int // concurrency level
- frame Frame // frame being built
- ht []int // hash table (set if no concurrency)
- data []byte // pending data
- idx int // size of pending data
- }
- func (w *_Writer) isNotConcurrent() bool {
- return w.num == 1
- }
- func (w *_Writer) Write(buf []byte) (n int, err error) {
- defer w.state.check(&err)
- switch w.state.state {
- case closedState, errorState:
- return 0, w.state.err
- case newState:
- w.state.next(nil)
- if err = w.frame.Descriptor.write(w); w.state.next(err) {
- return
- }
- default:
- return 0, w.state.fail()
- }
- zn := len(w.data)
- for len(buf) > 0 {
- if w.idx == 0 && len(buf) >= zn {
- // Avoid a copy as there is enough data for a block.
- if err = w.write(); err != nil {
- return
- }
- n += zn
- buf = buf[zn:]
- continue
- }
- // Accumulate the data to be compressed.
- m := copy(w.data[w.idx:], buf)
- n += m
- w.idx += m
- buf = buf[m:]
- if w.idx < len(w.data) {
- // Buffer not filled.
- return
- }
- // Buffer full.
- if err = w.write(); err != nil {
- return
- }
- w.idx = 0
- }
- return
- }
- func (w *_Writer) write() error {
- if w.isNotConcurrent() {
- return w.frame.Blocks.Block.compress(w, w.data, w.ht).write(w)
- }
- size := w.frame.Descriptor.Flags.BlockSizeIndex()
- c := make(chan *FrameDataBlock)
- w.frame.Blocks.Blocks <- c
- go func(c chan *FrameDataBlock, data []byte, size BlockSizeIndex) {
- b := newFrameDataBlock(size)
- zdata := b.Data
- c <- b.compress(w, data, nil)
- // Wait for the compressed or uncompressed data to no longer be in use
- // and free the allocated buffers
- if !b.Size.compressed() {
- zdata, data = data, zdata
- }
- size.put(data)
- <-c
- size.put(zdata)
- }(c, w.data, size)
- if w.idx > 0 {
- // Not closed.
- w.data = size.get()
- }
- w.idx = 0
- return nil
- }
- // Close closes the Writer, flushing any unwritten data to the underlying io.Writer,
- // but does not close the underlying io.Writer.
- func (w *_Writer) Close() error {
- switch w.state.state {
- case writeState:
- case errorState:
- return w.state.err
- default:
- return nil
- }
- var err error
- defer func() { w.state.next(err) }()
- if idx := w.idx; idx > 0 {
- // Flush pending data.
- w.data = w.data[:idx]
- w.idx = 0
- if err = w.write(); err != nil {
- return err
- }
- w.data = nil
- }
- if w.isNotConcurrent() {
- htPool.Put(w.ht)
- size := w.frame.Descriptor.Flags.BlockSizeIndex()
- size.put(w.data)
- }
- return w.frame.closeW(w)
- }
- // Reset clears the state of the Writer w such that it is equivalent to its
- // initial state from NewWriter, but instead writing to writer.
- // Reset keeps the previous options unless overwritten by the supplied ones.
- // No access to writer is performed.
- //
- // w.Close must be called before Reset.
- func (w *_Writer) Reset(writer io.Writer, options ...Option) (err error) {
- for _, o := range options {
- if err = o(w); err != nil {
- break
- }
- }
- w.state.state = noState
- if w.state.next(err) {
- return
- }
- w.src = writer
- w.frame.initW(w)
- size := w.frame.Descriptor.Flags.BlockSizeIndex()
- w.data = size.get()
- w.idx = 0
- if w.isNotConcurrent() {
- w.ht = htPool.Get().([]int)
- }
- return nil
- }
|