
Merge branch 'writer-concurrency' into v3

# Conflicts:
#	writer.go
Pierre Curto, 6 years ago (commit e6faec6f17)
8 changed files with 367 additions and 86 deletions:

  1. bench_test.go (+27 −0)
  2. block.go (+21 −15)
  3. cmd/lz4c/compress.go (+3 −0)
  4. errors.go (+2 −0)
  5. lz4.go (+43 −6)
  6. reader.go (+30 −2)
  7. reader_test.go (+59 −0)
  8. writer.go (+182 −63)

+ 27 - 0
bench_test.go

@@ -101,6 +101,33 @@ func BenchmarkUncompressDigits(b *testing.B) { benchmarkUncompress(b, digitsLZ4)
 func BenchmarkUncompressTwain(b *testing.B)  { benchmarkUncompress(b, twainLZ4) }
 func BenchmarkUncompressRand(b *testing.B)   { benchmarkUncompress(b, randomLZ4) }
 
+func benchmarkSkipBytes(b *testing.B, compressed []byte) {
+	r := bytes.NewReader(compressed)
+	zr := lz4.NewReader(r)
+
+	// Determine the uncompressed size of the test file.
+	uncompressedSize, err := io.Copy(ioutil.Discard, zr)
+	if err != nil {
+		b.Fatal(err)
+	}
+
+	b.SetBytes(uncompressedSize)
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		r.Reset(compressed)
+		zr.Reset(r)
+		zr.Seek(uncompressedSize, io.SeekCurrent)
+		_, _ = io.Copy(ioutil.Discard, zr)
+	}
+}
+
+func BenchmarkSkipBytesPg1661(b *testing.B) { benchmarkSkipBytes(b, pg1661LZ4) }
+func BenchmarkSkipBytesDigits(b *testing.B) { benchmarkSkipBytes(b, digitsLZ4) }
+func BenchmarkSkipBytesTwain(b *testing.B)  { benchmarkSkipBytes(b, twainLZ4) }
+func BenchmarkSkipBytesRand(b *testing.B)   { benchmarkSkipBytes(b, randomLZ4) }
+
 func benchmarkCompress(b *testing.B, uncompressed []byte) {
 	w := bytes.NewBuffer(nil)
 	zw := lz4.NewWriter(w)

+ 21 - 15
block.go

@@ -40,19 +40,22 @@ func UncompressBlock(src, dst []byte) (int, error) {
 // The size of the compressed data is returned. If it is 0 and no error, then the data is incompressible.
 //
 // An error is returned if the destination buffer is too small.
-func CompressBlock(src, dst []byte, hashTable []int) (di int, err error) {
+func CompressBlock(src, dst []byte, hashTable []int) (_ int, err error) {
+	if len(hashTable) < htSize {
+		return 0, fmt.Errorf("hash table too small, should be at least %d in size", htSize)
+	}
 	defer recoverBlock(&err)
+	return compressBlock(src, dst, hashTable), nil
+}
 
+func compressBlock(src, dst []byte, hashTable []int) int {
 	// adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
 	// This significantly speeds up incompressible data and usually has very small impact on compression.
 	// bytes to skip =  1 + (bytes since last match >> adaptSkipLog)
 	const adaptSkipLog = 7
 	sn, dn := len(src)-mfLimit, len(dst)
 	if sn <= 0 || dn == 0 {
-		return 0, nil
-	}
-	if len(hashTable) < htSize {
-		return 0, fmt.Errorf("hash table too small, should be at least %d in size", htSize)
+		return 0
 	}
 	// Prove to the compiler the table has at least htSize elements.
 	// The compiler can see that "uint32() >> hashShift" cannot be out of bounds.
@@ -60,7 +63,7 @@ func CompressBlock(src, dst []byte, hashTable []int) (di int, err error) {
 
 	// si: Current position of the search.
 	// anchor: Position of the current literals.
-	var si, anchor int
+	var si, di, anchor int
 
 	// Fast scan strategy: the hash table only stores the last 4-byte sequences.
 	for si < sn {
@@ -186,7 +189,7 @@ func CompressBlock(src, dst []byte, hashTable []int) (di int, err error) {
 
 	if anchor == 0 {
 		// Incompressible.
-		return 0, nil
+		return 0
 	}
 
 	// Last literals.
@@ -207,10 +210,10 @@ func CompressBlock(src, dst []byte, hashTable []int) (di int, err error) {
 	// Write the last literals.
 	if di >= anchor {
 		// Incompressible.
-		return 0, nil
+		return 0
 	}
 	di += copy(dst[di:di+len(src)-anchor], src[anchor:])
-	return di, nil
+	return di
 }
 
 // blockHash hashes 4 bytes into a value < winSize.
@@ -227,9 +230,12 @@ func blockHashHC(x uint32) uint32 {
 // The size of the compressed data is returned. If it is 0 and no error, then the data is not compressible.
 //
 // An error is returned if the destination buffer is too small.
-func CompressBlockHC(src, dst []byte, depth int) (di int, err error) {
+func CompressBlockHC(src, dst []byte, depth int) (_ int, err error) {
 	defer recoverBlock(&err)
+	return compressBlockHC(src, dst, depth), nil
+}
 
+func compressBlockHC(src, dst []byte, depth int) int {
 	// adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
 	// This significantly speeds up incompressible data and usually has very small impact on compression.
 	// bytes to skip =  1 + (bytes since last match >> adaptSkipLog)
@@ -237,9 +243,9 @@ func CompressBlockHC(src, dst []byte, depth int) (di int, err error) {
 
 	sn, dn := len(src)-mfLimit, len(dst)
 	if sn <= 0 || dn == 0 {
-		return 0, nil
+		return 0
 	}
-	var si int
+	var si, di int
 
 	// hashTable: stores the last position found for a given hash
 	// chainTable: stores previous positions for a given hash
@@ -358,7 +364,7 @@ func CompressBlockHC(src, dst []byte, depth int) (di int, err error) {
 
 	if anchor == 0 {
 		// Incompressible.
-		return 0, nil
+		return 0
 	}
 
 	// Last literals.
@@ -380,8 +386,8 @@ func CompressBlockHC(src, dst []byte, depth int) (di int, err error) {
 	// Write the last literals.
 	if di >= anchor {
 		// Incompressible.
-		return 0, nil
+		return 0
 	}
 	di += copy(dst[di:di+len(src)-anchor], src[anchor:])
-	return di, nil
+	return di
 }
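Because validation now happens in the exported wrapper before `recoverBlock` is armed, callers must size the hash table themselves. A minimal sketch of calling the exported API; the `1 << 16` table size mirrors the `[winSize]int` tables used elsewhere in this diff but is an assumption, as is the import path:

```go
package main

import (
	"fmt"

	"github.com/pierrec/lz4"
)

func main() {
	src := []byte("hello hello hello hello hello hello hello hello")

	// CompressBlock needs a caller-provided hash table of at least htSize
	// entries; 1 << 16 mirrors the [winSize]int tables in this diff, but the
	// exact constant is an assumption here.
	var ht [1 << 16]int

	// A compressed block never usefully exceeds the input, so len(src) is
	// enough destination space.
	dst := make([]byte, len(src))
	n, err := lz4.CompressBlock(src, dst, ht[:])
	if err != nil {
		panic(err)
	}
	if n == 0 {
		fmt.Println("incompressible: store src as-is")
		return
	}
	fmt.Printf("compressed %d => %d bytes\n", len(src), n)
}
```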

+ 3 - 0
cmd/lz4c/compress.go

@@ -23,6 +23,8 @@ func Compress(fs *flag.FlagSet) cmdflag.Handler {
 	fs.BoolVar(&streamChecksum, "sc", false, "disable stream checksum")
 	var level int
 	fs.IntVar(&level, "l", 0, "compression level (0=fastest)")
+	var concurrency int
+	fs.IntVar(&concurrency, "c", -1, "concurrency (default=all CPUs)")
 
 	return func(args ...string) (int, error) {
 		sz, err := bytefmt.ToBytes(blockMaxSize)
@@ -37,6 +39,7 @@ func Compress(fs *flag.FlagSet) cmdflag.Handler {
 			NoChecksum:       streamChecksum,
 			CompressionLevel: level,
 		}
+		zw.WithConcurrency(concurrency)
 
 		// Use stdin/stdout if no file provided.
 		if len(args) == 0 {

+ 2 - 0
errors.go

@@ -15,6 +15,8 @@ var (
 	ErrInvalid = errors.New("lz4: bad magic number")
 	// ErrBlockDependency is returned when attempting to decompress an archive created with block dependency.
 	ErrBlockDependency = errors.New("lz4: block dependency not supported")
+	// ErrUnsupportedSeek is returned when attempting to Seek any way but forward from the current position.
+	ErrUnsupportedSeek = errors.New("lz4: can only seek forward from io.SeekCurrent")
 )
 
 func recoverBlock(e *error) {
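Since `ErrUnsupportedSeek` is a plain sentinel, a direct comparison distinguishes a rejected seek from an I/O failure. A small sketch using an in-memory frame:

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/pierrec/lz4"
)

func main() {
	// Build a tiny frame in memory so the reader has something to wrap.
	var frame bytes.Buffer
	zw := lz4.NewWriter(&frame)
	_, _ = zw.Write([]byte("hello"))
	_ = zw.Close()

	zr := lz4.NewReader(&frame)
	// Backward seeks are rejected with the new sentinel.
	if _, err := zr.Seek(-1, io.SeekCurrent); err == lz4.ErrUnsupportedSeek {
		fmt.Println("backward seek not supported; read forward instead")
	}
}
```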

+ 43 - 6
lz4.go

@@ -10,6 +10,10 @@
 //
 package lz4
 
+import "math/bits"
+
+import "sync"
+
 const (
 	// Extension is the LZ4 frame file name extension
 	Extension = ".lz4"
@@ -39,17 +43,50 @@ const (
 
 
 // map the block max size id with its value in bytes: 64Kb, 256Kb, 1Mb and 4Mb.
 const (
-	blockSize64K  = 64 << 10
-	blockSize256K = 256 << 10
-	blockSize1M   = 1 << 20
-	blockSize4M   = 4 << 20
+	blockSize64K = 1 << (16 + 2*iota)
+	blockSize256K
+	blockSize1M
+	blockSize4M
 )
 
 var (
-	bsMapID    = map[byte]int{4: blockSize64K, 5: blockSize256K, 6: blockSize1M, 7: blockSize4M}
-	bsMapValue = map[int]byte{blockSize64K: 4, blockSize256K: 5, blockSize1M: 6, blockSize4M: 7}
+	// Keep a pool of buffers for each valid block size.
+	bsMapValue = [...]*sync.Pool{
+		newBufferPool(2 * blockSize64K),
+		newBufferPool(2 * blockSize256K),
+		newBufferPool(2 * blockSize1M),
+		newBufferPool(2 * blockSize4M),
+	}
 )
 
+// newBufferPool returns a pool for buffers of the given size.
+func newBufferPool(size int) *sync.Pool {
+	return &sync.Pool{
+		New: func() interface{} {
+			return make([]byte, size)
+		},
+	}
+}
+
+// putBuffer returns a buffer to its pool.
+func putBuffer(size int, buf []byte) {
+	if cap(buf) > 0 {
+		idx := blockSizeValueToIndex(size) - 4
+		bsMapValue[idx].Put(buf[:cap(buf)])
+	}
+}
+func blockSizeIndexToValue(i byte) int {
+	return 1 << (16 + 2*uint(i))
+}
+func isValidBlockSize(size int) bool {
+	const blockSizeMask = blockSize64K | blockSize256K | blockSize1M | blockSize4M
+
+	return size&blockSizeMask > 0 && bits.OnesCount(uint(size)) == 1
+}
+func blockSizeValueToIndex(size int) byte {
+	return 4 + byte(bits.TrailingZeros(uint(size)>>16)/2)
+}
+
 // Header describes the various flags that can be set on a Writer or obtained from a Reader.
 // The default values match those of the LZ4 frame format definition
 // (http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html).
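The new helpers encode the frame-format relationship size = 1 << (16 + 2*(id-4)) for block-size IDs 4 through 7. A hypothetical in-package test (not part of this commit) sketching the round trip:

```go
package lz4

import "testing"

// TestBlockSizeRoundTrip checks that frame IDs 4..7 map to 64K..4M and back.
func TestBlockSizeRoundTrip(t *testing.T) {
	for id := byte(4); id <= 7; id++ {
		size := blockSizeIndexToValue(id - 4)
		if !isValidBlockSize(size) {
			t.Fatalf("size %d for id %d reported invalid", size, id)
		}
		if got := blockSizeValueToIndex(size); got != id {
			t.Fatalf("id %d round-tripped to %d via size %d", id, got, size)
		}
	}
	// 192K sits between two valid sizes and must be rejected.
	if isValidBlockSize(3 << 16) {
		t.Fatal("192K should not be a valid block size")
	}
}
```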

+ 30 - 2
reader.go

@@ -25,6 +25,8 @@ type Reader struct {
 	data     []byte        // Uncompressed data.
 	idx      int           // Index of unread bytes into data.
 	checksum xxh32.XXHZero // Frame hash.
+	skip     int64         // Bytes to skip before next read.
+	dpos     int64         // Position in the uncompressed output.
 }
 
 // NewReader returns a new LZ4 frame decoder.
@@ -86,10 +88,10 @@ func (z *Reader) readHeader(first bool) error {
 	z.NoChecksum = b>>2&1 == 0
 
 	bmsID := buf[1] >> 4 & 0x7
-	bSize, ok := bsMapID[bmsID]
-	if !ok {
+	if bmsID < 4 || bmsID > 7 {
 		return fmt.Errorf("lz4: invalid block max size ID: %d", bmsID)
 	}
+	bSize := blockSizeIndexToValue(bmsID - 4)
 	z.BlockMaxSize = bSize
 
 	// Allocate the compressed/uncompressed buffers.
@@ -275,8 +277,20 @@ func (z *Reader) Read(buf []byte) (int, error) {
 		z.idx = 0
 	}
 
+	if z.skip > int64(len(z.data[z.idx:])) {
+		z.skip -= int64(len(z.data[z.idx:]))
+		z.dpos += int64(len(z.data[z.idx:]))
+		z.idx = len(z.data)
+		return 0, nil
+	}
+
+	z.idx += int(z.skip)
+	z.dpos += z.skip
+	z.skip = 0
+
 	n := copy(buf, z.data[z.idx:])
 	z.idx += n
+	z.dpos += int64(n)
 	if debugFlag {
 		debug("copied %d bytes to input", n)
 	}
@@ -284,6 +298,20 @@ func (z *Reader) Read(buf []byte) (int, error) {
 	return n, nil
 }
 
+// Seek implements io.Seeker, but supports seeking forward from the current
+// position only. Any other seek returns an error. It allows skipping output
+// bytes which aren't needed, which in some scenarios is faster than reading
+// and discarding them.
+// Note this may cause future calls to Read() to return 0 bytes if all of the
+// data they would have returned is skipped.
+func (z *Reader) Seek(offset int64, whence int) (int64, error) {
+	if offset < 0 || whence != io.SeekCurrent {
+		return z.dpos + z.skip, ErrUnsupportedSeek
+	}
+	z.skip += offset
+	return z.dpos + z.skip, nil
+}
+
 // Reset discards the Reader's state and makes it equivalent to the
 // result of its original state from NewReader, but reading from r instead.
 // This permits reusing a Reader rather than allocating a new one.
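A minimal sketch of the new forward-only seek: the offset is recorded immediately and consumed lazily by the next Read. File path and offsets are illustrative:

```go
package main

import (
	"fmt"
	"io"
	"os"

	"github.com/pierrec/lz4"
)

func main() {
	f, err := os.Open("testdata/pg1661.txt.lz4") // illustrative path
	if err != nil {
		panic(err)
	}
	defer f.Close()

	zr := lz4.NewReader(f)

	// Skip the first 1000 uncompressed bytes without copying them out.
	if _, err := zr.Seek(1000, io.SeekCurrent); err != nil {
		panic(err)
	}

	// Read the 100 bytes that follow the skipped region. io.ReadFull loops
	// over the 0-byte reads that whole-block skips can produce.
	buf := make([]byte, 100)
	if _, err := io.ReadFull(zr, buf); err != nil {
		panic(err)
	}
	fmt.Printf("bytes 1000..1100: %q\n", buf)
}
```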

+ 59 - 0
reader_test.go

@@ -56,6 +56,65 @@ func TestReader(t *testing.T) {
 			if got, want := out.Bytes(), raw; !reflect.DeepEqual(got, want) {
 				t.Fatal("uncompressed data does not match original")
 			}
+
+			if len(raw) < 20 {
+				return
+			}
+
+			f2, err := os.Open(fname)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer f2.Close()
+
+			out.Reset()
+			zr = lz4.NewReader(f2)
+			_, err = io.CopyN(&out, zr, 10)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !reflect.DeepEqual(out.Bytes(), raw[:10]) {
+				t.Fatal("partial read does not match original")
+			}
+
+			pos, err := zr.Seek(-1, io.SeekCurrent)
+			if err == nil {
+				t.Fatal("expected error from invalid seek")
+			}
+			if pos != 10 {
+				t.Fatalf("unexpected position %d", pos)
+			}
+			pos, err = zr.Seek(1, io.SeekStart)
+			if err == nil {
+				t.Fatal("expected error from invalid seek")
+			}
+			if pos != 10 {
+				t.Fatalf("unexpected position %d", pos)
+			}
+			pos, err = zr.Seek(-1, io.SeekEnd)
+			if err == nil {
+				t.Fatal("expected error from invalid seek")
+			}
+			if pos != 10 {
+				t.Fatalf("unexpected position %d", pos)
+			}
+
+			pos, err = zr.Seek(int64(len(raw)-20), io.SeekCurrent)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if pos != int64(len(raw)-10) {
+				t.Fatalf("unexpected position %d", pos)
+			}
+
+			out.Reset()
+			_, err = io.CopyN(&out, zr, 10)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !reflect.DeepEqual(out.Bytes(), raw[len(raw)-10:]) {
+				t.Fatal("after seek, partial read does not match original")
+			}
 		})
 	}
 }

+ 182 - 63
writer.go

@@ -3,11 +3,18 @@ package lz4
 import (
 	"encoding/binary"
 	"fmt"
+	"github.com/pierrec/lz4/internal/xxh32"
 	"io"
-
-	"github.com/pierrec/lz4/v3/internal/xxh32"
+	"runtime"
 )
 
+// zResult contains the results of compressing a block.
+type zResult struct {
+	size     uint32 // Block header
+	data     []byte // Compressed data
+	checksum uint32 // Data checksum
+}
+
 // Writer implements the LZ4 frame encoder.
 type Writer struct {
 	Header
@@ -18,10 +25,13 @@ type Writer struct {
 	buf       [19]byte      // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
 	dst       io.Writer     // Destination.
 	checksum  xxh32.XXHZero // Frame checksum.
-	zdata     []byte        // Compressed data.
-	data      []byte        // Data to be compressed.
+	data      []byte        // Data to be compressed + buffer for compressed data.
 	idx       int           // Index into data.
 	hashtable [winSize]int  // Hash table used in CompressBlock().
+
+	// For concurrency.
+	c   chan chan zResult // Channel for block compression goroutines and writer goroutine.
+	err error             // Any error encountered while writing to the underlying destination.
 }
 
 // NewWriter returns a new LZ4 frame encoder.
@@ -29,30 +39,86 @@ type Writer struct {
 // The supplied Header is checked at the first Write.
 // It is ok to change it before the first Write but then not until a Reset() is performed.
 func NewWriter(dst io.Writer) *Writer {
-	return &Writer{dst: dst}
+	z := new(Writer)
+	z.Reset(dst)
+	return z
+}
+
+// WithConcurrency sets the number of concurrent goroutines used for compression.
+// A negative value sets the concurrency to GOMAXPROCS.
+func (z *Writer) WithConcurrency(n int) *Writer {
+	switch {
+	case n == 0 || n == 1:
+		z.c = nil
+		return z
+	case n < 0:
+		n = runtime.GOMAXPROCS(0)
+	}
+	z.c = make(chan chan zResult, n)
+	// Writer goroutine managing concurrent block compression goroutines.
+	go func() {
+		// Process next block compression item.
+		for c := range z.c {
+			// Read the next compressed block result.
+			// Waiting here ensures that the blocks are output in the order they were sent.
+			res := <-c
+			n := len(res.data)
+			if n == 0 {
+				// Notify the block compression routine that we are done with its result.
+				// This is used when a sentinel block is sent to terminate the compression.
+				close(c)
+				return
+			}
+			// Write the block.
+			if err := z.writeUint32(res.size); err != nil && z.err == nil {
+				z.err = err
+			}
+			if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
+				z.err = err
+			}
+			if z.BlockChecksum {
+				if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
+					z.err = err
+				}
+			}
+			if h := z.OnBlockDone; h != nil {
+				h(n)
+			}
+		}
+	}()
+	return z
+}
+
+// newBuffers gets a pooled buffer sized from the Header's BlockMaxSize; its
+// first half holds uncompressed data and its second half compressed data.
+func (z *Writer) newBuffers() {
+	bSize := z.Header.BlockMaxSize
+	idx := blockSizeValueToIndex(bSize) - 4
+	buf := bsMapValue[idx].Get().([]byte)
+	z.data = buf[:bSize] // Uncompressed buffer is the first half.
+}
+
+// freeBuffers puts the writer's buffers back to the pool.
+func (z *Writer) freeBuffers() {
+	// Put the buffer back into the pool, if any.
+	putBuffer(z.Header.BlockMaxSize, z.data)
+	z.data = nil
 }
 
 // writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
 func (z *Writer) writeHeader() error {
 	// Default to 4Mb if BlockMaxSize is not set.
 	if z.Header.BlockMaxSize == 0 {
-		z.Header.BlockMaxSize = bsMapID[7]
+		z.Header.BlockMaxSize = blockSize4M
 	}
 	// The only option that needs to be validated.
 	bSize := z.Header.BlockMaxSize
-	bSizeID, ok := bsMapValue[bSize]
-	if !ok {
+	if !isValidBlockSize(z.Header.BlockMaxSize) {
 		return fmt.Errorf("lz4: invalid block max size: %d", bSize)
 	}
 	// Allocate the compressed/uncompressed buffers.
 	// The compressed buffer cannot exceed the uncompressed one.
-	if cap(z.zdata) < bSize {
-		// Only allocate if there is not enough capacity.
-		// Allocate both buffers at once.
-		z.zdata = make([]byte, 2*bSize)
-	}
-	z.data = z.zdata[:bSize]                 // Uncompressed buffer is the first half.
-	z.zdata = z.zdata[:cap(z.zdata)][bSize:] // Compressed buffer is the second half.
+	z.newBuffers()
 	z.idx = 0
 
 	// Size is optional.
@@ -72,7 +138,7 @@ func (z *Writer) writeHeader() error {
 		flg |= 1 << 2
 	}
 	buf[4] = flg
-	buf[5] = bSizeID << 4
+	buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
 
 	// Current buffer size: magic(4) + flags(1) + block max size (1).
 	n := 6
@@ -155,58 +221,90 @@ func (z *Writer) compressBlock(data []byte) error {
 		z.checksum.Write(data)
 	}
 
-	// The compressed block size cannot exceed the input's.
-	var zn int
-	var err error
+	zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
+	if z.c == nil {
+		// The compressed block size cannot exceed the input's.
+		var zn int
 
-	if level := z.Header.CompressionLevel; level != 0 {
-		zn, err = CompressBlockHC(data, z.zdata, level)
-	} else {
-		zn, err = CompressBlock(data, z.zdata, z.hashtable[:])
-	}
+		if level := z.Header.CompressionLevel; level != 0 {
+			zn = compressBlockHC(data, zdata, level)
+		} else {
+			zn = compressBlock(data, zdata, z.hashtable[:])
+		}
 
-	var zdata []byte
-	var bLen uint32
-	if debugFlag {
-		debug("block compression %d => %d", len(data), zn)
-	}
-	if err == nil && zn > 0 && zn < len(data) {
-		// Compressible and compressed size smaller than uncompressed: ok!
-		bLen = uint32(zn)
-		zdata = z.zdata[:zn]
-	} else {
-		// Uncompressed block.
-		bLen = uint32(len(data)) | compressedBlockFlag
-		zdata = data
-	}
-	if debugFlag {
-		debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
-	}
+		var bLen uint32
+		if debugFlag {
+			debug("block compression %d => %d", len(data), zn)
+		}
+		if zn > 0 && zn < len(data) {
+			// Compressible and compressed size smaller than uncompressed: ok!
+			bLen = uint32(zn)
+			zdata = zdata[:zn]
+		} else {
+			// Uncompressed block.
+			bLen = uint32(len(data)) | compressedBlockFlag
+			zdata = data
+		}
+		if debugFlag {
+			debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
+		}
 
-	// Write the block.
-	if err := z.writeUint32(bLen); err != nil {
-		return err
-	}
-	written, err := z.dst.Write(zdata)
-	if err != nil {
-		return err
-	}
-	if h := z.OnBlockDone; h != nil {
-		h(written)
-	}
+		// Write the block.
+		if err := z.writeUint32(bLen); err != nil {
+			return err
+		}
+		written, err := z.dst.Write(zdata)
+		if err != nil {
+			return err
+		}
+		if h := z.OnBlockDone; h != nil {
+			h(written)
+		}
 
-	if !z.BlockChecksum {
+		if !z.BlockChecksum {
+			if debugFlag {
+				debug("current frame checksum %x", z.checksum.Sum32())
+			}
+			return nil
+		}
+		checksum := xxh32.ChecksumZero(zdata)
 		if debugFlag {
-			debug("current frame checksum %x", z.checksum.Sum32())
+			debug("block checksum %x", checksum)
+			defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
 		}
-		return nil
+		return z.writeUint32(checksum)
 	}
-	checksum := xxh32.ChecksumZero(zdata)
-	if debugFlag {
-		debug("block checksum %x", checksum)
-		defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
-	}
-	return z.writeUint32(checksum)
+
+	odata := z.data
+	z.newBuffers()
+	c := make(chan zResult)
+	z.c <- c // Send now to guarantee order
+	go func(header Header) {
+		// The compressed block size cannot exceed the input's.
+		var zn int
+		if level := header.CompressionLevel; level != 0 {
+			zn = compressBlockHC(data, zdata, level)
+		} else {
+			var hashTable [winSize]int
+			zn = compressBlock(data, zdata, hashTable[:])
+		}
+		var res zResult
+		if zn > 0 && zn < len(data) {
+			// Compressible and compressed size smaller than uncompressed: ok!
+			res.size = uint32(zn)
+			res.data = zdata[:zn]
+		} else {
+			// Uncompressed block.
+			res.size = uint32(len(data)) | compressedBlockFlag
+			res.data = data
+		}
+		if header.BlockChecksum {
+			res.checksum = xxh32.ChecksumZero(res.data)
+		}
+		c <- res
+		putBuffer(header.BlockMaxSize, odata)
+	}(z.Header)
+	return nil
 }
 
 // Flush flushes any pending compressed data to the underlying writer.
@@ -227,6 +325,21 @@ func (z *Writer) Flush() error {
 	return nil
 }
 
+func (z *Writer) close() error {
+	if z.c == nil {
+		return nil
+	}
+	// Send a sentinel block (no data to compress) to terminate the writer main goroutine.
+	c := make(chan zResult)
+	z.c <- c
+	c <- zResult{}
+	// Wait for the main goroutine to complete.
+	<-c
+	// At this point the main goroutine has shut down or is about to return.
+	z.c = nil
+	return z.err
+}
+
 // Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
 func (z *Writer) Close() error {
 	if !z.Header.done {
@@ -237,6 +350,10 @@ func (z *Writer) Close() error {
 	if err := z.Flush(); err != nil {
 		return err
 	}
+	if err := z.close(); err != nil {
+		return err
+	}
+	z.freeBuffers()
 
 	if debugFlag {
 		debug("writing last empty block")
@@ -258,12 +375,14 @@ func (z *Writer) Close() error {
 // initial state from NewWriter, but instead writing to w.
 // No access to the underlying io.Writer is performed.
 func (z *Writer) Reset(w io.Writer) {
+	n := cap(z.c)
+	_ = z.close()
+	z.freeBuffers()
 	z.Header = Header{}
 	z.dst = w
 	z.checksum.Reset()
-	z.zdata = z.zdata[:0]
-	z.data = z.data[:0]
 	z.idx = 0
+	z.WithConcurrency(n)
 }
 
 // writeUint32 writes a uint32 to the underlying writer.
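A usage sketch of the concurrent path (output path illustrative). WithConcurrency returns the writer, so it chains off NewWriter; calling Close is required so the sentinel block stops the ordering goroutine and any write error it recorded is surfaced:

```go
package main

import (
	"io"
	"os"

	"github.com/pierrec/lz4"
)

func main() {
	out, err := os.Create("archive.lz4") // illustrative output path
	if err != nil {
		panic(err)
	}
	defer out.Close()

	// Negative concurrency means one compression goroutine per CPU.
	zw := lz4.NewWriter(out).WithConcurrency(-1)

	if _, err := io.Copy(zw, os.Stdin); err != nil {
		panic(err)
	}
	// Close flushes the last block, shuts down the writer goroutine, and
	// reports any error it hit while writing earlier blocks.
	if err := zw.Close(); err != nil {
		panic(err)
	}
}
```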