Browse Source

Merge pull request #786 from rtreffer/master

Add lz4 compression
Evan Huus, 9 năm trước
mục cha
commit
e6df07fb9c
4 tập tin đã thay đổi với 95 bổ sung và 1 xóa
  1. 4 0
      config.go
  2. 12 0
      config_test.go
  3. 26 0
      message.go
  4. 53 1
      message_test.go

+ 4 - 0
config.go

@@ -380,6 +380,10 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Producer.Retry.Backoff must be >= 0")
 	}
 
+	if c.Producer.Compression == CompressionLZ4 && !c.Version.IsAtLeast(V0_10_0_0) {
+		return ConfigurationError("lz4 compression requires Version >= V0_10_0_0")
+	}
+
 	// validate the Consumer values
 	switch {
 	case c.Consumer.Fetch.Min <= 0:

+ 12 - 0
config_test.go

@@ -33,6 +33,18 @@ func TestEmptyClientIDConfigValidates(t *testing.T) {
 	}
 }
 
+func TestLZ4ConfigValidation(t *testing.T) {
+	config := NewConfig()
+	config.Producer.Compression = CompressionLZ4
+	if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
+		t.Error("Expected invalid lz4/kakfa version error, got ", err)
+	}
+	config.Version = V0_10_0_0
+	if err := config.Validate(); err != nil {
+		t.Error("Expected lz4 to work, got ", err)
+	}
+}
+
 // This example shows how to integrate with an existing registry as well as publishing metrics
 // on the standard output
 func ExampleConfig_metrics() {

+ 26 - 0
message.go

@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/eapache/go-xerial-snappy"
+	"github.com/pierrec/lz4"
 )
 
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
@@ -20,6 +21,7 @@ const (
 	CompressionNone   CompressionCodec = 0
 	CompressionGZIP   CompressionCodec = 1
 	CompressionSnappy CompressionCodec = 2
+	CompressionLZ4    CompressionCodec = 3
 )
 
 type Message struct {
@@ -75,6 +77,18 @@ func (m *Message) encode(pe packetEncoder) error {
 			tmp := snappy.Encode(m.Value)
 			m.compressedCache = tmp
 			payload = m.compressedCache
+		case CompressionLZ4:
+			var buf bytes.Buffer
+			writer := lz4.NewWriter(&buf)
+			if _, err = writer.Write(m.Value); err != nil {
+				return err
+			}
+			if err = writer.Close(); err != nil {
+				return err
+			}
+			m.compressedCache = buf.Bytes()
+			payload = m.compressedCache
+
 		default:
 			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
 		}
@@ -155,6 +169,18 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		if err := m.decodeSet(); err != nil {
 			return err
 		}
+	case CompressionLZ4:
+		if m.Value == nil {
+			break
+		}
+		reader := lz4.NewReader(bytes.NewReader(m.Value))
+		if m.Value, err = ioutil.ReadAll(reader); err != nil {
+			return err
+		}
+		if err := m.decodeSet(); err != nil {
+			return err
+		}
+
 	default:
 		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)}
 	}

+ 53 - 1
message_test.go

@@ -1,6 +1,9 @@
 package sarama
 
-import "testing"
+import (
+	"testing"
+	"time"
+)
 
 var (
 	emptyMessage = []byte{
@@ -21,6 +24,19 @@ var (
 		0x08,
 		0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
 
+	emptyLZ4Message = []byte{
+		132, 219, 238, 101, // CRC
+		0x01,                          // version byte
+		0x03,                          // attribute flags: lz4
+		0, 0, 1, 88, 141, 205, 89, 56, // timestamp
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		0x00, 0x00, 0x00, 0x0f, // len
+		0x04, 0x22, 0x4D, 0x18, // LZ4 magic number
+		100,                  // LZ4 flags: version 01, block independent, content checksum
+		112, 185, 0, 0, 0, 0, // LZ4 data
+		5, 93, 204, 2, // LZ4 checksum
+	}
+
 	emptyBulkSnappyMessage = []byte{
 		180, 47, 53, 209, //CRC
 		0x00,                   // magic version byte
@@ -41,6 +57,20 @@ var (
 		0x1f, 0x8b, // Gzip Magic
 		0x08, // deflate compressed
 		0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0}
+
+	emptyBulkLZ4Message = []byte{
+		246, 12, 188, 129, // CRC
+		0x01,                                  // Version
+		0x03,                                  // attribute flags (LZ4)
+		255, 255, 249, 209, 212, 181, 73, 201, // timestamp
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		0x00, 0x00, 0x00, 0x47, // len
+		0x04, 0x22, 0x4D, 0x18, // magic number lz4
+		100, // lz4 flags 01100100
+		// version: 01, block indep: 1, block checksum: 0, content size: 0, content checksum: 1, reserved: 00
+		112, 185, 52, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0,
+		71, 129, 23, 111, // LZ4 checksum
+	}
 )
 
 func TestMessageEncoding(t *testing.T) {
@@ -50,6 +80,12 @@ func TestMessageEncoding(t *testing.T) {
 	message.Value = []byte{}
 	message.Codec = CompressionGZIP
 	testEncodable(t, "empty gzip", &message, emptyGzipMessage)
+
+	message.Value = []byte{}
+	message.Codec = CompressionLZ4
+	message.Timestamp = time.Unix(1479847795, 0)
+	message.Version = 1
+	testEncodable(t, "empty lz4", &message, emptyLZ4Message)
 }
 
 func TestMessageDecoding(t *testing.T) {
@@ -111,3 +147,19 @@ func TestMessageDecodingBulkGzip(t *testing.T) {
 		t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
 	}
 }
+
+func TestMessageDecodingBulkLZ4(t *testing.T) {
+	message := Message{}
+	testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message)
+	if message.Codec != CompressionLZ4 {
+		t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionLZ4)
+	}
+	if message.Key != nil {
+		t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
+	}
+	if message.Set == nil {
+		t.Error("Decoding produced no set, but one was expected.")
+	} else if len(message.Set.Messages) != 2 {
+		t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
+	}
+}