Browse Source

Add support for compression levels, closes #1042

This makes it possible to set compression level for gzip:

```
config.Producer.CompressionLevel = flate.BestCompression
```

There are no other compression codecs that support levels,
but the code is generic to easily add other codecs with
numeric levels.

Default compression level tracks what Go libraries
set as the default level.
Ivan Babrou 7 years ago
parent
commit
1f78d8a7f1
4 changed files with 46 additions and 16 deletions
  1. 5 0
      config.go
  2. 21 7
      message.go
  3. 10 8
      produce_set.go
  4. 10 1
      record_batch.go

+ 5 - 0
config.go

@@ -99,6 +99,10 @@ type Config struct {
 		// The type of compression to use on messages (defaults to no compression).
 		// Similar to `compression.codec` setting of the JVM producer.
 		Compression CompressionCodec
+		// The level of compression to use on messages. The meaning depends
+		// on the actual compression type used and defaults to default compression
+		// level for the codec.
+		CompressionLevel int
 		// Generates partitioners for choosing the partition to send messages to
 		// (defaults to hashing the message key). Similar to the `partitioner.class`
 		// setting for the JVM producer.
@@ -290,6 +294,7 @@ func NewConfig() *Config {
 	c.Producer.Retry.Max = 3
 	c.Producer.Retry.Backoff = 100 * time.Millisecond
 	c.Producer.Return.Errors = true
+	c.Producer.CompressionLevel = CompressionLevelDefault
 
 	c.Consumer.Fetch.Min = 1
 	c.Consumer.Fetch.Default = 1024 * 1024

+ 21 - 7
message.go

@@ -24,13 +24,19 @@ const (
 	CompressionLZ4    CompressionCodec = 3
 )
 
+// CompressionLevelDefault is the constant to use in CompressionLevel
+// to have the default compression level for any codec. The value is picked
+// that we don't use any existing compression levels.
+const CompressionLevelDefault = -1000
+
 type Message struct {
-	Codec     CompressionCodec // codec used to compress the message contents
-	Key       []byte           // the message key, may be nil
-	Value     []byte           // the message contents
-	Set       *MessageSet      // the message set a message might wrap
-	Version   int8             // v1 requires Kafka 0.10
-	Timestamp time.Time        // the timestamp of the message (version 1+ only)
+	Codec            CompressionCodec // codec used to compress the message contents
+	CompressionLevel int              // compression level
+	Key              []byte           // the message key, may be nil
+	Value            []byte           // the message contents
+	Set              *MessageSet      // the message set a message might wrap
+	Version          int8             // v1 requires Kafka 0.10
+	Timestamp        time.Time        // the timestamp of the message (version 1+ only)
 
 	compressedCache []byte
 	compressedSize  int // used for computing the compression ratio metrics
@@ -66,7 +72,15 @@ func (m *Message) encode(pe packetEncoder) error {
 			payload = m.Value
 		case CompressionGZIP:
 			var buf bytes.Buffer
-			writer := gzip.NewWriter(&buf)
+			var writer *gzip.Writer
+			if m.CompressionLevel != CompressionLevelDefault {
+				writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel)
+				if err != nil {
+					return err
+				}
+			} else {
+				writer = gzip.NewWriter(&buf)
+			}
 			if _, err = writer.Write(m.Value); err != nil {
 				return err
 			}

+ 10 - 8
produce_set.go

@@ -59,10 +59,11 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 	if set == nil {
 		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 			batch := &RecordBatch{
-				FirstTimestamp: timestamp,
-				Version:        2,
-				ProducerID:     -1, /* No producer id */
-				Codec:          ps.parent.conf.Producer.Compression,
+				FirstTimestamp:   timestamp,
+				Version:          2,
+				ProducerID:       -1, /* No producer id */
+				Codec:            ps.parent.conf.Producer.Compression,
+				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
 			}
 			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
 			size = recordBatchOverhead
@@ -157,10 +158,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 					panic(err)
 				}
 				compMsg := &Message{
-					Codec: ps.parent.conf.Producer.Compression,
-					Key:   nil,
-					Value: payload,
-					Set:   set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics
+					Codec:            ps.parent.conf.Producer.Compression,
+					CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
+					Key:              nil,
+					Value:            payload,
+					Set:              set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics
 				}
 				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
 					compMsg.Version = 1

+ 10 - 1
record_batch.go

@@ -40,6 +40,7 @@ type RecordBatch struct {
 	PartitionLeaderEpoch  int32
 	Version               int8
 	Codec                 CompressionCodec
+	CompressionLevel      int
 	Control               bool
 	LastOffsetDelta       int32
 	FirstTimestamp        time.Time
@@ -219,7 +220,15 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
 		b.compressedRecords = raw
 	case CompressionGZIP:
 		var buf bytes.Buffer
-		writer := gzip.NewWriter(&buf)
+		var writer *gzip.Writer
+		if b.CompressionLevel != CompressionLevelDefault {
+			writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel)
+			if err != nil {
+				return err
+			}
+		} else {
+			writer = gzip.NewWriter(&buf)
+		}
 		if _, err := writer.Write(raw); err != nil {
 			return err
 		}