writer.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package lz4
  2. import (
  3. "io"
  4. "github.com/pierrec/lz4/v4/internal/lz4block"
  5. "github.com/pierrec/lz4/v4/internal/lz4errors"
  6. "github.com/pierrec/lz4/v4/internal/lz4stream"
  7. )
  8. var writerStates = []aState{
  9. noState: newState,
  10. newState: writeState,
  11. writeState: closedState,
  12. closedState: newState,
  13. errorState: newState,
  14. }
  15. // NewWriter returns a new LZ4 frame encoder.
  16. func NewWriter(w io.Writer) *Writer {
  17. zw := &Writer{frame: lz4stream.NewFrame()}
  18. zw.state.init(writerStates)
  19. _ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone)
  20. zw.Reset(w)
  21. return zw
  22. }
  23. // Writer allows writing an LZ4 stream.
  24. type Writer struct {
  25. state _State
  26. src io.Writer // destination writer
  27. level lz4block.CompressionLevel // how hard to try
  28. num int // concurrency level
  29. frame *lz4stream.Frame // frame being built
  30. data []byte // pending data
  31. idx int // size of pending data
  32. handler func(int)
  33. }
  34. func (*Writer) private() {}
  35. func (w *Writer) Apply(options ...Option) (err error) {
  36. defer w.state.check(&err)
  37. switch w.state.state {
  38. case newState:
  39. case errorState:
  40. return w.state.err
  41. default:
  42. return lz4errors.ErrOptionClosedOrError
  43. }
  44. for _, o := range options {
  45. if err = o(w); err != nil {
  46. return
  47. }
  48. }
  49. w.Reset(w.src)
  50. return
  51. }
  52. func (w *Writer) isNotConcurrent() bool {
  53. return w.num == 1
  54. }
  55. // init sets up the Writer when in newState. It does not change the Writer state.
  56. func (w *Writer) init() error {
  57. w.frame.InitW(w.src, w.num)
  58. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  59. w.data = size.Get()
  60. w.idx = 0
  61. return w.frame.Descriptor.Write(w.frame, w.src)
  62. }
  63. func (w *Writer) Write(buf []byte) (n int, err error) {
  64. defer w.state.check(&err)
  65. switch w.state.state {
  66. case writeState:
  67. case closedState, errorState:
  68. return 0, w.state.err
  69. case newState:
  70. if err = w.init(); w.state.next(err) {
  71. return
  72. }
  73. default:
  74. return 0, w.state.fail()
  75. }
  76. zn := len(w.data)
  77. for len(buf) > 0 {
  78. if w.idx == 0 && len(buf) >= zn {
  79. // Avoid a copy as there is enough data for a block.
  80. if err = w.write(buf[:zn], false); err != nil {
  81. return
  82. }
  83. n += zn
  84. buf = buf[zn:]
  85. continue
  86. }
  87. // Accumulate the data to be compressed.
  88. m := copy(w.data[w.idx:], buf)
  89. n += m
  90. w.idx += m
  91. buf = buf[m:]
  92. if w.idx < len(w.data) {
  93. // Buffer not filled.
  94. return
  95. }
  96. // Buffer full.
  97. if err = w.write(w.data, true); err != nil {
  98. return
  99. }
  100. if !w.isNotConcurrent() {
  101. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  102. w.data = size.Get()
  103. }
  104. w.idx = 0
  105. }
  106. return
  107. }
  108. func (w *Writer) write(data []byte, safe bool) error {
  109. if w.isNotConcurrent() {
  110. block := w.frame.Blocks.Block
  111. err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src)
  112. w.handler(len(block.Data))
  113. return err
  114. }
  115. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  116. c := make(chan *lz4stream.FrameDataBlock)
  117. w.frame.Blocks.Blocks <- c
  118. go func(c chan *lz4stream.FrameDataBlock, data []byte, size lz4block.BlockSizeIndex, safe bool) {
  119. b := lz4stream.NewFrameDataBlock(size)
  120. c <- b.Compress(w.frame, data, w.level)
  121. <-c
  122. w.handler(len(b.Data))
  123. b.CloseW(w.frame)
  124. if safe {
  125. // safe to put it back as the last usage of it was FrameDataBlock.Write() called before c is closed
  126. size.Put(data)
  127. }
  128. }(c, data, size, safe)
  129. return nil
  130. }
  131. // Close closes the Writer, flushing any unwritten data to the underlying io.Writer,
  132. // but does not close the underlying io.Writer.
  133. func (w *Writer) Close() (err error) {
  134. switch w.state.state {
  135. case writeState:
  136. case errorState:
  137. return w.state.err
  138. default:
  139. return nil
  140. }
  141. defer w.state.nextd(&err)
  142. if w.idx > 0 {
  143. // Flush pending data, disable w.data freeing as it is done later on.
  144. if err = w.write(w.data[:w.idx], false); err != nil {
  145. return err
  146. }
  147. w.idx = 0
  148. }
  149. err = w.frame.CloseW(w.src, w.num)
  150. // It is now safe to free the buffer.
  151. if w.data != nil {
  152. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  153. size.Put(w.data)
  154. w.data = nil
  155. }
  156. return
  157. }
  158. // Reset clears the state of the Writer w such that it is equivalent to its
  159. // initial state from NewWriter, but instead writing to writer.
  160. // Reset keeps the previous options unless overwritten by the supplied ones.
  161. // No access to writer is performed.
  162. //
  163. // w.Close must be called before Reset or pending data may be dropped.
  164. func (w *Writer) Reset(writer io.Writer) {
  165. w.frame.Reset(w.num)
  166. w.state.reset()
  167. w.src = writer
  168. }
  169. // ReadFrom efficiently reads from r and compressed into the Writer destination.
  170. func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
  171. switch w.state.state {
  172. case closedState, errorState:
  173. return 0, w.state.err
  174. case newState:
  175. if err = w.init(); w.state.next(err) {
  176. return
  177. }
  178. default:
  179. return 0, w.state.fail()
  180. }
  181. defer w.state.check(&err)
  182. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  183. var done bool
  184. var rn int
  185. data := size.Get()
  186. if w.isNotConcurrent() {
  187. // Keep the same buffer for the whole process.
  188. defer size.Put(data)
  189. }
  190. for !done {
  191. rn, err = io.ReadFull(r, data)
  192. switch err {
  193. case nil:
  194. case io.EOF:
  195. done = true
  196. default:
  197. return
  198. }
  199. n += int64(rn)
  200. err = w.write(data[:rn], true)
  201. if err != nil {
  202. return
  203. }
  204. w.handler(rn)
  205. if !done && !w.isNotConcurrent() {
  206. // The buffer will be returned automatically by go routines (safe=true)
  207. // so get a new one fo the next round.
  208. data = size.Get()
  209. }
  210. }
  211. err = w.Close()
  212. return
  213. }