Browse Source

Remove mallocs in writer.go

Connor Peet 9 years ago
parent
commit
7f1eefc66a
3 changed files with 95 additions and 78 deletions
  1. 3 2
      lz4.go
  2. 10 0
      lz4_test.go
  3. 82 76
      writer.go

+ 3 - 2
lz4.go

@@ -43,8 +43,9 @@ const (
 	// Its value influences the compression speed and memory usage, the lower the faster,
 	// but at the expense of the compression ratio.
 	// 16 seems to be the best compromise.
-	hashLog   = 16
-	hashShift = uint((minMatch * 8) - hashLog)
+	hashLog       = 16
+	hashTableSize = 1 << hashLog
+	hashShift     = uint((minMatch * 8) - hashLog)
 
 	mfLimit      = 8 + minMatch // The last match cannot start within the last 12 bytes.
 	skipStrength = 6            // variable step for fast scan

+ 10 - 0
lz4_test.go

@@ -6,6 +6,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"math/big"
 	"reflect"
 	"testing"
@@ -303,6 +304,15 @@ func BenchmarkCompressBlockHC(b *testing.B) {
 		lz4.CompressBlockHC(d, z, 0)
 	}
 }
+func BenchmarkCompressEndToEnd(b *testing.B) {
+	w := lz4.NewWriter(ioutil.Discard)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if _, err := w.Write(lorem); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
 
 // TestNoWrite compresses without any call to Write() (empty frame).
 // It does so checking all possible headers.

+ 82 - 76
writer.go

@@ -6,17 +6,18 @@ import (
 	"hash"
 	"io"
 	"runtime"
-	"sync"
 )
 
 // Writer implements the LZ4 frame encoder.
 type Writer struct {
 	Header
 	dst      io.Writer
-	checksum hash.Hash32    // frame checksum
-	wg       sync.WaitGroup // decompressing go routine wait group
-	data     []byte         // data to be compressed, only used when dealing with block dependency as we need 64Kb to work with
-	window   []byte         // last 64KB of decompressed data (block dependency) + blockMaxSize buffer
+	checksum hash.Hash32 // frame checksum
+	data     []byte      // data to be compressed, only used when dealing with block dependency as we need 64Kb to work with
+	window   []byte      // last 64KB of decompressed data (block dependency) + blockMaxSize buffer
+
+	zbCompressBuf []byte // buffer for compressing lz4 blocks
+	writeSizeBuf  []byte // four-byte slice for writing checksums and sizes in writeblock
 }
 
 // NewWriter returns a new LZ4 frame encoder.
@@ -30,6 +31,7 @@ func NewWriter(dst io.Writer) *Writer {
 		Header: Header{
 			BlockMaxSize: 4 << 20,
 		},
+		writeSizeBuf: make([]byte, 4),
 	}
 }
 
@@ -64,9 +66,9 @@ func (z *Writer) writeHeader() error {
 	if !z.Header.NoChecksum {
 		flg |= 1 << 2
 	}
-	// 	if z.Header.Dict {
-	// 		flg |= 1
-	// 	}
+	//  if z.Header.Dict {
+	//      flg |= 1
+	//  }
 	buf[4] = flg
 	buf[5] = bSize << 4
 
@@ -77,10 +79,10 @@ func (z *Writer) writeHeader() error {
 		binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
 		n += 8
 	}
-	// 	if z.Header.Dict {
-	// 		binary.LittleEndian.PutUint32(buf[n:], z.Header.DictID)
-	// 		n += 4
-	// 	}
+	//  if z.Header.Dict {
+	//      binary.LittleEndian.PutUint32(buf[n:], z.Header.DictID)
+	//      n += 4
+	//  }
 
 	// header checksum includes the flags, block max size and optional Size and DictID
 	z.checksum.Write(buf[4:n])
@@ -93,6 +95,9 @@ func (z *Writer) writeHeader() error {
 	}
 	z.Header.done = true
 
+	// initialize buffers dependent on header info
+	z.zbCompressBuf = make([]byte, winSize+z.BlockMaxSize)
+
 	return nil
 }
 
@@ -116,11 +121,7 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 	}
 
 	if !z.NoChecksum {
-		z.wg.Add(1)
-		go func(b []byte) {
-			z.checksum.Write(b)
-			z.wg.Done()
-		}(buf)
+		z.checksum.Write(buf)
 	}
 
 	// with block dependency, require at least 64Kb of data to work with
@@ -130,7 +131,6 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 		bl = len(z.data)
 		z.data = append(z.data, buf...)
 		if len(z.data) < winSize {
-			z.wg.Wait()
 			return len(buf), nil
 		}
 		buf = z.data
@@ -139,11 +139,16 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 
 	// Break up the input buffer into BlockMaxSize blocks, provisioning the left over block.
 	// Then compress into each of them concurrently if possible (no dependency).
-	wbuf := buf
-	zn := len(wbuf) / z.BlockMaxSize
-	zblocks := make([]block, zn, zn+1)
-	for zi := 0; zi < zn; zi++ {
-		zb := &zblocks[zi]
+	var (
+		zb       block
+		wbuf     = buf
+		zn       = len(wbuf) / z.BlockMaxSize
+		zi       = 0
+		leftover = len(buf) % z.BlockMaxSize
+	)
+
+loop:
+	for zi < zn {
 		if z.BlockDependency {
 			if zi == 0 {
 				// first block does not have the window
@@ -161,14 +166,12 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 			wbuf = wbuf[z.BlockMaxSize:]
 		}
 
-		z.wg.Add(1)
-		go z.compressBlock(zb)
+		goto write
 	}
 
 	// left over
-	if len(buf)%z.BlockMaxSize > 0 {
-		zblocks = append(zblocks, block{data: wbuf})
-		zb := &zblocks[zn]
+	if leftover > 0 {
+		zb = block{data: wbuf}
 		if z.BlockDependency {
 			if zn == 0 {
 				zb.data = append(z.window, zb.data...)
@@ -177,38 +180,9 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 				zb.offset = winSize
 			}
 		}
-		z.wg.Add(1)
-		go z.compressBlock(zb)
-	}
-	z.wg.Wait()
-
-	// outputs the compressed data
-	for zi, zb := range zblocks {
-		_, err = z.writeBlock(&zb)
 
-		written := len(zb.data)
-		if bl > 0 {
-			if written >= bl {
-				written -= bl
-				bl = 0
-			} else {
-				bl -= written
-				written = 0
-			}
-		}
-
-		n += written
-		// remove the window in zb.data
-		if z.BlockDependency {
-			if zi == 0 {
-				n -= len(z.window)
-			} else {
-				n -= winSize
-			}
-		}
-		if err != nil {
-			return
-		}
+		leftover = 0
+		goto write
 	}
 
 	if z.BlockDependency {
@@ -225,15 +199,45 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 	}
 
 	return
+
+write:
+	zb = z.compressBlock(zb)
+	_, err = z.writeBlock(zb)
+
+	written := len(zb.data)
+	if bl > 0 {
+		if written >= bl {
+			written -= bl
+			bl = 0
+		} else {
+			bl -= written
+			written = 0
+		}
+	}
+
+	n += written
+	// remove the window in zb.data
+	if z.BlockDependency {
+		if zi == 0 {
+			n -= len(z.window)
+		} else {
+			n -= winSize
+		}
+	}
+	if err != nil {
+		return
+	}
+	zi++
+	goto loop
 }
 
 // compressBlock compresses a block.
-func (z *Writer) compressBlock(zb *block) {
+func (z *Writer) compressBlock(zb block) block {
 	// compressed block size cannot exceed the input's
-	zbuf := make([]byte, len(zb.data)-zb.offset)
 	var (
-		n   int
-		err error
+		n    int
+		err  error
+		zbuf = z.zbCompressBuf
 	)
 	if z.HighCompression {
 		n, err = CompressBlockHC(zb.data, zbuf, zb.offset)
@@ -256,21 +260,23 @@ func (z *Writer) compressBlock(zb *block) {
 		hashPool.Put(xxh)
 	}
 
-	z.wg.Done()
+	return zb
 }
 
 // writeBlock writes a frame block to the underlying io.Writer (size, data).
-func (z *Writer) writeBlock(zb *block) (int, error) {
+func (z *Writer) writeBlock(zb block) (int, error) {
 	bLen := uint32(len(zb.zdata))
 	if !zb.compressed {
 		bLen |= 1 << 31
 	}
 
 	n := 0
-	if err := binary.Write(z.dst, binary.LittleEndian, bLen); err != nil {
+
+	binary.LittleEndian.PutUint32(z.writeSizeBuf, bLen)
+	n, err := z.dst.Write(z.writeSizeBuf)
+	if err != nil {
 		return n, err
 	}
-	n += 4
 
 	m, err := z.dst.Write(zb.zdata)
 	n += m
@@ -279,10 +285,13 @@ func (z *Writer) writeBlock(zb *block) (int, error) {
 	}
 
 	if z.BlockChecksum {
-		if err := binary.Write(z.dst, binary.LittleEndian, zb.checksum); err != nil {
+		binary.LittleEndian.PutUint32(z.writeSizeBuf, zb.checksum)
+		m, err := z.dst.Write(z.writeSizeBuf)
+		n += m
+
+		if err != nil {
 			return n, err
 		}
-		n += 4
 	}
 
 	return n, nil
@@ -298,10 +307,9 @@ func (z *Writer) Flush() error {
 	if len(z.data) == 0 {
 		return nil
 	}
-	zb := block{data: z.data}
-	z.wg.Add(1)
-	z.compressBlock(&zb)
-	if _, err := z.writeBlock(&zb); err != nil {
+
+	zb := z.compressBlock(block{data: z.data})
+	if _, err := z.writeBlock(zb); err != nil {
 		return err
 	}
 	return nil
@@ -318,9 +326,7 @@ func (z *Writer) Close() error {
 	// buffered data for the block dependency window
 	if z.BlockDependency && len(z.data) > 0 {
 		zb := block{data: z.data}
-		z.wg.Add(1)
-		z.compressBlock(&zb)
-		if _, err := z.writeBlock(&zb); err != nil {
+		if _, err := z.writeBlock(z.compressBlock(zb)); err != nil {
 			return err
 		}
 	}