writer.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. package lz4
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "hash"
  6. "io"
  7. "runtime"
  8. "sync"
  9. )
  10. // Writer implements the LZ4 frame encoder.
  11. type Writer struct {
  12. Header
  13. dst io.Writer
  14. checksum hash.Hash32 // frame checksum
  15. wg sync.WaitGroup // decompressing go routine wait group
  16. data []byte // data to be compressed, only used when dealing with block dependency as we need 64Kb to work with
  17. window []byte // last 64KB of decompressed data (block dependency) + blockMaxSize buffer
  18. }
  19. // NewWriter returns a new LZ4 frame encoder.
  20. // No access to the underlying io.Writer is performed.
  21. // The supplied Header is checked at the first Write.
  22. // It is ok to change it before the first Write but then not until a Reset() is performed.
  23. func NewWriter(dst io.Writer) *Writer {
  24. return &Writer{
  25. dst: dst,
  26. checksum: hashPool.Get(),
  27. Header: Header{
  28. BlockMaxSize: 4 << 20,
  29. },
  30. }
  31. }
  32. // writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
  33. func (z *Writer) writeHeader() error {
  34. // Default to 4Mb if BlockMaxSize is not set
  35. if z.Header.BlockMaxSize == 0 {
  36. z.Header.BlockMaxSize = 4 << 20
  37. }
  38. // the only option that need to be validated
  39. bSize, ok := bsMapValue[z.Header.BlockMaxSize]
  40. if !ok {
  41. return fmt.Errorf("lz4: invalid block max size: %d", z.Header.BlockMaxSize)
  42. }
  43. // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
  44. // Size and DictID are optional
  45. var buf [19]byte
  46. // set the fixed size data: magic number, block max size and flags
  47. binary.LittleEndian.PutUint32(buf[0:], frameMagic)
  48. flg := byte(Version << 6)
  49. if !z.Header.BlockDependency {
  50. flg |= 1 << 5
  51. }
  52. if z.Header.BlockChecksum {
  53. flg |= 1 << 4
  54. }
  55. if z.Header.Size > 0 {
  56. flg |= 1 << 3
  57. }
  58. if !z.Header.NoChecksum {
  59. flg |= 1 << 2
  60. }
  61. // if z.Header.Dict {
  62. // flg |= 1
  63. // }
  64. buf[4] = flg
  65. buf[5] = bSize << 4
  66. // current buffer size: magic(4) + flags(1) + block max size (1)
  67. n := 6
  68. // optional items
  69. if z.Header.Size > 0 {
  70. binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
  71. n += 8
  72. }
  73. // if z.Header.Dict {
  74. // binary.LittleEndian.PutUint32(buf[n:], z.Header.DictID)
  75. // n += 4
  76. // }
  77. // header checksum includes the flags, block max size and optional Size and DictID
  78. z.checksum.Write(buf[4:n])
  79. buf[n] = byte(z.checksum.Sum32() >> 8 & 0xFF)
  80. z.checksum.Reset()
  81. // header ready, write it out
  82. if _, err := z.dst.Write(buf[0 : n+1]); err != nil {
  83. return err
  84. }
  85. z.Header.done = true
  86. return nil
  87. }
  88. // Write compresses data from the supplied buffer into the underlying io.Writer.
  89. // Write does not return until the data has been written.
  90. //
  91. // If the input buffer is large enough (typically in multiples of BlockMaxSize)
  92. // the data will be compressed concurrently.
  93. //
  94. // Write never buffers any data unless in BlockDependency mode where it may
  95. // do so until it has 64Kb of data, after which it never buffers any.
  96. func (z *Writer) Write(buf []byte) (n int, err error) {
  97. if !z.Header.done {
  98. if err = z.writeHeader(); err != nil {
  99. return
  100. }
  101. }
  102. if len(buf) == 0 {
  103. return
  104. }
  105. if !z.NoChecksum {
  106. z.wg.Add(1)
  107. go func(b []byte) {
  108. z.checksum.Write(b)
  109. z.wg.Done()
  110. }(buf)
  111. }
  112. // with block dependency, require at least 64Kb of data to work with
  113. // not having 64Kb only matters initially to setup the first window
  114. bl := 0
  115. if z.BlockDependency && len(z.window) == 0 {
  116. bl = len(z.data)
  117. z.data = append(z.data, buf...)
  118. if len(z.data) < winSize {
  119. z.wg.Wait()
  120. return len(buf), nil
  121. }
  122. buf = z.data
  123. z.data = nil
  124. }
  125. // Break up the input buffer into BlockMaxSize blocks, provisioning the left over block.
  126. // Then compress into each of them concurrently if possible (no dependency).
  127. wbuf := buf
  128. zn := len(wbuf) / z.BlockMaxSize
  129. zblocks := make([]block, zn, zn+1)
  130. for zi := 0; zi < zn; zi++ {
  131. zb := &zblocks[zi]
  132. if z.BlockDependency {
  133. if zi == 0 {
  134. // first block does not have the window
  135. zb.data = append(z.window, wbuf[:z.BlockMaxSize]...)
  136. zb.offset = len(z.window)
  137. wbuf = wbuf[z.BlockMaxSize-winSize:]
  138. } else {
  139. // set the uncompressed data including the window from previous block
  140. zb.data = wbuf[:z.BlockMaxSize+winSize]
  141. zb.offset = winSize
  142. wbuf = wbuf[z.BlockMaxSize:]
  143. }
  144. } else {
  145. zb.data = wbuf[:z.BlockMaxSize]
  146. wbuf = wbuf[z.BlockMaxSize:]
  147. }
  148. z.wg.Add(1)
  149. go z.compressBlock(zb)
  150. }
  151. // left over
  152. if len(buf)%z.BlockMaxSize > 0 {
  153. zblocks = append(zblocks, block{data: wbuf})
  154. zb := &zblocks[zn]
  155. if z.BlockDependency {
  156. if zn == 0 {
  157. zb.data = append(z.window, zb.data...)
  158. zb.offset = len(z.window)
  159. } else {
  160. zb.offset = winSize
  161. }
  162. }
  163. z.wg.Add(1)
  164. go z.compressBlock(zb)
  165. }
  166. z.wg.Wait()
  167. // outputs the compressed data
  168. for zi, zb := range zblocks {
  169. _, err = z.writeBlock(&zb)
  170. written := len(zb.data)
  171. if bl > 0 {
  172. if written >= bl {
  173. written -= bl
  174. bl = 0
  175. } else {
  176. bl -= written
  177. written = 0
  178. }
  179. }
  180. n += written
  181. // remove the window in zb.data
  182. if z.BlockDependency {
  183. if zi == 0 {
  184. n -= len(z.window)
  185. } else {
  186. n -= winSize
  187. }
  188. }
  189. if err != nil {
  190. return
  191. }
  192. }
  193. if z.BlockDependency {
  194. if len(z.window) == 0 {
  195. z.window = make([]byte, winSize)
  196. }
  197. // last buffer may be shorter than the window
  198. if len(buf) >= winSize {
  199. copy(z.window, buf[len(buf)-winSize:])
  200. } else {
  201. copy(z.window, z.window[len(buf):])
  202. copy(z.window[len(buf)+1:], buf)
  203. }
  204. }
  205. return
  206. }
  207. // compressBlock compresses a block.
  208. func (z *Writer) compressBlock(zb *block) {
  209. // compressed block size cannot exceed the input's
  210. zbuf := make([]byte, len(zb.data)-zb.offset)
  211. var (
  212. n int
  213. err error
  214. )
  215. if z.HighCompression {
  216. n, err = CompressBlockHC(zb.data, zbuf, zb.offset)
  217. } else {
  218. n, err = CompressBlock(zb.data, zbuf, zb.offset)
  219. }
  220. // compressible and compressed size smaller than decompressed: ok!
  221. if err == nil && n > 0 && len(zb.zdata) < len(zb.data) {
  222. zb.compressed = true
  223. zb.zdata = zbuf[:n]
  224. } else {
  225. zb.zdata = zb.data[zb.offset:]
  226. }
  227. if z.BlockChecksum {
  228. xxh := hashPool.Get()
  229. xxh.Write(zb.zdata)
  230. zb.checksum = xxh.Sum32()
  231. hashPool.Put(xxh)
  232. }
  233. z.wg.Done()
  234. }
  235. // writeBlock writes a frame block to the underlying io.Writer (size, data).
  236. func (z *Writer) writeBlock(zb *block) (int, error) {
  237. bLen := uint32(len(zb.zdata))
  238. if !zb.compressed {
  239. bLen |= 1 << 31
  240. }
  241. n := 0
  242. if err := binary.Write(z.dst, binary.LittleEndian, bLen); err != nil {
  243. return n, err
  244. }
  245. n += 4
  246. m, err := z.dst.Write(zb.zdata)
  247. n += m
  248. if err != nil {
  249. return n, err
  250. }
  251. if z.BlockChecksum {
  252. if err := binary.Write(z.dst, binary.LittleEndian, zb.checksum); err != nil {
  253. return n, err
  254. }
  255. n += 4
  256. }
  257. return n, nil
  258. }
  259. // Flush flushes any pending compressed data to the underlying writer.
  260. // Flush does not return until the data has been written.
  261. // If the underlying writer returns an error, Flush returns that error.
  262. //
  263. // Flush is only required when in BlockDependency mode and the total of
  264. // data written is less than 64Kb.
  265. func (z *Writer) Flush() error {
  266. if len(z.data) == 0 {
  267. return nil
  268. }
  269. zb := block{data: z.data}
  270. z.wg.Add(1)
  271. z.compressBlock(&zb)
  272. if _, err := z.writeBlock(&zb); err != nil {
  273. return err
  274. }
  275. return nil
  276. }
  277. // Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
  278. func (z *Writer) Close() error {
  279. if !z.Header.done {
  280. if err := z.writeHeader(); err != nil {
  281. return err
  282. }
  283. }
  284. // buffered data for the block dependency window
  285. if z.BlockDependency && len(z.data) > 0 {
  286. zb := block{data: z.data}
  287. z.wg.Add(1)
  288. z.compressBlock(&zb)
  289. if _, err := z.writeBlock(&zb); err != nil {
  290. return err
  291. }
  292. }
  293. if err := binary.Write(z.dst, binary.LittleEndian, uint32(0)); err != nil {
  294. return err
  295. }
  296. if !z.NoChecksum {
  297. if err := binary.Write(z.dst, binary.LittleEndian, z.checksum.Sum32()); err != nil {
  298. return err
  299. }
  300. }
  301. return nil
  302. }
  303. // Reset clears the state of the Writer z such that it is equivalent to its
  304. // initial state from NewWriter, but instead writing to w.
  305. // No access to the underlying io.Writer is performed.
  306. func (z *Writer) Reset(w io.Writer) {
  307. z.Header = Header{}
  308. z.dst = w
  309. z.checksum.Reset()
  310. z.data = nil
  311. z.window = nil
  312. }
  313. // ReadFrom compresses the data read from the io.Reader and writes it to the underlying io.Writer.
  314. // Returns the number of bytes read.
  315. // It does not close the Writer.
  316. func (z *Writer) ReadFrom(r io.Reader) (n int64, err error) {
  317. cpus := runtime.GOMAXPROCS(0)
  318. buf := make([]byte, cpus*z.BlockMaxSize)
  319. for {
  320. m, er := io.ReadFull(r, buf)
  321. n += int64(m)
  322. if er == nil || er == io.ErrUnexpectedEOF || er == io.EOF {
  323. if _, err = z.Write(buf[:m]); err != nil {
  324. return
  325. }
  326. if er == nil {
  327. continue
  328. }
  329. return
  330. }
  331. return n, er
  332. }
  333. }