Merge pull request #1044 from bobrik/compression-levels

Add support for compression levels, closes #1042
Evan Huus 7 years ago
parent commit 511b1ff16d
4 changed files with 57 additions and 16 deletions
  1. config.go (+16 -0)
  2. message.go (+21 -7)
  3. produce_set.go (+10 -8)
  4. record_batch.go (+10 -1)

config.go (+16 -0)

@@ -1,7 +1,10 @@
 package sarama
 
 import (
+	"compress/gzip"
 	"crypto/tls"
+	"fmt"
+	"io/ioutil"
 	"regexp"
 	"time"
 
@@ -99,6 +102,10 @@ type Config struct {
 		// The type of compression to use on messages (defaults to no compression).
 		// Similar to `compression.codec` setting of the JVM producer.
 		Compression CompressionCodec
+		// The level of compression to use on messages. The meaning depends
+		// on the actual compression type used and defaults to the codec's
+		// default compression level.
+		CompressionLevel int
 		// Generates partitioners for choosing the partition to send messages to
 		// (defaults to hashing the message key). Similar to the `partitioner.class`
 		// setting for the JVM producer.
@@ -290,6 +297,7 @@ func NewConfig() *Config {
 	c.Producer.Retry.Max = 3
 	c.Producer.Retry.Backoff = 100 * time.Millisecond
 	c.Producer.Return.Errors = true
+	c.Producer.CompressionLevel = CompressionLevelDefault
 
 	c.Consumer.Fetch.Min = 1
 	c.Consumer.Fetch.Default = 1024 * 1024
@@ -409,6 +417,14 @@ func (c *Config) Validate() error {
 		return ConfigurationError("lz4 compression requires Version >= V0_10_0_0")
 	}
 
+	if c.Producer.Compression == CompressionGZIP {
+		if c.Producer.CompressionLevel != CompressionLevelDefault {
+			if _, err := gzip.NewWriterLevel(ioutil.Discard, c.Producer.CompressionLevel); err != nil {
+				return ConfigurationError(fmt.Sprintf("gzip compression does not work with level %d: %v", c.Producer.CompressionLevel, err))
+			}
+		}
+	}
+
 	// validate the Consumer values
 	switch {
 	case c.Consumer.Fetch.Min <= 0:
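
A minimal sketch of how a producer opts into the new knob, assuming the usual github.com/Shopify/sarama import path. Validate() rejects levels that gzip.NewWriterLevel cannot accept, so a bad level fails fast at configuration time rather than when the first message is encoded:

package main

import (
	"compress/gzip"
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Compression = sarama.CompressionGZIP
	cfg.Producer.CompressionLevel = gzip.BestCompression // level 9

	if err := cfg.Validate(); err != nil {
		fmt.Println("unexpected:", err)
	}

	// An out-of-range level is caught up front by Validate(), using the
	// same gzip.NewWriterLevel probe shown in the hunk above.
	cfg.Producer.CompressionLevel = 42
	if err := cfg.Validate(); err != nil {
		fmt.Println(err) // gzip compression does not work with level 42: ...
	}
}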

message.go (+21 -7)

@@ -24,13 +24,19 @@ const (
 	CompressionLZ4    CompressionCodec = 3
 )
 
+// CompressionLevelDefault is the constant to use in CompressionLevel
+// to select the default compression level for any codec. The value is
+// chosen so that it does not collide with any existing compression level.
+const CompressionLevelDefault = -1000
+
 type Message struct {
-	Codec     CompressionCodec // codec used to compress the message contents
-	Key       []byte           // the message key, may be nil
-	Value     []byte           // the message contents
-	Set       *MessageSet      // the message set a message might wrap
-	Version   int8             // v1 requires Kafka 0.10
-	Timestamp time.Time        // the timestamp of the message (version 1+ only)
+	Codec            CompressionCodec // codec used to compress the message contents
+	CompressionLevel int              // compression level
+	Key              []byte           // the message key, may be nil
+	Value            []byte           // the message contents
+	Set              *MessageSet      // the message set a message might wrap
+	Version          int8             // v1 requires Kafka 0.10
+	Timestamp        time.Time        // the timestamp of the message (version 1+ only)
 
 	compressedCache []byte
 	compressedSize  int // used for computing the compression ratio metrics
@@ -66,7 +72,15 @@ func (m *Message) encode(pe packetEncoder) error {
 			payload = m.Value
 		case CompressionGZIP:
 			var buf bytes.Buffer
-			writer := gzip.NewWriter(&buf)
+			var writer *gzip.Writer
+			if m.CompressionLevel != CompressionLevelDefault {
+				writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel)
+				if err != nil {
+					return err
+				}
+			} else {
+				writer = gzip.NewWriter(&buf)
+			}
 			if _, err = writer.Write(m.Value); err != nil {
 				return err
 			}
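
The sentinel-based writer selection above, extracted as a standalone sketch: -1000 sits outside gzip's real level range (-2 through 9), so a plain equality check distinguishes "use the codec default" from an explicit level. The helper name gzipCompress is hypothetical:

package main

import (
	"bytes"
	"compress/gzip"
)

// compressionLevelDefault mirrors sarama's CompressionLevelDefault sentinel.
const compressionLevelDefault = -1000

// gzipCompress compresses value at the requested level, falling back to
// gzip.NewWriter for the sentinel and letting gzip.NewWriterLevel reject
// anything outside its valid range.
func gzipCompress(value []byte, level int) ([]byte, error) {
	var buf bytes.Buffer
	var writer *gzip.Writer
	var err error
	if level != compressionLevelDefault {
		if writer, err = gzip.NewWriterLevel(&buf, level); err != nil {
			return nil, err
		}
	} else {
		writer = gzip.NewWriter(&buf)
	}
	if _, err = writer.Write(value); err != nil {
		return nil, err
	}
	if err = writer.Close(); err != nil { // Close flushes the gzip trailer
		return nil, err
	}
	return buf.Bytes(), nil
}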

produce_set.go (+10 -8)

@@ -59,10 +59,11 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 	if set == nil {
 		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 			batch := &RecordBatch{
-				FirstTimestamp: timestamp,
-				Version:        2,
-				ProducerID:     -1, /* No producer id */
-				Codec:          ps.parent.conf.Producer.Compression,
+				FirstTimestamp:   timestamp,
+				Version:          2,
+				ProducerID:       -1, /* No producer id */
+				Codec:            ps.parent.conf.Producer.Compression,
+				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
 			}
 			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
 			size = recordBatchOverhead
@@ -157,10 +158,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 					panic(err)
 				}
 				compMsg := &Message{
-					Codec: ps.parent.conf.Producer.Compression,
-					Key:   nil,
-					Value: payload,
-					Set:   set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics
+					Codec:            ps.parent.conf.Producer.Compression,
+					CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
+					Key:              nil,
+					Value:            payload,
+					Set:              set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics
 				}
 				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
 					compMsg.Version = 1

record_batch.go (+10 -1)

@@ -40,6 +40,7 @@ type RecordBatch struct {
 	PartitionLeaderEpoch  int32
 	Version               int8
 	Codec                 CompressionCodec
+	CompressionLevel      int
 	Control               bool
 	LastOffsetDelta       int32
 	FirstTimestamp        time.Time
@@ -219,7 +220,15 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
 		b.compressedRecords = raw
 	case CompressionGZIP:
 		var buf bytes.Buffer
-		writer := gzip.NewWriter(&buf)
+		var writer *gzip.Writer
+		if b.CompressionLevel != CompressionLevelDefault {
+			writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel)
+			if err != nil {
+				return err
+			}
+		} else {
+			writer = gzip.NewWriter(&buf)
+		}
 		if _, err := writer.Write(raw); err != nil {
 			return err
 		}
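
record_batch.go repeats the same selection for the v0.11+ record-batch format, so the configured level applies whichever protocol version the producer negotiates. For a feel of the trade-off the knob controls, a hypothetical driver for the gzipCompress sketch above (add "fmt" to its imports): higher levels spend more CPU for smaller payloads, and on repetitive data the difference is easy to see.

func main() {
	// ~56 KiB of highly repetitive data, where the level choice is visible.
	payload := bytes.Repeat([]byte("kafka-message-"), 1<<12)

	fast, _ := gzipCompress(payload, gzip.BestSpeed)        // level 1: cheapest CPU
	small, _ := gzipCompress(payload, gzip.BestCompression) // level 9: smallest output

	fmt.Printf("BestSpeed: %d bytes, BestCompression: %d bytes\n", len(fast), len(small))
}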