
Writer: initial concurrent version (no tests)

Pierre Curto, 6 years ago
commit 49654038d5
2 changed files with 173 additions and 58 deletions:
  1. lz4.go (+10 -1)
  2. writer.go (+163 -57)

lz4.go (+10 -1)

@@ -48,7 +48,8 @@ const (
 )
 
 var (
-	bsMapID    = map[byte]int{4: blockSize64K, 5: blockSize256K, 6: blockSize1M, 7: blockSize4M}
+	bsMapID = map[byte]int{4: blockSize64K, 5: blockSize256K, 6: blockSize1M, 7: blockSize4M}
+	// Keep a pool of buffers for each valid block size.
 	bsMapValue = map[int]struct {
 		byte
 		*sync.Pool
@@ -60,6 +61,7 @@ var (
 	}
 )
 
+// newBufferPool returns a pool for buffers of the given size.
 func newBufferPool(size int) *sync.Pool {
 	return &sync.Pool{
 		New: func() interface{} {
@@ -68,6 +70,13 @@ func newBufferPool(size int) *sync.Pool {
 	}
 }
 
+// putBuffer returns a buffer to its pool.
+func putBuffer(size int, buf []byte) {
+	if cap(buf) > 0 {
+		bsMapValue[size].Pool.Put(buf[:cap(buf)])
+	}
+}
+
 // Header describes the various flags that can be set on a Writer or obtained from a Reader.
 // The default values match those of the LZ4 frame format definition
 // (http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html).
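
The buffer-pool round trip added in this file pairs a size-keyed Get with putBuffer, which re-slices the buffer to its full capacity before handing it back, so the next Get always sees a full-length buffer. A minimal sketch of the same pattern in isolation (single fixed size and illustrative names, not the package's API):

	package main

	import (
		"fmt"
		"sync"
	)

	const blockSize64K = 64 << 10

	// One pool for one block size, mirroring an entry of bsMapValue.
	var pool = &sync.Pool{
		New: func() interface{} { return make([]byte, blockSize64K) },
	}

	// put mirrors putBuffer: restore full capacity before returning.
	func put(buf []byte) {
		if cap(buf) > 0 {
			pool.Put(buf[:cap(buf)])
		}
	}

	func main() {
		buf := pool.Get().([]byte)
		buf = buf[:128] // caller shrinks the slice while working with it
		put(buf)
		fmt.Println(len(pool.Get().([]byte))) // 65536: full length again
	}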

writer.go (+163 -57)

@@ -3,11 +3,18 @@ package lz4
 import (
 	"encoding/binary"
 	"fmt"
-	"io"
-
 	"github.com/pierrec/lz4/internal/xxh32"
+	"io"
+	"runtime"
 )
 
+// zResult contains the results of compressing a block.
+type zResult struct {
+	size     uint32 // Block header
+	data     []byte // Compressed data
+	checksum uint32 // Data checksum
+}
+
 // Writer implements the LZ4 frame encoder.
 type Writer struct {
 	Header
@@ -18,10 +25,13 @@ type Writer struct {
 	buf       [19]byte      // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
 	dst       io.Writer     // Destination.
 	checksum  xxh32.XXHZero // Frame checksum.
-	zdata     []byte        // Compressed data.
-	data      []byte        // Data to be compressed.
+	data      []byte        // Data to be compressed + buffer for compressed data.
 	idx       int           // Index into data.
 	hashtable [winSize]int  // Hash table used in CompressBlock().
+
+	// For concurrency.
+	c   chan chan zResult // Channel for block compression goroutines and writer goroutine.
+	err error             // Any error encountered while writing to the underlying destination.
 }
 
 // NewWriter returns a new LZ4 frame encoder.
@@ -29,7 +39,54 @@ type Writer struct {
 // The supplied Header is checked at the first Write.
 // It is ok to change it before the first Write but then not until a Reset() is performed.
 func NewWriter(dst io.Writer) *Writer {
-	return &Writer{dst: dst}
+	z := new(Writer)
+	z.Reset(dst)
+	return z
+}
+
+// WithConcurrency sets the number of concurrent goroutines used for compression.
+// A negative value sets the concurrency to GOMAXPROCS.
+func (z *Writer) WithConcurrency(n int) *Writer {
+	switch {
+	case n == 0 || n == 1:
+		z.c = nil
+		return z
+	case n < 0:
+		n = runtime.GOMAXPROCS(0)
+	}
+	z.c = make(chan chan zResult, n)
+	// Writer goroutine managing concurrent block compression goroutines.
+	go func() {
+		// Process next block compression item.
+		for c := range z.c {
+			// Read the next compressed block result.
+			// Waiting here ensures that the blocks are output in the order they were sent.
+			res := <-c
+			n := len(res.data)
+			if n == 0 {
+				// Notify the block compression routine that we are done with its result.
+				// This is used when a sentinel block is sent to terminate the compression.
+				close(c)
+				return
+			}
+			// Write the block.
+			if err := z.writeUint32(res.size); err != nil && z.err == nil {
+				z.err = err
+			}
+			if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
+				z.err = err
+			}
+			if z.BlockChecksum {
+				if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
+					z.err = err
+				}
+			}
+			if h := z.OnBlockDone; h != nil {
+				h(n)
+			}
+		}
+	}()
+	return z
 }
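
A usage sketch of the builder-style API above (output file and error handling are illustrative):

	package main

	import (
		"io"
		"log"
		"os"

		"github.com/pierrec/lz4"
	)

	func main() {
		f, err := os.Create("out.lz4")
		if err != nil {
			log.Fatal(err)
		}
		defer f.Close()

		// One block compression goroutine per CPU;
		// 0 or 1 keeps the writer sequential.
		zw := lz4.NewWriter(f).WithConcurrency(-1)
		if _, err := io.Copy(zw, os.Stdin); err != nil {
			log.Fatal(err)
		}
		// Close flushes pending data and waits for in-flight blocks.
		if err := zw.Close(); err != nil {
			log.Fatal(err)
		}
	}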
 
 // newBuffers instantiates new buffers whose size matches the one in Header.
@@ -37,19 +94,14 @@ func NewWriter(dst io.Writer) *Writer {
 func (z *Writer) newBuffers() {
 	bSize := z.Header.BlockMaxSize
 	buf := bsMapValue[bSize].Pool.Get().([]byte)
-	z.data = buf[:bSize]             // Uncompressed buffer is the first half.
-	z.zdata = buf[:cap(buf)][bSize:] // Compressed buffer is the second half.
+	z.data = buf[:bSize] // Uncompressed buffer is the first half.
 }
 
 // freeBuffers puts the writer's buffers back to the pool.
 func (z *Writer) freeBuffers() {
-	if n := cap(z.data); n > 0 {
-		// Put the buffer back into the pool, if any.
-		bSize := z.Header.BlockMaxSize
-		bsMapValue[bSize].Pool.Put(z.data[:n])
-	}
+	// Put the buffer back into the pool, if any.
+	putBuffer(z.Header.BlockMaxSize, z.data)
 	z.data = nil
-	z.zdata = nil
 }
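
With the zdata field gone, both halves now live in one pooled allocation: newBuffers slices off the first BlockMaxSize bytes for incoming data, and compressBlock below re-derives the compression scratch from the remaining capacity. A sketch of the layout, assuming (as the previous two-halves scheme already did) a backing array of at least twice the block size:

	bSize := 64 << 10            // Header.BlockMaxSize
	buf := make([]byte, 2*bSize) // what the pool hands out
	data := buf[:bSize]          // first half: bytes to be compressed
	zdata := buf[bSize:cap(buf)] // second half: compressed-output scratch
	_, _ = data, zdata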
 
 // writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
@@ -169,57 +221,90 @@ func (z *Writer) compressBlock(data []byte) error {
 		z.checksum.Write(data)
 	}
 
-	// The compressed block size cannot exceed the input's.
-	var zn int
+	zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
+	if z.c == nil {
+		// The compressed block size cannot exceed the input's.
+		var zn int
 
-	if level := z.Header.CompressionLevel; level != 0 {
-		zn = compressBlockHC(data, z.zdata, level)
-	} else {
-		zn = compressBlock(data, z.zdata, z.hashtable[:])
-	}
+		if level := z.Header.CompressionLevel; level != 0 {
+			zn = compressBlockHC(data, zdata, level)
+		} else {
+			zn = compressBlock(data, zdata, z.hashtable[:])
+		}
 
-	var zdata []byte
-	var bLen uint32
-	if debugFlag {
-		debug("block compression %d => %d", len(data), zn)
-	}
-	if zn > 0 && zn < len(data) {
-		// Compressible and compressed size smaller than uncompressed: ok!
-		bLen = uint32(zn)
-		zdata = z.zdata[:zn]
-	} else {
-		// Uncompressed block.
-		bLen = uint32(len(data)) | compressedBlockFlag
-		zdata = data
-	}
-	if debugFlag {
-		debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
-	}
+		var bLen uint32
+		if debugFlag {
+			debug("block compression %d => %d", len(data), zn)
+		}
+		if zn > 0 && zn < len(data) {
+			// Compressible and compressed size smaller than uncompressed: ok!
+			bLen = uint32(zn)
+			zdata = zdata[:zn]
+		} else {
+			// Uncompressed block.
+			bLen = uint32(len(data)) | compressedBlockFlag
+			zdata = data
+		}
+		if debugFlag {
+			debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
+		}
 
-	// Write the block.
-	if err := z.writeUint32(bLen); err != nil {
-		return err
-	}
-	written, err := z.dst.Write(zdata)
-	if err != nil {
-		return err
-	}
-	if h := z.OnBlockDone; h != nil {
-		h(written)
-	}
+		// Write the block.
+		if err := z.writeUint32(bLen); err != nil {
+			return err
+		}
+		written, err := z.dst.Write(zdata)
+		if err != nil {
+			return err
+		}
+		if h := z.OnBlockDone; h != nil {
+			h(written)
+		}
 
-	if !z.BlockChecksum {
+		if !z.BlockChecksum {
+			if debugFlag {
+				debug("current frame checksum %x", z.checksum.Sum32())
+			}
+			return nil
+		}
+		checksum := xxh32.ChecksumZero(zdata)
 		if debugFlag {
-			debug("current frame checksum %x", z.checksum.Sum32())
+			debug("block checksum %x", checksum)
+			defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
 		}
-		return nil
-	}
-	checksum := xxh32.ChecksumZero(zdata)
-	if debugFlag {
-		debug("block checksum %x", checksum)
-		defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
+		return z.writeUint32(checksum)
 	}
-	return z.writeUint32(checksum)
+
+	odata := z.data
+	z.newBuffers()
+	c := make(chan zResult)
+	z.c <- c // Send now to guarantee order
+	go func(header Header) {
+		// The compressed block size cannot exceed the input's.
+		var zn int
+		if level := header.CompressionLevel; level != 0 {
+			zn = compressBlockHC(data, zdata, level)
+		} else {
+			var hashTable [winSize]int
+			zn = compressBlock(data, zdata, hashTable[:])
+		}
+		var res zResult
+		if zn > 0 && zn < len(data) {
+			// Compressible and compressed size smaller than uncompressed: ok!
+			res.size = uint32(zn)
+			res.data = zdata[:zn]
+		} else {
+			// Uncompressed block.
+			res.size = uint32(len(data)) | compressedBlockFlag
+			res.data = data
+		}
+		if header.BlockChecksum {
+			res.checksum = xxh32.ChecksumZero(res.data)
+		}
+		c <- res
+		putBuffer(header.BlockMaxSize, odata)
+	}(z.Header)
+	return nil
 }
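
The ordering guarantee relies on a channel-of-channels pattern: each block's result channel is registered on z.c before its compression goroutine runs, and the single writer goroutine set up in WithConcurrency drains those channels in registration order, so blocks reach the destination in submission order no matter which worker finishes first. A self-contained sketch of the pattern, with illustrative names:

	package main

	import (
		"fmt"
		"strings"
	)

	func main() {
		queue := make(chan chan string, 4) // bounded, like z.c

		done := make(chan struct{})
		go func() { // single writer: receives in registration order
			defer close(done)
			for c := range queue {
				fmt.Println(<-c) // blocks until this block's worker is done
			}
		}()

		// Fan out: register each result channel before starting its worker,
		// so the writer sees them in submission order.
		for _, block := range []string{"one", "two", "three"} {
			c := make(chan string)
			queue <- c
			go func(s string) { c <- strings.ToUpper(s) }(block)
		}
		close(queue)
		<-done // prints ONE, TWO, THREE in order
	}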
 
 // Flush flushes any pending compressed data to the underlying writer.
@@ -240,6 +325,21 @@ func (z *Writer) Flush() error {
 	return nil
 }
 
+func (z *Writer) close() error {
+	if z.c == nil {
+		return nil
+	}
+	// Send a sentinel block (no data to compress) to terminate the writer main goroutine.
+	c := make(chan zResult)
+	z.c <- c
+	c <- zResult{}
+	// Wait for the main goroutine to complete.
+	<-c
+	// At this point the main goroutine has shut down or is about to return.
+	z.c = nil
+	return z.err
+}
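
A note on the shutdown handshake: after close() sends the sentinel, the writer goroutine receives the empty zResult, sees len(res.data) == 0, closes c, and returns; the final <-c in close() then yields immediately, since a receive on a closed channel returns the zero value, so close() unblocks exactly when the writer loop has exited.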
+
 // Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
 func (z *Writer) Close() error {
 	if !z.Header.done {
@@ -250,6 +350,9 @@ func (z *Writer) Close() error {
 	if err := z.Flush(); err != nil {
 		return err
 	}
+	if err := z.close(); err != nil {
+		return err
+	}
 	z.freeBuffers()
 
 	if debugFlag {
@@ -272,11 +375,14 @@ func (z *Writer) Close() error {
 // initial state from NewWriter, but instead writing to w.
 // No access to the underlying io.Writer is performed.
 func (z *Writer) Reset(w io.Writer) {
+	n := cap(z.c)
+	_ = z.close()
 	z.freeBuffers()
 	z.Header = Header{}
 	z.dst = w
 	z.checksum.Reset()
 	z.idx = 0
+	z.WithConcurrency(n)
 }
 
 // writeUint32 writes a uint32 to the underlying writer.
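
Because Reset snapshots cap(z.c) before tearing the writer goroutine down, the configured concurrency carries over to the reset Writer. A brief sketch (w1 and w2 are illustrative io.Writers):

	zw := lz4.NewWriter(w1).WithConcurrency(4)
	// ... write a frame to w1 ...
	zw.Reset(w2) // old goroutine drained; still 4 compression goroutines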