123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- package sarama
- import (
- "bytes"
- "compress/gzip"
- "fmt"
- "io/ioutil"
- "time"
- "github.com/eapache/go-xerial-snappy"
- "github.com/pierrec/lz4"
- )
// CompressionCodec identifies the compression algorithm applied to a
// message's value. It is carried in the low bits of the message attributes
// byte.
type CompressionCodec int8

// compressionCodecMask selects the bits of the attributes byte that hold the
// compression codec; the remaining bits are ignored when encoding/decoding.
const compressionCodecMask int8 = 0x07

// Supported compression codecs. The numeric values are what is written to,
// and read from, the attributes byte.
const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
	CompressionLZ4    CompressionCodec = 3
	CompressionZSTD   CompressionCodec = 4
)

// String returns the lower-case name of the codec ("none", "gzip", "snappy",
// "lz4" or "zstd").
//
// Values that do not correspond to a declared codec yield "unknown(N)"
// instead of panicking, so String is always safe to call from logging and
// error-formatting paths, including on values decoded from the wire.
func (cc CompressionCodec) String() string {
	switch cc {
	case CompressionNone:
		return "none"
	case CompressionGZIP:
		return "gzip"
	case CompressionSnappy:
		return "snappy"
	case CompressionLZ4:
		return "lz4"
	case CompressionZSTD:
		return "zstd"
	default:
		// Convert to the underlying int8 so fmt never re-enters String.
		return fmt.Sprintf("unknown(%d)", int8(cc))
	}
}
// CompressionLevelDefault is the sentinel stored in Message.CompressionLevel
// when no explicit level was requested. When encoding with gzip, this value
// selects gzip.NewWriter instead of gzip.NewWriterLevel.
const (
	CompressionLevelDefault = -1000
)
- type Message struct {
- Codec CompressionCodec
- CompressionLevel int
- Key []byte
- Value []byte
- Set *MessageSet
- Version int8
- Timestamp time.Time
- compressedCache []byte
- compressedSize int
- }
- func (m *Message) encode(pe packetEncoder) error {
- pe.push(newCRC32Field(crcIEEE))
- pe.putInt8(m.Version)
- attributes := int8(m.Codec) & compressionCodecMask
- pe.putInt8(attributes)
- if m.Version >= 1 {
- if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
- return err
- }
- }
- err := pe.putBytes(m.Key)
- if err != nil {
- return err
- }
- var payload []byte
- if m.compressedCache != nil {
- payload = m.compressedCache
- m.compressedCache = nil
- } else if m.Value != nil {
- switch m.Codec {
- case CompressionNone:
- payload = m.Value
- case CompressionGZIP:
- var buf bytes.Buffer
- var writer *gzip.Writer
- if m.CompressionLevel != CompressionLevelDefault {
- writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel)
- if err != nil {
- return err
- }
- } else {
- writer = gzip.NewWriter(&buf)
- }
- if _, err = writer.Write(m.Value); err != nil {
- return err
- }
- if err = writer.Close(); err != nil {
- return err
- }
- m.compressedCache = buf.Bytes()
- payload = m.compressedCache
- case CompressionSnappy:
- tmp := snappy.Encode(m.Value)
- m.compressedCache = tmp
- payload = m.compressedCache
- case CompressionLZ4:
- var buf bytes.Buffer
- writer := lz4.NewWriter(&buf)
- if _, err = writer.Write(m.Value); err != nil {
- return err
- }
- if err = writer.Close(); err != nil {
- return err
- }
- m.compressedCache = buf.Bytes()
- payload = m.compressedCache
- case CompressionZSTD:
- c, err := zstdCompressLevel(nil, m.Value, m.CompressionLevel)
- if err != nil {
- return err
- }
- m.compressedCache = c
- payload = m.compressedCache
- default:
- return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
- }
-
- m.compressedSize = len(payload)
- }
- if err = pe.putBytes(payload); err != nil {
- return err
- }
- return pe.pop()
- }
- func (m *Message) decode(pd packetDecoder) (err error) {
- err = pd.push(newCRC32Field(crcIEEE))
- if err != nil {
- return err
- }
- m.Version, err = pd.getInt8()
- if err != nil {
- return err
- }
- if m.Version > 1 {
- return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
- }
- attribute, err := pd.getInt8()
- if err != nil {
- return err
- }
- m.Codec = CompressionCodec(attribute & compressionCodecMask)
- if m.Version == 1 {
- if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
- return err
- }
- }
- m.Key, err = pd.getBytes()
- if err != nil {
- return err
- }
- m.Value, err = pd.getBytes()
- if err != nil {
- return err
- }
-
-
- m.compressedSize = len(m.Value)
- switch m.Codec {
- case CompressionNone:
-
- case CompressionGZIP:
- if m.Value == nil {
- break
- }
- reader, err := gzip.NewReader(bytes.NewReader(m.Value))
- if err != nil {
- return err
- }
- if m.Value, err = ioutil.ReadAll(reader); err != nil {
- return err
- }
- if err := m.decodeSet(); err != nil {
- return err
- }
- case CompressionSnappy:
- if m.Value == nil {
- break
- }
- if m.Value, err = snappy.Decode(m.Value); err != nil {
- return err
- }
- if err := m.decodeSet(); err != nil {
- return err
- }
- case CompressionLZ4:
- if m.Value == nil {
- break
- }
- reader := lz4.NewReader(bytes.NewReader(m.Value))
- if m.Value, err = ioutil.ReadAll(reader); err != nil {
- return err
- }
- if err := m.decodeSet(); err != nil {
- return err
- }
- case CompressionZSTD:
- if m.Value == nil {
- break
- }
- if m.Value, err = zstdDecompress(nil, m.Value); err != nil {
- return err
- }
- if err := m.decodeSet(); err != nil {
- return err
- }
- default:
- return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)}
- }
- return pd.pop()
- }
- func (m *Message) decodeSet() (err error) {
- pd := realDecoder{raw: m.Value}
- m.Set = &MessageSet{}
- return m.Set.decode(&pd)
- }
|