Sfoglia il codice sorgente

Writer: handle internal buffers via sync.Pools

Pierre Curto 6 anni fa
parent
commit
e990da5628
2 ha cambiato i file con 44 aggiunte e 12 eliminazioni
  1. 19 1
      lz4.go
  2. 25 11
      writer.go

+ 19 - 1
lz4.go

@@ -10,6 +10,8 @@
 //
 package lz4
 
+import "sync"
+
 const (
 	// Extension is the LZ4 frame file name extension
 	Extension = ".lz4"
@@ -47,9 +49,25 @@ const (
 
 var (
 	bsMapID    = map[byte]int{4: blockSize64K, 5: blockSize256K, 6: blockSize1M, 7: blockSize4M}
-	bsMapValue = map[int]byte{blockSize64K: 4, blockSize256K: 5, blockSize1M: 6, blockSize4M: 7}
+	bsMapValue = map[int]struct {
+		byte
+		*sync.Pool
+	}{
+		blockSize64K:  {4, newBufferPool(2 * blockSize64K)},
+		blockSize256K: {5, newBufferPool(2 * blockSize256K)},
+		blockSize1M:   {6, newBufferPool(2 * blockSize1M)},
+		blockSize4M:   {7, newBufferPool(2 * blockSize4M)},
+	}
 )
 
+func newBufferPool(size int) *sync.Pool {
+	return &sync.Pool{
+		New: func() interface{} {
+			return make([]byte, size)
+		},
+	}
+}
+
 // Header describes the various flags that can be set on a Writer or obtained from a Reader.
 // The default values match those of the LZ4 frame format definition
 // (http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html).

+ 25 - 11
writer.go

@@ -32,6 +32,26 @@ func NewWriter(dst io.Writer) *Writer {
 	return &Writer{dst: dst}
 }
 
+// newBuffers instantiates new buffers which size matches the one in Header.
+// The returned buffers are for decompression and compression respectively.
+func (z *Writer) newBuffers() {
+	bSize := z.Header.BlockMaxSize
+	buf := bsMapValue[bSize].Pool.Get().([]byte)
+	z.data = buf[:bSize]             // Uncompressed buffer is the first half.
+	z.zdata = buf[:cap(buf)][bSize:] // Compressed buffer is the second half.
+}
+
+// freeBuffers puts the writer's buffers back to the pool.
+func (z *Writer) freeBuffers() {
+	if n := cap(z.data); n > 0 {
+		// Put the buffer back into the pool, if any.
+		bSize := z.Header.BlockMaxSize
+		bsMapValue[bSize].Pool.Put(z.data[:n])
+	}
+	z.data = nil
+	z.zdata = nil
+}
+
 // writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
 func (z *Writer) writeHeader() error {
 	// Default to 4Mb if BlockMaxSize is not set.
@@ -40,19 +60,13 @@ func (z *Writer) writeHeader() error {
 	}
 	// The only option that needs to be validated.
 	bSize := z.Header.BlockMaxSize
-	bSizeID, ok := bsMapValue[bSize]
+	m, ok := bsMapValue[bSize]
 	if !ok {
 		return fmt.Errorf("lz4: invalid block max size: %d", bSize)
 	}
 	// Allocate the compressed/uncompressed buffers.
 	// The compressed buffer cannot exceed the uncompressed one.
-	if cap(z.zdata) < bSize {
-		// Only allocate if there is not enough capacity.
-		// Allocate both buffers at once.
-		z.zdata = make([]byte, 2*bSize)
-	}
-	z.data = z.zdata[:bSize]                 // Uncompressed buffer is the first half.
-	z.zdata = z.zdata[:cap(z.zdata)][bSize:] // Compressed buffer is the second half.
+	z.newBuffers()
 	z.idx = 0
 
 	// Size is optional.
@@ -72,7 +86,7 @@ func (z *Writer) writeHeader() error {
 		flg |= 1 << 2
 	}
 	buf[4] = flg
-	buf[5] = bSizeID << 4
+	buf[5] = m.byte << 4
 
 	// Current buffer size: magic(4) + flags(1) + block max size (1).
 	n := 6
@@ -236,6 +250,7 @@ func (z *Writer) Close() error {
 	if err := z.Flush(); err != nil {
 		return err
 	}
+	z.freeBuffers()
 
 	if debugFlag {
 		debug("writing last empty block")
@@ -257,11 +272,10 @@ func (z *Writer) Close() error {
 // initial state from NewWriter, but instead writing to w.
 // No access to the underlying io.Writer is performed.
 func (z *Writer) Reset(w io.Writer) {
+	z.freeBuffers()
 	z.Header = Header{}
 	z.dst = w
 	z.checksum.Reset()
-	z.zdata = z.zdata[:0]
-	z.data = z.data[:0]
 	z.idx = 0
 }