Browse Source

Writer: fixed Reset() not clearing error field
fixed Flush() waiting for completion before returning in concurrency mode

Pierre Curto 6 years ago
parent
commit
d34061778c
2 changed files with 19 additions and 18 deletions
  1. 8 14
      block.go
  2. 11 4
      writer.go

+ 8 - 14
block.go

@@ -45,17 +45,14 @@ func CompressBlock(src, dst []byte, hashTable []int) (_ int, err error) {
 		return 0, fmt.Errorf("hash table too small, should be at least %d in size", htSize)
 	}
 	defer recoverBlock(&err)
-	return compressBlock(src, dst, hashTable), nil
-}
 
-func compressBlock(src, dst []byte, hashTable []int) int {
 	// adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
 	// This significantly speeds up incompressible data and usually has very small impact on compression.
 	// bytes to skip =  1 + (bytes since last match >> adaptSkipLog)
 	const adaptSkipLog = 7
 	sn, dn := len(src)-mfLimit, len(dst)
 	if sn <= 0 || dn == 0 {
-		return 0
+		return 0, nil
 	}
 	// Prove to the compiler the table has at least htSize elements.
 	// The compiler can see that "uint32() >> hashShift" cannot be out of bounds.
@@ -189,7 +186,7 @@ func compressBlock(src, dst []byte, hashTable []int) int {
 
 	if anchor == 0 {
 		// Incompressible.
-		return 0
+		return 0, nil
 	}
 
 	// Last literals.
@@ -210,10 +207,10 @@ func compressBlock(src, dst []byte, hashTable []int) int {
 	// Write the last literals.
 	if di >= anchor {
 		// Incompressible.
-		return 0
+		return 0, nil
 	}
 	di += copy(dst[di:di+len(src)-anchor], src[anchor:])
-	return di
+	return di, nil
 }
 
 // blockHash hashes 4 bytes into a value < winSize.
@@ -232,10 +229,7 @@ func blockHashHC(x uint32) uint32 {
 // An error is returned if the destination buffer is too small.
 func CompressBlockHC(src, dst []byte, depth int) (_ int, err error) {
 	defer recoverBlock(&err)
-	return compressBlockHC(src, dst, depth), nil
-}
 
-func compressBlockHC(src, dst []byte, depth int) int {
 	// adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
 	// This significantly speeds up incompressible data and usually has very small impact on compression.
 	// bytes to skip =  1 + (bytes since last match >> adaptSkipLog)
@@ -243,7 +237,7 @@ func compressBlockHC(src, dst []byte, depth int) int {
 
 	sn, dn := len(src)-mfLimit, len(dst)
 	if sn <= 0 || dn == 0 {
-		return 0
+		return 0, nil
 	}
 	var si, di int
 
@@ -364,7 +358,7 @@ func compressBlockHC(src, dst []byte, depth int) int {
 
 	if anchor == 0 {
 		// Incompressible.
-		return 0
+		return 0, nil
 	}
 
 	// Last literals.
@@ -386,8 +380,8 @@ func compressBlockHC(src, dst []byte, depth int) int {
 	// Write the last literals.
 	if di >= anchor {
 		// Incompressible.
-		return 0
+		return 0, nil
 	}
 	di += copy(dst[di:di+len(src)-anchor], src[anchor:])
-	return di
+	return di, nil
 }

+ 11 - 4
writer.go

@@ -40,6 +40,7 @@ type Writer struct {
 // It is ok to change it before the first Write but then not until a Reset() is performed.
 func NewWriter(dst io.Writer) *Writer {
 	z := new(Writer)
+	//z.WithConcurrency(4)
 	z.Reset(dst)
 	return z
 }
@@ -227,9 +228,9 @@ func (z *Writer) compressBlock(data []byte) error {
 		var zn int
 
 		if level := z.Header.CompressionLevel; level != 0 {
-			zn = compressBlockHC(data, zdata, level)
+			zn, _ = CompressBlockHC(data, zdata, level)
 		} else {
-			zn = compressBlock(data, zdata, z.hashtable[:])
+			zn, _ = CompressBlock(data, zdata, z.hashtable[:])
 		}
 
 		var bLen uint32
@@ -283,10 +284,10 @@ func (z *Writer) compressBlock(data []byte) error {
 		// The compressed block size cannot exceed the input's.
 		var zn int
 		if level := header.CompressionLevel; level != 0 {
-			zn = compressBlockHC(data, zdata, level)
+			zn, _ = CompressBlockHC(data, zdata, level)
 		} else {
 			var hashTable [winSize]int
-			zn = compressBlock(data, zdata, hashTable[:])
+			zn, _ = CompressBlock(data, zdata, hashTable[:])
 		}
 		var res zResult
 		if zn > 0 && zn < len(data) {
@@ -318,9 +319,14 @@ func (z *Writer) Flush() error {
 		return nil
 	}
 
+	// Disable concurrency for now.
+	c := z.c
+	z.c = nil
 	if err := z.compressBlock(z.data[:z.idx]); err != nil {
 		return err
 	}
+	z.c = c // Restore concurrency.
+
 	z.idx = 0
 	return nil
 }
@@ -382,6 +388,7 @@ func (z *Writer) Reset(w io.Writer) {
 	z.dst = w
 	z.checksum.Reset()
 	z.idx = 0
+	z.err = nil
 	z.WithConcurrency(n)
 }