|
@@ -2,6 +2,7 @@ package sarama
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
"bytes"
|
|
"bytes"
|
|
|
|
|
+ "code.google.com/p/snappy-go/snappy"
|
|
|
"compress/gzip"
|
|
"compress/gzip"
|
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
|
)
|
|
)
|
|
@@ -52,7 +53,10 @@ func (m *Message) encode(pe packetEncoder) error {
|
|
|
body = buf.Bytes()
|
|
body = buf.Bytes()
|
|
|
}
|
|
}
|
|
|
case COMPRESSION_SNAPPY:
|
|
case COMPRESSION_SNAPPY:
|
|
|
- // TODO
|
|
|
|
|
|
|
+ body, err = snappy.Encode(nil, m.Value)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
err = pe.putBytes(body)
|
|
err = pe.putBytes(body)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -108,7 +112,13 @@ func (m *Message) decode(pd packetDecoder) (err error) {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
case COMPRESSION_SNAPPY:
|
|
case COMPRESSION_SNAPPY:
|
|
|
- // TODO
|
|
|
|
|
|
|
+ if m.Value == nil {
|
|
|
|
|
+ return DecodingError
|
|
|
|
|
+ }
|
|
|
|
|
+ m.Value, err = snappy.Decode(nil, m.Value)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
default:
|
|
default:
|
|
|
return DecodingError
|
|
return DecodingError
|
|
|
}
|
|
}
|