Reuse compression writer objects

As with decompression, *lz4.Writer and *gzip.Writer objects can be
reused by calling Reset() between uses. Add two more sync.Pools so we
can reuse writers easily.
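
The pattern is the same on both paths: take a writer from the pool,
point it at a fresh buffer with Reset(), and put it back when done. A
minimal sketch of the pattern, mirroring the lz4 branch of compress.go
below:

	writer := lz4WriterPool.Get().(*lz4.Writer)
	defer lz4WriterPool.Put(writer)

	var buf bytes.Buffer
	writer.Reset(&buf) // writer now compresses into buf

	if _, err := writer.Write(data); err != nil {
		return nil, err
	}
	if err := writer.Close(); err != nil { // Close flushes buffered data
		return nil, err
	}
	return buf.Bytes(), nil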

The gzip writer is only pooled when using the default compression
level. We could maintain a separate sync.Pool for each gzip compression
level, but that adds complexity, and gzip was not a significant
bottleneck even without this optimization.
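
For illustration only, such a per-level scheme (not part of this
commit) could keep a lazily-built map of pools; the map, mutex, and
helper name here are hypothetical:

	// hypothetical alternative: one pool per gzip compression level
	var (
		gzipWriterPoolsMu sync.Mutex
		gzipWriterPools   = map[int]*sync.Pool{}
	)

	func gzipWriterPoolFor(level int) *sync.Pool {
		gzipWriterPoolsMu.Lock()
		defer gzipWriterPoolsMu.Unlock()
		pool, ok := gzipWriterPools[level]
		if !ok {
			pool = &sync.Pool{
				New: func() interface{} {
					// level is assumed valid here; NewWriterLevel
					// only errors on out-of-range levels
					w, _ := gzip.NewWriterLevel(nil, level)
					return w
				},
			}
			gzipWriterPools[level] = pool
		}
		return pool
	}

The commit instead keeps a single default-level pool, trading a little
speed at non-default levels for simplicity.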

With lz4, when producing 100 msgs/second across 10 producers, CPU usage
on my machine dropped from 70% to 4%. With gzip it dropped from 15% to
5%.
Muir Manders · 7 years ago · commit f352e5cc8b
4 changed files with 88 additions and 106 deletions:

  compress.go      +75  -0
  decompress.go     +6  -6
  message.go        +5  -51
  record_batch.go   +2  -49

+ 75 - 0
compress.go

@@ -0,0 +1,75 @@
+package sarama
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"sync"
+
+	"github.com/eapache/go-xerial-snappy"
+	"github.com/pierrec/lz4"
+)
+
+var (
+	lz4WriterPool = sync.Pool{
+		New: func() interface{} {
+			return lz4.NewWriter(nil)
+		},
+	}
+
+	gzipWriterPool = sync.Pool{
+		New: func() interface{} {
+			return gzip.NewWriter(nil)
+		},
+	}
+)
+
+func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
+	switch cc {
+	case CompressionNone:
+		return data, nil
+	case CompressionGZIP:
+		var (
+			err    error
+			buf    bytes.Buffer
+			writer *gzip.Writer
+		)
+		if level != CompressionLevelDefault {
+			writer, err = gzip.NewWriterLevel(&buf, level)
+			if err != nil {
+				return nil, err
+			}
+		} else {
+			writer = gzipWriterPool.Get().(*gzip.Writer)
+			defer gzipWriterPool.Put(writer)
+			writer.Reset(&buf)
+		}
+		if _, err := writer.Write(data); err != nil {
+			return nil, err
+		}
+		if err := writer.Close(); err != nil {
+			return nil, err
+		}
+		return buf.Bytes(), nil
+	case CompressionSnappy:
+		return snappy.Encode(data), nil
+	case CompressionLZ4:
+		writer := lz4WriterPool.Get().(*lz4.Writer)
+		defer lz4WriterPool.Put(writer)
+
+		var buf bytes.Buffer
+		writer.Reset(&buf)
+
+		if _, err := writer.Write(data); err != nil {
+			return nil, err
+		}
+		if err := writer.Close(); err != nil {
+			return nil, err
+		}
+		return buf.Bytes(), nil
+	case CompressionZSTD:
+		return zstdCompressLevel(nil, data, level)
+	default:
+		return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
+	}
+}
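
Within the package, the new helper pairs with decompress() from
decompress.go; a minimal round trip (variable names are illustrative):

	payload, err := compress(CompressionLZ4, CompressionLevelDefault, raw)
	if err != nil {
		return err
	}
	raw2, err := decompress(CompressionLZ4, payload)
	if err != nil {
		return err
	}
	// raw2 is byte-for-byte equal to raw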

+ 6 - 6
decompress.go

@@ -12,13 +12,13 @@ import (
 )
 
 var (
-	lz4Pool = sync.Pool{
+	lz4ReaderPool = sync.Pool{
 		New: func() interface{} {
 			return lz4.NewReader(nil)
 		},
 	}
 
-	gzipPool sync.Pool
+	gzipReaderPool sync.Pool
 )
 
 func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
@@ -29,7 +29,7 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
 		var (
 			err        error
 			reader     *gzip.Reader
-			readerIntf = gzipPool.Get()
+			readerIntf = gzipReaderPool.Get()
 		)
 		if readerIntf != nil {
 			reader = readerIntf.(*gzip.Reader)
@@ -40,7 +40,7 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
 			}
 		}
 
-		defer gzipPool.Put(reader)
+		defer gzipReaderPool.Put(reader)
 
 		if err := reader.Reset(bytes.NewReader(data)); err != nil {
 			return nil, err
@@ -50,8 +50,8 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
 	case CompressionSnappy:
 		return snappy.Decode(data)
 	case CompressionLZ4:
-		reader := lz4Pool.Get().(*lz4.Reader)
-		defer lz4Pool.Put(reader)
+		reader := lz4ReaderPool.Get().(*lz4.Reader)
+		defer lz4ReaderPool.Put(reader)
 
 		reader.Reset(bytes.NewReader(data))
 		return ioutil.ReadAll(reader)
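
Note the asymmetry with the writer pools: gzipReaderPool is declared
without a New function because gzip.NewReader must read a valid header
up front, so a nil Get() result means the first reader is constructed
on demand, as in the hunk above:

	readerIntf := gzipReaderPool.Get()
	if readerIntf != nil {
		reader = readerIntf.(*gzip.Reader)
	} else {
		reader, err = gzip.NewReader(bytes.NewReader(data))
		if err != nil {
			return nil, err
		}
	}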

+ 5 - 51
message.go

@@ -1,13 +1,8 @@
 package sarama
 
 import (
-	"bytes"
-	"compress/gzip"
 	"fmt"
 	"time"
-
-	"github.com/eapache/go-xerial-snappy"
-	"github.com/pierrec/lz4"
 )
 
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
@@ -76,53 +71,12 @@ func (m *Message) encode(pe packetEncoder) error {
 		payload = m.compressedCache
 		m.compressedCache = nil
 	} else if m.Value != nil {
-		switch m.Codec {
-		case CompressionNone:
-			payload = m.Value
-		case CompressionGZIP:
-			var buf bytes.Buffer
-			var writer *gzip.Writer
-			if m.CompressionLevel != CompressionLevelDefault {
-				writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel)
-				if err != nil {
-					return err
-				}
-			} else {
-				writer = gzip.NewWriter(&buf)
-			}
-			if _, err = writer.Write(m.Value); err != nil {
-				return err
-			}
-			if err = writer.Close(); err != nil {
-				return err
-			}
-			m.compressedCache = buf.Bytes()
-			payload = m.compressedCache
-		case CompressionSnappy:
-			tmp := snappy.Encode(m.Value)
-			m.compressedCache = tmp
-			payload = m.compressedCache
-		case CompressionLZ4:
-			var buf bytes.Buffer
-			writer := lz4.NewWriter(&buf)
-			if _, err = writer.Write(m.Value); err != nil {
-				return err
-			}
-			if err = writer.Close(); err != nil {
-				return err
-			}
-			m.compressedCache = buf.Bytes()
-			payload = m.compressedCache
-		case CompressionZSTD:
-			c, err := zstdCompressLevel(nil, m.Value, m.CompressionLevel)
-			if err != nil {
-				return err
-			}
-			m.compressedCache = c
-			payload = m.compressedCache
-		default:
-			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
+
+		payload, err = compress(m.Codec, m.CompressionLevel, m.Value)
+		if err != nil {
+			return err
 		}
+		m.compressedCache = payload
 		// Keep in mind the compressed payload size for metric gathering
 		m.compressedSize = len(payload)
 	}

+ 2 - 49
record_batch.go

@@ -1,13 +1,8 @@
 package sarama
 
 import (
-	"bytes"
-	"compress/gzip"
 	"fmt"
 	"time"
-
-	"github.com/eapache/go-xerial-snappy"
-	"github.com/pierrec/lz4"
 )
 
 const recordBatchOverhead = 49
@@ -196,50 +191,8 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
 	}
 	b.recordsLen = len(raw)
 
-	switch b.Codec {
-	case CompressionNone:
-		b.compressedRecords = raw
-	case CompressionGZIP:
-		var buf bytes.Buffer
-		var writer *gzip.Writer
-		if b.CompressionLevel != CompressionLevelDefault {
-			writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel)
-			if err != nil {
-				return err
-			}
-		} else {
-			writer = gzip.NewWriter(&buf)
-		}
-		if _, err := writer.Write(raw); err != nil {
-			return err
-		}
-		if err := writer.Close(); err != nil {
-			return err
-		}
-		b.compressedRecords = buf.Bytes()
-	case CompressionSnappy:
-		b.compressedRecords = snappy.Encode(raw)
-	case CompressionLZ4:
-		var buf bytes.Buffer
-		writer := lz4.NewWriter(&buf)
-		if _, err := writer.Write(raw); err != nil {
-			return err
-		}
-		if err := writer.Close(); err != nil {
-			return err
-		}
-		b.compressedRecords = buf.Bytes()
-	case CompressionZSTD:
-		c, err := zstdCompressLevel(nil, raw, b.CompressionLevel)
-		if err != nil {
-			return err
-		}
-		b.compressedRecords = c
-	default:
-		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
-	}
-
-	return nil
+	b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
+	return err
 }
 
 func (b *RecordBatch) computeAttributes() int16 {
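
The CPU numbers above come from a live producer workload; to measure
just the pooled compression path, a rough micro-benchmark in a package
_test.go (the benchmark name and input are illustrative, not part of
this commit) could look like:

	func BenchmarkCompressLZ4Pooled(b *testing.B) {
		data := bytes.Repeat([]byte("sarama "), 1024) // ~7 KiB of compressible input
		b.SetBytes(int64(len(data)))
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			if _, err := compress(CompressionLZ4, CompressionLevelDefault, data); err != nil {
				b.Fatal(err)
			}
		}
	}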