writer.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package lz4
  2. import "io"
// writerStates is the Writer's state table; NewWriter hands it to
// _State.init. It is indexed by a state and each entry names another state.
// NOTE(review): presumably the entry is the state that _State.next moves to
// when no error occurred (next is defined elsewhere — confirm).
var writerStates = []aState{
	noState:     newState,
	newState:    writeState,
	writeState:  closedState,
	closedState: newState,
	errorState:  newState,
}
  10. // NewWriter returns a new LZ4 frame encoder.
  11. func NewWriter(w io.Writer) *Writer {
  12. zw := new(Writer)
  13. zw.state.init(writerStates)
  14. _ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone)
  15. return zw.Reset(w)
  16. }
// Writer is the LZ4 frame encoder returned by NewWriter. Bytes passed to
// Write are staged in data and handed to write one block at a time.
type Writer struct {
	state _State // current state (see writerStates) and the error stored with it
	// buf is a small scratch buffer.
	// NOTE(review): the previous comment's sum "4(magic)+4+8+1=11" did not add
	// up and disagreed with the array size. Presumably the frame descriptor
	// needs at most 4 (magic) + 2 (FLG, BD) + 8 (content size) + 1 (header
	// checksum) = 15 bytes — confirm against Descriptor.write.
	buf     [15]byte
	src     io.Writer        // destination writer
	level   CompressionLevel // how hard to try
	num     int              // concurrency level; 1 selects the non-concurrent path (see isNotConcurrent)
	frame   Frame            // frame being built
	ht      []int            // hash table (set if no concurrency)
	data    []byte           // pending data
	idx     int              // size of pending data
	handler func(int)        // called by write with the length of each uncompressed block
}
// private is an intentionally empty marker method.
// NOTE(review): presumably it lets *Writer satisfy an unexported interface
// method declared elsewhere in the package — confirm.
func (*Writer) private() {}
  30. func (w *Writer) Apply(options ...Option) (err error) {
  31. defer w.state.check(&err)
  32. switch w.state.state {
  33. case newState:
  34. case errorState:
  35. return w.state.err
  36. default:
  37. return ErrOptionClosedOrError
  38. }
  39. for _, o := range options {
  40. if err = o(w); err != nil {
  41. return
  42. }
  43. }
  44. return
  45. }
// isNotConcurrent reports whether the concurrency level w.num is exactly 1.
// In that case write compresses each block inline with the shared hash table
// instead of handing it to a goroutine.
func (w *Writer) isNotConcurrent() bool {
	return w.num == 1
}
  49. func (w *Writer) Write(buf []byte) (n int, err error) {
  50. defer w.state.check(&err)
  51. switch w.state.state {
  52. case writeState:
  53. case closedState, errorState:
  54. return 0, w.state.err
  55. case newState:
  56. if err = w.frame.Descriptor.write(w); w.state.next(err) {
  57. return
  58. }
  59. default:
  60. return 0, w.state.fail()
  61. }
  62. zn := len(w.data)
  63. for len(buf) > 0 {
  64. if w.idx == 0 && len(buf) >= zn {
  65. // Avoid a copy as there is enough data for a block.
  66. if err = w.write(buf[:zn], false); err != nil {
  67. return
  68. }
  69. n += zn
  70. buf = buf[zn:]
  71. continue
  72. }
  73. // Accumulate the data to be compressed.
  74. m := copy(w.data[w.idx:], buf)
  75. n += m
  76. w.idx += m
  77. buf = buf[m:]
  78. if w.idx < len(w.data) {
  79. // Buffer not filled.
  80. return
  81. }
  82. // Buffer full.
  83. if err = w.write(w.data, true); err != nil {
  84. return
  85. }
  86. w.idx = 0
  87. }
  88. return
  89. }
// write compresses one block of data and emits it.
//
// Non-concurrent mode: the block is compressed with the shared hash table
// w.ht and written before write returns; the result of that write is
// returned.
//
// Concurrent mode: a per-block channel is queued on w.frame.Blocks.Blocks
// and a goroutine compresses data into a block obtained from
// newFrameDataBlock, sends it on that channel and later hands both buffers
// to size.put. nil is always returned in this mode. If direct is true, w.data
// is replaced with a buffer from size.get (callers pass true when data is
// w.data itself).
//
// NOTE(review): in concurrent mode data is used after write returns and ends
// up in size.put, so callers must only pass buffers the Writer owns.
//
// w.handler is called with len(data) when the synchronous path, or the
// goroutine, returns.
func (w *Writer) write(data []byte, direct bool) error {
	if w.isNotConcurrent() {
		defer w.handler(len(data))
		block := w.frame.Blocks.Block
		return block.compress(w, data, w.ht).write(w)
	}
	size := w.frame.Descriptor.Flags.BlockSizeIndex()
	c := make(chan *FrameDataBlock)
	// Queue this block's channel before starting the goroutine.
	// NOTE(review): presumably the consumer of Blocks.Blocks reads the
	// channels in this order to keep blocks ordered — confirm.
	w.frame.Blocks.Blocks <- c
	go func(c chan *FrameDataBlock, data []byte, size BlockSizeIndex) {
		defer w.handler(len(data))
		b := newFrameDataBlock(size)
		// Remember the block's own buffer before compress runs.
		zdata := b.Data
		c <- b.compress(w, data, nil)
		// Wait for the compressed or uncompressed data to no longer be in use
		// and free the allocated buffers
		if b.Size.uncompressed() {
			// Swap so that "data" names the buffer released right away and
			// "zdata" the one released after the receive below.
			// NOTE(review): presumably an uncompressed block references the
			// input buffer rather than b.Data — confirm in compress.
			zdata, data = data, zdata
		}
		size.put(data)
		// Block until the other end of c signals (a send or a close; not
		// visible here) before releasing the remaining buffer.
		<-c
		size.put(zdata)
	}(c, data, size)
	if direct {
		// The goroutine now uses the previous buffer; take a new one for
		// pending data.
		w.data = size.get()
	}
	return nil
}
  118. // Close closes the Writer, flushing any unwritten data to the underlying io.Writer,
  119. // but does not close the underlying io.Writer.
  120. func (w *Writer) Close() (err error) {
  121. switch w.state.state {
  122. case writeState:
  123. case errorState:
  124. return w.state.err
  125. default:
  126. return nil
  127. }
  128. defer func() { w.state.next(err) }()
  129. if w.idx > 0 {
  130. // Flush pending data.
  131. if err = w.write(w.data[:w.idx], false); err != nil {
  132. return err
  133. }
  134. w.idx = 0
  135. }
  136. if w.isNotConcurrent() {
  137. htPool.Put(w.ht)
  138. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  139. size.put(w.data)
  140. w.data = nil
  141. }
  142. return w.frame.closeW(w)
  143. }
  144. // Reset clears the state of the Writer w such that it is equivalent to its
  145. // initial state from NewWriter, but instead writing to writer.
  146. // Reset keeps the previous options unless overwritten by the supplied ones.
  147. // No access to writer is performed.
  148. //
  149. // w.Close must be called before Reset.
  150. func (w *Writer) Reset(writer io.Writer) *Writer {
  151. w.state.state = noState
  152. w.state.next(nil)
  153. w.src = writer
  154. w.frame.initW(w)
  155. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  156. w.data = size.get()
  157. w.idx = 0
  158. if w.isNotConcurrent() {
  159. w.ht = htPool.Get().([]int)
  160. }
  161. return w
  162. }