writer.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. package lz4
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "hash"
  6. "io"
  7. "runtime"
  8. "sync"
  9. )
// Writer implements the LZ4 frame encoder.
//
// The embedded Header carries the frame options (block size, checksums,
// block dependency, ...) used when emitting the frame.
type Writer struct {
	Header
	dst      io.Writer      // destination of the compressed frame
	checksum hash.Hash32    // frame checksum; also used to compute the header checksum
	wg       sync.WaitGroup // tracks the checksum and block compression goroutines started by Write
	data     []byte         // data to be compressed, only used when dealing with block dependency as we need 64Kb to work with
	window   []byte         // trailing winSize bytes of uncompressed input, prepended to the next block (block dependency)
}
  19. // NewWriter returns a new LZ4 frame encoder.
  20. // No access to the underlying io.Writer is performed.
  21. // The supplied Header is checked at the first Write.
  22. // It is ok to change it before the first Write but then not until a Reset() is performed.
  23. func NewWriter(dst io.Writer) *Writer {
  24. return &Writer{
  25. dst: dst,
  26. checksum: hashPool.Get(),
  27. Header: Header{
  28. BlockMaxSize: 4 << 20,
  29. },
  30. }
  31. }
  32. // writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
  33. func (z *Writer) writeHeader() error {
  34. // Default to 4Mb if BlockMaxSize is not set
  35. if z.Header.BlockMaxSize == 0 {
  36. z.Header.BlockMaxSize = 4 << 20
  37. }
  38. // the only option that need to be validated
  39. bSize, ok := bsMapValue[z.Header.BlockMaxSize]
  40. if !ok {
  41. return fmt.Errorf("lz4: invalid block max size: %d", z.Header.BlockMaxSize)
  42. }
  43. // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
  44. // Size and DictID are optional
  45. var buf [19]byte
  46. // set the fixed size data: magic number, block max size and flags
  47. binary.LittleEndian.PutUint32(buf[0:], frameMagic)
  48. flg := byte(Version << 6)
  49. if !z.Header.BlockDependency {
  50. flg |= 1 << 5
  51. }
  52. if z.Header.BlockChecksum {
  53. flg |= 1 << 4
  54. }
  55. if z.Header.Size > 0 {
  56. flg |= 1 << 3
  57. }
  58. if !z.Header.NoChecksum {
  59. flg |= 1 << 2
  60. }
  61. // if z.Header.Dict {
  62. // flg |= 1
  63. // }
  64. buf[4] = flg
  65. buf[5] = bSize << 4
  66. // current buffer size: magic(4) + flags(1) + block max size (1)
  67. n := 6
  68. // optional items
  69. if z.Header.Size > 0 {
  70. binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
  71. n += 8
  72. }
  73. // if z.Header.Dict {
  74. // binary.LittleEndian.PutUint32(buf[n:], z.Header.DictID)
  75. // n += 4
  76. // }
  77. // header checksum includes the flags, block max size and optional Size and DictID
  78. z.checksum.Write(buf[4:n])
  79. buf[n] = byte(z.checksum.Sum32() >> 8 & 0xFF)
  80. z.checksum.Reset()
  81. // header ready, write it out
  82. if _, err := z.dst.Write(buf[0 : n+1]); err != nil {
  83. return err
  84. }
  85. z.Header.done = true
  86. return nil
  87. }
  88. // Write compresses data from the supplied buffer into the underlying io.Writer.
  89. // Write does not return until the data has been written.
  90. //
  91. // If the input buffer is large enough (typically in multiples of BlockMaxSize)
  92. // the data will be compressed concurrently.
  93. //
  94. // Write never buffers any data unless in BlockDependency mode where it may
  95. // do so until it has 64Kb of data, after which it never buffers any.
  96. func (z *Writer) Write(buf []byte) (n int, err error) {
  97. if !z.Header.done {
  98. if err = z.writeHeader(); err != nil {
  99. return
  100. }
  101. }
  102. if len(buf) == 0 {
  103. return
  104. }
  105. if !z.NoChecksum {
  106. z.wg.Add(1)
  107. go func(b []byte) {
  108. z.checksum.Write(b)
  109. z.wg.Done()
  110. }(buf)
  111. }
  112. // with block dependency, require at least 64Kb of data to work with
  113. // not having 64Kb only matters initially to setup the first window
  114. bl := 0
  115. if z.BlockDependency && len(z.window) == 0 {
  116. bl = len(z.data)
  117. z.data = append(z.data, buf...)
  118. if len(z.data) < winSize {
  119. z.wg.Wait()
  120. return len(buf), nil
  121. }
  122. buf = z.data
  123. z.data = nil
  124. }
  125. // Break up the input buffer into BlockMaxSize blocks, provisioning the left over block.
  126. // Then compress into each of them concurrently if possible (no dependency).
  127. wbuf := buf
  128. zn := len(wbuf) / z.BlockMaxSize
  129. zblocks := make([]block, zn, zn+1)
  130. for zi := 0; zi < zn; zi++ {
  131. zb := &zblocks[zi]
  132. if z.BlockDependency {
  133. if zi == 0 {
  134. // first block does not have the window
  135. zb.data = append(z.window, wbuf[:z.BlockMaxSize]...)
  136. zb.offset = len(z.window)
  137. wbuf = wbuf[z.BlockMaxSize-winSize:]
  138. } else {
  139. // set the uncompressed data including the window from previous block
  140. zb.data = wbuf[:z.BlockMaxSize+winSize]
  141. zb.offset = winSize
  142. wbuf = wbuf[z.BlockMaxSize:]
  143. }
  144. } else {
  145. zb.data = wbuf[:z.BlockMaxSize]
  146. wbuf = wbuf[z.BlockMaxSize:]
  147. }
  148. z.wg.Add(1)
  149. go z.compressBlock(zb)
  150. }
  151. // left over
  152. if len(buf)%z.BlockMaxSize > 0 {
  153. zblocks = append(zblocks, block{data: wbuf})
  154. zb := &zblocks[zn]
  155. if z.BlockDependency {
  156. if zn == 0 {
  157. zb.data = append(z.window, zb.data...)
  158. zb.offset = len(z.window)
  159. } else {
  160. zb.offset = winSize
  161. }
  162. }
  163. z.wg.Add(1)
  164. go z.compressBlock(zb)
  165. }
  166. z.wg.Wait()
  167. // outputs the compressed data
  168. for zi, zb := range zblocks {
  169. _, err = z.writeBlock(&zb)
  170. written := len(zb.data)
  171. if bl > 0 {
  172. if written >= bl {
  173. written -= bl
  174. bl = 0
  175. } else {
  176. bl -= written
  177. written = 0
  178. }
  179. }
  180. n += written
  181. // remove the window in zb.data
  182. if z.BlockDependency {
  183. if zi == 0 {
  184. n -= len(z.window)
  185. } else {
  186. n -= winSize
  187. }
  188. }
  189. if err != nil {
  190. return
  191. }
  192. }
  193. if z.BlockDependency {
  194. if len(z.window) == 0 {
  195. z.window = make([]byte, winSize)
  196. }
  197. // last buffer may be shorter than the window
  198. if len(buf) > winSize {
  199. copy(z.window, buf[len(buf)-winSize:])
  200. }
  201. }
  202. return
  203. }
  204. // compressBlock compresses a block.
  205. func (z *Writer) compressBlock(zb *block) {
  206. // compressed block size cannot exceed the input's
  207. zbuf := make([]byte, len(zb.data)-zb.offset)
  208. var (
  209. n int
  210. err error
  211. )
  212. if z.HighCompression {
  213. n, err = CompressBlockHC(zb.data, zbuf, zb.offset)
  214. } else {
  215. n, err = CompressBlock(zb.data, zbuf, zb.offset)
  216. }
  217. // compressible and compressed size smaller than decompressed: ok!
  218. if err == nil && n > 0 && len(zb.zdata) < len(zb.data) {
  219. zb.compressed = true
  220. zb.zdata = zbuf[:n]
  221. } else {
  222. zb.zdata = zb.data[zb.offset:]
  223. }
  224. if z.BlockChecksum {
  225. xxh := hashPool.Get()
  226. xxh.Write(zb.zdata)
  227. zb.checksum = xxh.Sum32()
  228. hashPool.Put(xxh)
  229. }
  230. z.wg.Done()
  231. }
  232. // writeBlock writes a frame block to the underlying io.Writer (size, data).
  233. func (z *Writer) writeBlock(zb *block) (int, error) {
  234. bLen := uint32(len(zb.zdata))
  235. if !zb.compressed {
  236. bLen |= 1 << 31
  237. }
  238. n := 0
  239. if err := binary.Write(z.dst, binary.LittleEndian, bLen); err != nil {
  240. return n, err
  241. }
  242. n += 4
  243. m, err := z.dst.Write(zb.zdata)
  244. n += m
  245. if err != nil {
  246. return n, err
  247. }
  248. if z.BlockChecksum {
  249. if err := binary.Write(z.dst, binary.LittleEndian, zb.checksum); err != nil {
  250. return n, err
  251. }
  252. n += 4
  253. }
  254. return n, nil
  255. }
  256. // Flush flushes any pending compressed data to the underlying writer.
  257. // Flush does not return until the data has been written.
  258. // If the underlying writer returns an error, Flush returns that error.
  259. //
  260. // Flush is only required when in BlockDependency mode and the total of
  261. // data written is less than 64Kb.
  262. func (z *Writer) Flush() error {
  263. if len(z.data) == 0 {
  264. return nil
  265. }
  266. zb := block{data: z.data}
  267. z.wg.Add(1)
  268. z.compressBlock(&zb)
  269. if _, err := z.writeBlock(&zb); err != nil {
  270. return err
  271. }
  272. return nil
  273. }
  274. // Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
  275. func (z *Writer) Close() error {
  276. if !z.Header.done {
  277. if err := z.writeHeader(); err != nil {
  278. return err
  279. }
  280. }
  281. // buffered data for the block dependency window
  282. if z.BlockDependency && len(z.data) > 0 {
  283. zb := block{data: z.data}
  284. z.wg.Add(1)
  285. z.compressBlock(&zb)
  286. if _, err := z.writeBlock(&zb); err != nil {
  287. return err
  288. }
  289. }
  290. if err := binary.Write(z.dst, binary.LittleEndian, uint32(0)); err != nil {
  291. return err
  292. }
  293. if !z.NoChecksum {
  294. if err := binary.Write(z.dst, binary.LittleEndian, z.checksum.Sum32()); err != nil {
  295. return err
  296. }
  297. }
  298. return nil
  299. }
  300. // Reset clears the state of the Writer z such that it is equivalent to its
  301. // initial state from NewWriter, but instead writing to w.
  302. // No access to the underlying io.Writer is performed.
  303. func (z *Writer) Reset(w io.Writer) {
  304. z.Header = Header{}
  305. z.dst = w
  306. z.checksum.Reset()
  307. z.data = nil
  308. z.window = nil
  309. }
  310. // ReadFrom compresses the data read from the io.Reader and writes it to the underlying io.Writer.
  311. // Returns the number of bytes read.
  312. // It does not close the Writer.
  313. func (z *Writer) ReadFrom(r io.Reader) (n int64, err error) {
  314. cpus := runtime.GOMAXPROCS(0)
  315. buf := make([]byte, cpus*z.BlockMaxSize)
  316. for {
  317. m, er := io.ReadFull(r, buf)
  318. n += int64(m)
  319. if er == nil || er == io.ErrUnexpectedEOF || er == io.EOF {
  320. if _, err = z.Write(buf[:m]); err != nil {
  321. return
  322. }
  323. if er == nil {
  324. continue
  325. }
  326. return
  327. }
  328. return n, er
  329. }
  330. }