Bläddra i källkod

Zstandard (zstd) support for compression

Ivan Babrou 7 år sedan
förälder
incheckning
49c4c9a359
2 ändrade filer med 44 tillägg och 4 borttagningar
  1. 27 4
      message.go
  2. 17 0
      record_batch.go

+ 27 - 4
message.go

@@ -7,6 +7,7 @@ import (
 	"io/ioutil"
 	"time"
 
+	"github.com/DataDog/zstd"
 	"github.com/eapache/go-xerial-snappy"
 	"github.com/pierrec/lz4"
 )
@@ -14,14 +15,15 @@ import (
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
 type CompressionCodec int8
 
-// only the last two bits are really used
-const compressionCodecMask int8 = 0x03
+// The lowest 3 bits contain the compression codec used for the message
+const compressionCodecMask int8 = 0x07
 
 const (
 	CompressionNone   CompressionCodec = 0
 	CompressionGZIP   CompressionCodec = 1
 	CompressionSnappy CompressionCodec = 2
 	CompressionLZ4    CompressionCodec = 3
+	CompressionZSTD   CompressionCodec = 4
 )
 
 func (cc CompressionCodec) String() string {
@@ -113,7 +115,17 @@ func (m *Message) encode(pe packetEncoder) error {
 			}
 			m.compressedCache = buf.Bytes()
 			payload = m.compressedCache
-
+		case CompressionZSTD:
+			var buf bytes.Buffer
+			writer := zstd.NewWriterLevel(&buf, m.CompressionLevel)
+			if _, err = writer.Write(m.Value); err != nil {
+				return err
+			}
+			if err = writer.Close(); err != nil {
+				return err
+			}
+			m.compressedCache = buf.Bytes()
+			payload = m.compressedCache
 		default:
 			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
 		}
@@ -207,7 +219,18 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		if err := m.decodeSet(); err != nil {
 			return err
 		}
-
+	case CompressionZSTD:
+		if m.Value == nil {
+			break
+		}
+		reader := zstd.NewReader(bytes.NewReader(m.Value))
+		defer reader.Close()
+		if m.Value, err = ioutil.ReadAll(reader); err != nil {
+			return err
+		}
+		if err := m.decodeSet(); err != nil {
+			return err
+		}
 	default:
 		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)}
 	}

+ 17 - 0
record_batch.go

@@ -7,6 +7,7 @@ import (
 	"io/ioutil"
 	"time"
 
+	"github.com/DataDog/zstd"
 	"github.com/eapache/go-xerial-snappy"
 	"github.com/pierrec/lz4"
 )
@@ -193,6 +194,12 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
 			return err
 		}
+	case CompressionZSTD:
+		reader := zstd.NewReader(bytes.NewReader(recBuffer))
+		defer reader.Close()
+		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
+			return err
+		}
 	default:
 		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
 	}
@@ -248,6 +255,16 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
 			return err
 		}
 		b.compressedRecords = buf.Bytes()
+	case CompressionZSTD:
+		var buf bytes.Buffer
+		writer := zstd.NewWriterLevel(&buf, b.CompressionLevel)
+		if _, err = writer.Write(raw); err != nil {
+			return err
+		}
+		if err = writer.Close(); err != nil {
+			return err
+		}
+		b.compressedRecords = buf.Bytes()
 	default:
 		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
 	}