|
@@ -10,6 +10,9 @@ import (
|
|
|
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
|
|
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
|
|
|
type CompressionCodec int8
|
|
type CompressionCodec int8
|
|
|
|
|
|
|
|
|
|
+// only the last three bits are really used
|
|
|
|
|
+const compressionCodecMask int8 = 0x07
|
|
|
|
|
+
|
|
|
const (
|
|
const (
|
|
|
CompressionNone CompressionCodec = 0
|
|
CompressionNone CompressionCodec = 0
|
|
|
CompressionGZIP CompressionCodec = 1
|
|
CompressionGZIP CompressionCodec = 1
|
|
@@ -33,7 +36,7 @@ func (m *Message) encode(pe packetEncoder) error {
|
|
|
|
|
|
|
|
pe.putInt8(messageFormat)
|
|
pe.putInt8(messageFormat)
|
|
|
|
|
|
|
|
- attributes := int8(m.Codec) & 0x07
|
|
|
|
|
|
|
+ attributes := int8(m.Codec) & compressionCodecMask
|
|
|
pe.putInt8(attributes)
|
|
pe.putInt8(attributes)
|
|
|
|
|
|
|
|
err := pe.putBytes(m.Key)
|
|
err := pe.putBytes(m.Key)
|
|
@@ -95,7 +98,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
- m.Codec = CompressionCodec(attribute & 0x07)
|
|
|
|
|
|
|
+ m.Codec = CompressionCodec(attribute & compressionCodecMask)
|
|
|
|
|
|
|
|
m.Key, err = pd.getBytes()
|
|
m.Key, err = pd.getBytes()
|
|
|
if err != nil {
|
|
if err != nil {
|