writer.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package lz4
  2. import "io"
  3. var writerStates = []aState{
  4. noState: newState,
  5. newState: headerState,
  6. headerState: writeState,
  7. writeState: closedState,
  8. closedState: newState,
  9. errorState: newState,
  10. }
  11. // NewWriter returns a new LZ4 frame encoder.
  12. func NewWriter(w io.Writer, options ...Option) (io.WriteCloser, error) {
  13. zw := new(_Writer)
  14. _ = defaultBlockSizeOption(zw)
  15. _ = defaultChecksumOption(zw)
  16. _ = defaultConcurrency(zw)
  17. if err := zw.Reset(w, options...); err != nil {
  18. return nil, err
  19. }
  20. return zw, nil
  21. }
  22. type _Writer struct {
  23. state _State
  24. buf [11]byte // frame descriptor needs at most 4+8+1=11 bytes
  25. src io.Writer // destination writer
  26. level CompressionLevel // how hard to try
  27. num int // concurrency level
  28. frame Frame // frame being built
  29. ht []int // hash table (set if no concurrency)
  30. data []byte // pending data
  31. idx int // size of pending data
  32. }
  33. func (w *_Writer) isNotConcurrent() bool {
  34. return w.num == 1
  35. }
  36. func (w *_Writer) Write(buf []byte) (n int, err error) {
  37. defer w.state.check(&err)
  38. switch w.state.state {
  39. case closedState, errorState:
  40. return 0, w.state.err
  41. case newState:
  42. w.state.next(nil)
  43. if err = w.frame.Descriptor.write(w); w.state.next(err) {
  44. return
  45. }
  46. default:
  47. return 0, w.state.fail()
  48. }
  49. zn := len(w.data)
  50. for len(buf) > 0 {
  51. if w.idx == 0 && len(buf) >= zn {
  52. // Avoid a copy as there is enough data for a block.
  53. if err = w.write(); err != nil {
  54. return
  55. }
  56. n += zn
  57. buf = buf[zn:]
  58. continue
  59. }
  60. // Accumulate the data to be compressed.
  61. m := copy(w.data[w.idx:], buf)
  62. n += m
  63. w.idx += m
  64. buf = buf[m:]
  65. if w.idx < len(w.data) {
  66. // Buffer not filled.
  67. return
  68. }
  69. // Buffer full.
  70. if err = w.write(); err != nil {
  71. return
  72. }
  73. w.idx = 0
  74. }
  75. return
  76. }
  77. func (w *_Writer) write() error {
  78. if w.isNotConcurrent() {
  79. return w.frame.Blocks.Block.compress(w, w.data, w.ht).write(w)
  80. }
  81. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  82. c := make(chan *FrameDataBlock)
  83. w.frame.Blocks.Blocks <- c
  84. go func(c chan *FrameDataBlock, data []byte, size BlockSizeIndex) {
  85. b := newFrameDataBlock(size)
  86. zdata := b.Data
  87. c <- b.compress(w, data, nil)
  88. // Wait for the compressed or uncompressed data to no longer be in use
  89. // and free the allocated buffers
  90. if !b.Size.compressed() {
  91. zdata, data = data, zdata
  92. }
  93. size.put(data)
  94. <-c
  95. size.put(zdata)
  96. }(c, w.data, size)
  97. if w.idx > 0 {
  98. // Not closed.
  99. w.data = size.get()
  100. }
  101. w.idx = 0
  102. return nil
  103. }
  104. // Close closes the Writer, flushing any unwritten data to the underlying io.Writer,
  105. // but does not close the underlying io.Writer.
  106. func (w *_Writer) Close() error {
  107. switch w.state.state {
  108. case writeState:
  109. case errorState:
  110. return w.state.err
  111. default:
  112. return nil
  113. }
  114. var err error
  115. defer func() { w.state.next(err) }()
  116. if idx := w.idx; idx > 0 {
  117. // Flush pending data.
  118. w.data = w.data[:idx]
  119. w.idx = 0
  120. if err = w.write(); err != nil {
  121. return err
  122. }
  123. w.data = nil
  124. }
  125. if w.isNotConcurrent() {
  126. htPool.Put(w.ht)
  127. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  128. size.put(w.data)
  129. }
  130. return w.frame.closeW(w)
  131. }
  132. // Reset clears the state of the Writer w such that it is equivalent to its
  133. // initial state from NewWriter, but instead writing to writer.
  134. // Reset keeps the previous options unless overwritten by the supplied ones.
  135. // No access to writer is performed.
  136. //
  137. // w.Close must be called before Reset.
  138. func (w *_Writer) Reset(writer io.Writer, options ...Option) (err error) {
  139. for _, o := range options {
  140. if err = o(w); err != nil {
  141. break
  142. }
  143. }
  144. w.state.state = noState
  145. if w.state.next(err) {
  146. return
  147. }
  148. w.src = writer
  149. w.frame.initW(w)
  150. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  151. w.data = size.get()
  152. w.idx = 0
  153. if w.isNotConcurrent() {
  154. w.ht = htPool.Get().([]int)
  155. }
  156. return nil
  157. }