|
@@ -5,6 +5,8 @@ import (
|
|
|
"compress/gzip"
|
|
"compress/gzip"
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
|
|
|
+
|
|
|
|
|
+ "github.com/eapache/go-xerial-snappy"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
|
|
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
|
|
@@ -66,7 +68,7 @@ func (m *Message) encode(pe packetEncoder) error {
|
|
|
m.compressedCache = buf.Bytes()
|
|
m.compressedCache = buf.Bytes()
|
|
|
payload = m.compressedCache
|
|
payload = m.compressedCache
|
|
|
case CompressionSnappy:
|
|
case CompressionSnappy:
|
|
|
- tmp := snappyEncode(m.Value)
|
|
|
|
|
|
|
+ tmp := snappy.Encode(m.Value)
|
|
|
m.compressedCache = tmp
|
|
m.compressedCache = tmp
|
|
|
payload = m.compressedCache
|
|
payload = m.compressedCache
|
|
|
default:
|
|
default:
|
|
@@ -132,7 +134,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
|
|
|
if m.Value == nil {
|
|
if m.Value == nil {
|
|
|
break
|
|
break
|
|
|
}
|
|
}
|
|
|
- if m.Value, err = snappyDecode(m.Value); err != nil {
|
|
|
|
|
|
|
+ if m.Value, err = snappy.Decode(m.Value); err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
if err := m.decodeSet(); err != nil {
|
|
if err := m.decodeSet(); err != nil {
|