| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- package protocol
- import enc "sarama/encoding"
- import (
- "bytes"
- "compress/gzip"
- "io/ioutil"
- "sarama/types"
- )
- // The spec just says: "This is a version id used to allow backwards compatible evolution of the message
- // binary format." but it doesn't say what the current value is, so presumably 0...
- const message_format int8 = 0
- type Message struct {
- Codec types.CompressionCodec // codec used to compress the message contents
- Key []byte // the message key, may be nil
- Value []byte // the message contents
- }
- func (m *Message) Encode(pe enc.PacketEncoder) error {
- pe.Push(&enc.CRC32Field{})
- pe.PutInt8(message_format)
- var attributes int8 = 0
- attributes |= int8(m.Codec) & 0x07
- pe.PutInt8(attributes)
- err := pe.PutBytes(m.Key)
- if err != nil {
- return err
- }
- var body []byte
- switch m.Codec {
- case types.COMPRESSION_NONE:
- body = m.Value
- case types.COMPRESSION_GZIP:
- if m.Value != nil {
- var buf bytes.Buffer
- writer := gzip.NewWriter(&buf)
- writer.Write(m.Value)
- writer.Close()
- body = buf.Bytes()
- }
- case types.COMPRESSION_SNAPPY:
- // TODO
- }
- err = pe.PutBytes(body)
- if err != nil {
- return err
- }
- return pe.Pop()
- }
- func (m *Message) Decode(pd enc.PacketDecoder) (err error) {
- err = pd.Push(&enc.CRC32Field{})
- if err != nil {
- return err
- }
- format, err := pd.GetInt8()
- if err != nil {
- return err
- }
- if format != message_format {
- return enc.DecodingError
- }
- attribute, err := pd.GetInt8()
- if err != nil {
- return err
- }
- m.Codec = types.CompressionCodec(attribute & 0x07)
- m.Key, err = pd.GetBytes()
- if err != nil {
- return err
- }
- m.Value, err = pd.GetBytes()
- if err != nil {
- return err
- }
- switch m.Codec {
- case types.COMPRESSION_NONE:
- // nothing to do
- case types.COMPRESSION_GZIP:
- if m.Value == nil {
- return enc.DecodingError
- }
- reader, err := gzip.NewReader(bytes.NewReader(m.Value))
- if err != nil {
- return err
- }
- m.Value, err = ioutil.ReadAll(reader)
- if err != nil {
- return err
- }
- case types.COMPRESSION_SNAPPY:
- // TODO
- default:
- return enc.DecodingError
- }
- err = pd.Pop()
- if err != nil {
- return err
- }
- return nil
- }
|