Browse Source

Add lz4 compression

Rene Treffer 9 years ago
parent
commit
fea677b253
1 changed files with 26 additions and 0 deletions
  1. 26 0
      message.go

+ 26 - 0
message.go

@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/eapache/go-xerial-snappy"
+	"github.com/pierrec/lz4"
 )
 
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
@@ -20,6 +21,7 @@ const (
 	CompressionNone   CompressionCodec = 0
 	CompressionGZIP   CompressionCodec = 1
 	CompressionSnappy CompressionCodec = 2
+	CompressionLZ4    CompressionCodec = 3
 )
 
 type Message struct {
@@ -74,6 +76,18 @@ func (m *Message) encode(pe packetEncoder) error {
 			tmp := snappy.Encode(m.Value)
 			m.compressedCache = tmp
 			payload = m.compressedCache
+		case CompressionLZ4:
+			var buf bytes.Buffer
+			writer := lz4.NewWriter(&buf)
+			if _, err = writer.Write(m.Value); err != nil {
+				return err
+			}
+			if err = writer.Close(); err != nil {
+				return err
+			}
+			m.compressedCache = buf.Bytes()
+			payload = m.compressedCache
+
 		default:
 			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
 		}
@@ -148,6 +162,18 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		if err := m.decodeSet(); err != nil {
 			return err
 		}
+	case CompressionLZ4:
+		if m.Value == nil {
+			break
+		}
+		reader := lz4.NewReader(bytes.NewReader(m.Value))
+		if m.Value, err = ioutil.ReadAll(reader); err != nil {
+			return err
+		}
+		if err := m.decodeSet(); err != nil {
+			return err
+		}
+
 	default:
 		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)}
 	}