Browse Source

Merge pull request #8 from WatchBeam/master

Increase general compression efficiency
Pierre Curto 9 years ago
parent
commit
5c9560bfa9
4 changed files with 151 additions and 85 deletions
  1. 35 6
      block.go
  2. 16 2
      lz4.go
  3. 10 0
      lz4_test.go
  4. 90 77
      writer.go

+ 35 - 6
block.go

@@ -3,6 +3,7 @@ package lz4
 import (
 import (
 	"encoding/binary"
 	"encoding/binary"
 	"errors"
 	"errors"
+	"unsafe"
 )
 )
 
 
 // block represents a frame data block.
 // block represents a frame data block.
@@ -110,6 +111,11 @@ func UncompressBlock(src, dst []byte, di int) (int, error) {
 	}
 	}
 }
 }
 
 
+type hashEntry struct {
+	generation uint
+	value      int
+}
+
 // CompressBlock compresses the source buffer starting at soffet into the destination one.
 // CompressBlock compresses the source buffer starting at soffet into the destination one.
 // This is the fast version of LZ4 compression and also the default one.
 // This is the fast version of LZ4 compression and also the default one.
 //
 //
@@ -117,6 +123,27 @@ func UncompressBlock(src, dst []byte, di int) (int, error) {
 //
 //
 // An error is returned if the destination buffer is too small.
 // An error is returned if the destination buffer is too small.
 func CompressBlock(src, dst []byte, soffset int) (int, error) {
 func CompressBlock(src, dst []byte, soffset int) (int, error) {
+	var hashTable [hashTableSize]hashEntry
+	return compressGenerationalBlock(src, dst, soffset, 0, hashTable[:])
+}
+
+// getUint32 is a despicably evil function (well, for Go!) that takes advantage
+// of the machine's byte order to save some operations. This may look
+// inefficient but it is significantly faster on littleEndian machines,
+// which include x84, amd64, and some ARM processors.
+func getUint32(b []byte) uint32 {
+	_ = b[3]
+	if isLittleEndian {
+		return *(*uint32)(unsafe.Pointer(&b))
+	}
+
+	return uint32(b[0]) |
+		uint32(b[1])<<8 |
+		uint32(b[2])<<16 |
+		uint32(b[3])<<24
+}
+
+func compressGenerationalBlock(src, dst []byte, soffset int, generation uint, hashTable []hashEntry) (int, error) {
 	sn, dn := len(src)-mfLimit, len(dst)
 	sn, dn := len(src)-mfLimit, len(dst)
 	if sn <= 0 || dn == 0 || soffset >= sn {
 	if sn <= 0 || dn == 0 || soffset >= sn {
 		return 0, nil
 		return 0, nil
@@ -125,26 +152,28 @@ func CompressBlock(src, dst []byte, soffset int) (int, error) {
 
 
 	// fast scan strategy:
 	// fast scan strategy:
 	// we only need a hash table to store the last sequences (4 bytes)
 	// we only need a hash table to store the last sequences (4 bytes)
-	var hashTable [1 << hashLog]int
 	var hashShift = uint((minMatch * 8) - hashLog)
 	var hashShift = uint((minMatch * 8) - hashLog)
 
 
 	// Initialise the hash table with the first 64Kb of the input buffer
 	// Initialise the hash table with the first 64Kb of the input buffer
 	// (used when compressing dependent blocks)
 	// (used when compressing dependent blocks)
 	for si < soffset {
 	for si < soffset {
-		h := binary.LittleEndian.Uint32(src[si:]) * hasher >> hashShift
+		h := getUint32(src[si:]) * hasher >> hashShift
 		si++
 		si++
-		hashTable[h] = si
+		hashTable[h] = hashEntry{generation, si}
 	}
 	}
 
 
 	anchor := si
 	anchor := si
 	fma := 1 << skipStrength
 	fma := 1 << skipStrength
 	for si < sn-minMatch {
 	for si < sn-minMatch {
 		// hash the next 4 bytes (sequence)...
 		// hash the next 4 bytes (sequence)...
-		h := binary.LittleEndian.Uint32(src[si:]) * hasher >> hashShift
+		h := getUint32(src[si:]) * hasher >> hashShift
+		if hashTable[h].generation != generation {
+			hashTable[h] = hashEntry{generation, 0}
+		}
 		// -1 to separate existing entries from new ones
 		// -1 to separate existing entries from new ones
-		ref := hashTable[h] - 1
+		ref := hashTable[h].value - 1
 		// ...and store the position of the hash in the hash table (+1 to compensate the -1 upon saving)
 		// ...and store the position of the hash in the hash table (+1 to compensate the -1 upon saving)
-		hashTable[h] = si + 1
+		hashTable[h].value = si + 1
 		// no need to check the last 3 bytes in the first literal 4 bytes as
 		// no need to check the last 3 bytes in the first literal 4 bytes as
 		// this guarantees that the next match, if any, is compressed with
 		// this guarantees that the next match, if any, is compressed with
 		// a lower size, since to have some compression we must have:
 		// a lower size, since to have some compression we must have:

+ 16 - 2
lz4.go

@@ -20,6 +20,7 @@ package lz4
 import (
 import (
 	"hash"
 	"hash"
 	"sync"
 	"sync"
+	"unsafe"
 
 
 	"github.com/pierrec/xxHash/xxHash32"
 	"github.com/pierrec/xxHash/xxHash32"
 )
 )
@@ -43,8 +44,9 @@ const (
 	// Its value influences the compression speed and memory usage, the lower the faster,
 	// Its value influences the compression speed and memory usage, the lower the faster,
 	// but at the expense of the compression ratio.
 	// but at the expense of the compression ratio.
 	// 16 seems to be the best compromise.
 	// 16 seems to be the best compromise.
-	hashLog   = 16
-	hashShift = uint((minMatch * 8) - hashLog)
+	hashLog       = 16
+	hashTableSize = 1 << hashLog
+	hashShift     = uint((minMatch * 8) - hashLog)
 
 
 	mfLimit      = 8 + minMatch // The last match cannot start within the last 12 bytes.
 	mfLimit      = 8 + minMatch // The last match cannot start within the last 12 bytes.
 	skipStrength = 6            // variable step for fast scan
 	skipStrength = 6            // variable step for fast scan
@@ -63,6 +65,18 @@ func init() {
 	}
 	}
 }
 }
 
 
+var isLittleEndian = getIsLittleEndian()
+
+func getIsLittleEndian() (ret bool) {
+	var i int = 0x1
+	bs := (*[1]byte)(unsafe.Pointer(&i))
+	if bs[0] == 0 {
+		return false
+	}
+
+	return true
+}
+
 // Header describes the various flags that can be set on a Writer or obtained from a Reader.
 // Header describes the various flags that can be set on a Writer or obtained from a Reader.
 // The default values match those of the LZ4 frame format definition (http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html).
 // The default values match those of the LZ4 frame format definition (http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html).
 //
 //

+ 10 - 0
lz4_test.go

@@ -6,6 +6,7 @@ import (
 	"encoding/binary"
 	"encoding/binary"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
+	"io/ioutil"
 	"math/big"
 	"math/big"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
@@ -303,6 +304,15 @@ func BenchmarkCompressBlockHC(b *testing.B) {
 		lz4.CompressBlockHC(d, z, 0)
 		lz4.CompressBlockHC(d, z, 0)
 	}
 	}
 }
 }
+func BenchmarkCompressEndToEnd(b *testing.B) {
+	w := lz4.NewWriter(ioutil.Discard)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if _, err := w.Write(lorem); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
 
 
 // TestNoWrite compresses without any call to Write() (empty frame).
 // TestNoWrite compresses without any call to Write() (empty frame).
 // It does so checking all possible headers.
 // It does so checking all possible headers.

+ 90 - 77
writer.go

@@ -6,17 +6,20 @@ import (
 	"hash"
 	"hash"
 	"io"
 	"io"
 	"runtime"
 	"runtime"
-	"sync"
 )
 )
 
 
 // Writer implements the LZ4 frame encoder.
 // Writer implements the LZ4 frame encoder.
 type Writer struct {
 type Writer struct {
 	Header
 	Header
 	dst      io.Writer
 	dst      io.Writer
-	checksum hash.Hash32    // frame checksum
-	wg       sync.WaitGroup // decompressing go routine wait group
-	data     []byte         // data to be compressed, only used when dealing with block dependency as we need 64Kb to work with
-	window   []byte         // last 64KB of decompressed data (block dependency) + blockMaxSize buffer
+	checksum hash.Hash32 // frame checksum
+	data     []byte      // data to be compressed, only used when dealing with block dependency as we need 64Kb to work with
+	window   []byte      // last 64KB of decompressed data (block dependency) + blockMaxSize buffer
+
+	zbCompressBuf     []byte // buffer for compressing lz4 blocks
+	writeSizeBuf      []byte // four-byte slice for writing checksums and sizes in writeblock
+	hashTable         []hashEntry
+	currentGeneration uint
 }
 }
 
 
 // NewWriter returns a new LZ4 frame encoder.
 // NewWriter returns a new LZ4 frame encoder.
@@ -30,6 +33,8 @@ func NewWriter(dst io.Writer) *Writer {
 		Header: Header{
 		Header: Header{
 			BlockMaxSize: 4 << 20,
 			BlockMaxSize: 4 << 20,
 		},
 		},
+		hashTable:    make([]hashEntry, hashTableSize),
+		writeSizeBuf: make([]byte, 4),
 	}
 	}
 }
 }
 
 
@@ -64,9 +69,9 @@ func (z *Writer) writeHeader() error {
 	if !z.Header.NoChecksum {
 	if !z.Header.NoChecksum {
 		flg |= 1 << 2
 		flg |= 1 << 2
 	}
 	}
-	// 	if z.Header.Dict {
-	// 		flg |= 1
-	// 	}
+	//  if z.Header.Dict {
+	//      flg |= 1
+	//  }
 	buf[4] = flg
 	buf[4] = flg
 	buf[5] = bSize << 4
 	buf[5] = bSize << 4
 
 
@@ -77,10 +82,10 @@ func (z *Writer) writeHeader() error {
 		binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
 		binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
 		n += 8
 		n += 8
 	}
 	}
-	// 	if z.Header.Dict {
-	// 		binary.LittleEndian.PutUint32(buf[n:], z.Header.DictID)
-	// 		n += 4
-	// 	}
+	//  if z.Header.Dict {
+	//      binary.LittleEndian.PutUint32(buf[n:], z.Header.DictID)
+	//      n += 4
+	//  }
 
 
 	// header checksum includes the flags, block max size and optional Size and DictID
 	// header checksum includes the flags, block max size and optional Size and DictID
 	z.checksum.Write(buf[4:n])
 	z.checksum.Write(buf[4:n])
@@ -93,6 +98,9 @@ func (z *Writer) writeHeader() error {
 	}
 	}
 	z.Header.done = true
 	z.Header.done = true
 
 
+	// initialize buffers dependent on header info
+	z.zbCompressBuf = make([]byte, winSize+z.BlockMaxSize)
+
 	return nil
 	return nil
 }
 }
 
 
@@ -116,11 +124,7 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 	}
 	}
 
 
 	if !z.NoChecksum {
 	if !z.NoChecksum {
-		z.wg.Add(1)
-		go func(b []byte) {
-			z.checksum.Write(b)
-			z.wg.Done()
-		}(buf)
+		z.checksum.Write(buf)
 	}
 	}
 
 
 	// with block dependency, require at least 64Kb of data to work with
 	// with block dependency, require at least 64Kb of data to work with
@@ -130,7 +134,6 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 		bl = len(z.data)
 		bl = len(z.data)
 		z.data = append(z.data, buf...)
 		z.data = append(z.data, buf...)
 		if len(z.data) < winSize {
 		if len(z.data) < winSize {
-			z.wg.Wait()
 			return len(buf), nil
 			return len(buf), nil
 		}
 		}
 		buf = z.data
 		buf = z.data
@@ -139,11 +142,16 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 
 
 	// Break up the input buffer into BlockMaxSize blocks, provisioning the left over block.
 	// Break up the input buffer into BlockMaxSize blocks, provisioning the left over block.
 	// Then compress into each of them concurrently if possible (no dependency).
 	// Then compress into each of them concurrently if possible (no dependency).
-	wbuf := buf
-	zn := len(wbuf) / z.BlockMaxSize
-	zblocks := make([]block, zn, zn+1)
-	for zi := 0; zi < zn; zi++ {
-		zb := &zblocks[zi]
+	var (
+		zb       block
+		wbuf     = buf
+		zn       = len(wbuf) / z.BlockMaxSize
+		zi       = 0
+		leftover = len(buf) % z.BlockMaxSize
+	)
+
+loop:
+	for zi < zn {
 		if z.BlockDependency {
 		if z.BlockDependency {
 			if zi == 0 {
 			if zi == 0 {
 				// first block does not have the window
 				// first block does not have the window
@@ -161,14 +169,12 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 			wbuf = wbuf[z.BlockMaxSize:]
 			wbuf = wbuf[z.BlockMaxSize:]
 		}
 		}
 
 
-		z.wg.Add(1)
-		go z.compressBlock(zb)
+		goto write
 	}
 	}
 
 
 	// left over
 	// left over
-	if len(buf)%z.BlockMaxSize > 0 {
-		zblocks = append(zblocks, block{data: wbuf})
-		zb := &zblocks[zn]
+	if leftover > 0 {
+		zb = block{data: wbuf}
 		if z.BlockDependency {
 		if z.BlockDependency {
 			if zn == 0 {
 			if zn == 0 {
 				zb.data = append(z.window, zb.data...)
 				zb.data = append(z.window, zb.data...)
@@ -177,38 +183,9 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 				zb.offset = winSize
 				zb.offset = winSize
 			}
 			}
 		}
 		}
-		z.wg.Add(1)
-		go z.compressBlock(zb)
-	}
-	z.wg.Wait()
-
-	// outputs the compressed data
-	for zi, zb := range zblocks {
-		_, err = z.writeBlock(&zb)
-
-		written := len(zb.data)
-		if bl > 0 {
-			if written >= bl {
-				written -= bl
-				bl = 0
-			} else {
-				bl -= written
-				written = 0
-			}
-		}
 
 
-		n += written
-		// remove the window in zb.data
-		if z.BlockDependency {
-			if zi == 0 {
-				n -= len(z.window)
-			} else {
-				n -= winSize
-			}
-		}
-		if err != nil {
-			return
-		}
+		leftover = 0
+		goto write
 	}
 	}
 
 
 	if z.BlockDependency {
 	if z.BlockDependency {
@@ -225,20 +202,54 @@ func (z *Writer) Write(buf []byte) (n int, err error) {
 	}
 	}
 
 
 	return
 	return
+
+write:
+	zb = z.compressBlock(zb)
+	_, err = z.writeBlock(zb)
+
+	written := len(zb.data)
+	if bl > 0 {
+		if written >= bl {
+			written -= bl
+			bl = 0
+		} else {
+			bl -= written
+			written = 0
+		}
+	}
+
+	n += written
+	// remove the window in zb.data
+	if z.BlockDependency {
+		if zi == 0 {
+			n -= len(z.window)
+		} else {
+			n -= winSize
+		}
+	}
+	if err != nil {
+		return
+	}
+	zi++
+	goto loop
 }
 }
 
 
 // compressBlock compresses a block.
 // compressBlock compresses a block.
-func (z *Writer) compressBlock(zb *block) {
+func (z *Writer) compressBlock(zb block) block {
 	// compressed block size cannot exceed the input's
 	// compressed block size cannot exceed the input's
-	zbuf := make([]byte, len(zb.data)-zb.offset)
 	var (
 	var (
-		n   int
-		err error
+		n    int
+		err  error
+		zbuf = z.zbCompressBuf
 	)
 	)
 	if z.HighCompression {
 	if z.HighCompression {
 		n, err = CompressBlockHC(zb.data, zbuf, zb.offset)
 		n, err = CompressBlockHC(zb.data, zbuf, zb.offset)
 	} else {
 	} else {
-		n, err = CompressBlock(zb.data, zbuf, zb.offset)
+		n, err = compressGenerationalBlock(zb.data, zbuf, zb.offset, z.currentGeneration, z.hashTable)
+		z.currentGeneration++
+		if z.currentGeneration == 0 { // wrapped around, reset table
+			z.hashTable = make([]hashEntry, hashTableSize)
+		}
 	}
 	}
 
 
 	// compressible and compressed size smaller than decompressed: ok!
 	// compressible and compressed size smaller than decompressed: ok!
@@ -256,21 +267,23 @@ func (z *Writer) compressBlock(zb *block) {
 		hashPool.Put(xxh)
 		hashPool.Put(xxh)
 	}
 	}
 
 
-	z.wg.Done()
+	return zb
 }
 }
 
 
 // writeBlock writes a frame block to the underlying io.Writer (size, data).
 // writeBlock writes a frame block to the underlying io.Writer (size, data).
-func (z *Writer) writeBlock(zb *block) (int, error) {
+func (z *Writer) writeBlock(zb block) (int, error) {
 	bLen := uint32(len(zb.zdata))
 	bLen := uint32(len(zb.zdata))
 	if !zb.compressed {
 	if !zb.compressed {
 		bLen |= 1 << 31
 		bLen |= 1 << 31
 	}
 	}
 
 
 	n := 0
 	n := 0
-	if err := binary.Write(z.dst, binary.LittleEndian, bLen); err != nil {
+
+	binary.LittleEndian.PutUint32(z.writeSizeBuf, bLen)
+	n, err := z.dst.Write(z.writeSizeBuf)
+	if err != nil {
 		return n, err
 		return n, err
 	}
 	}
-	n += 4
 
 
 	m, err := z.dst.Write(zb.zdata)
 	m, err := z.dst.Write(zb.zdata)
 	n += m
 	n += m
@@ -279,10 +292,13 @@ func (z *Writer) writeBlock(zb *block) (int, error) {
 	}
 	}
 
 
 	if z.BlockChecksum {
 	if z.BlockChecksum {
-		if err := binary.Write(z.dst, binary.LittleEndian, zb.checksum); err != nil {
+		binary.LittleEndian.PutUint32(z.writeSizeBuf, zb.checksum)
+		m, err := z.dst.Write(z.writeSizeBuf)
+		n += m
+
+		if err != nil {
 			return n, err
 			return n, err
 		}
 		}
-		n += 4
 	}
 	}
 
 
 	return n, nil
 	return n, nil
@@ -298,10 +314,9 @@ func (z *Writer) Flush() error {
 	if len(z.data) == 0 {
 	if len(z.data) == 0 {
 		return nil
 		return nil
 	}
 	}
-	zb := block{data: z.data}
-	z.wg.Add(1)
-	z.compressBlock(&zb)
-	if _, err := z.writeBlock(&zb); err != nil {
+
+	zb := z.compressBlock(block{data: z.data})
+	if _, err := z.writeBlock(zb); err != nil {
 		return err
 		return err
 	}
 	}
 	return nil
 	return nil
@@ -318,9 +333,7 @@ func (z *Writer) Close() error {
 	// buffered data for the block dependency window
 	// buffered data for the block dependency window
 	if z.BlockDependency && len(z.data) > 0 {
 	if z.BlockDependency && len(z.data) > 0 {
 		zb := block{data: z.data}
 		zb := block{data: z.data}
-		z.wg.Add(1)
-		z.compressBlock(&zb)
-		if _, err := z.writeBlock(&zb); err != nil {
+		if _, err := z.writeBlock(z.compressBlock(zb)); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}