|
|
@@ -1,11 +1,71 @@
|
|
|
package kafka
|
|
|
|
|
|
+type compressionCodec int
|
|
|
+const (
|
|
|
+ COMPRESSION_NONE compressionCodec = 0
|
|
|
+ COMPRESSION_GZIP compressionCodec = 1
|
|
|
+ COMPRESSION_SNAPPY compressionCodec = 2
|
|
|
+)
|
|
|
+
|
|
|
+// The spec just says: "This is a version id used to allow backwards compatible evolution of the message
|
|
|
+// binary format." but it doesn't say what the current value is, so presumably 0...
|
|
|
+const MESSAGE_FORMAT int8 = 0
|
|
|
+
|
|
|
type message struct {
|
|
|
+ codec compressionCodec
|
|
|
+ key *[]byte
|
|
|
+ value *[]byte
|
|
|
}
|
|
|
|
|
|
func (m *message) encode(pe packetEncoder) {
|
|
|
+ pe.pushCRC32()
|
|
|
+
|
|
|
+ pe.putInt8(MESSAGE_FORMAT)
|
|
|
+
|
|
|
+ var attributes int8 = 0
|
|
|
+ attributes |= int8(m.codec & 0x07)
|
|
|
+ pe.putInt8(attributes)
|
|
|
+
|
|
|
+ pe.putBytes(m.key)
|
|
|
+ pe.putBytes(m.value)
|
|
|
+
|
|
|
+ pe.pop()
|
|
|
}
|
|
|
|
|
|
func (m *message) decode(pd packetDecoder) (err error) {
|
|
|
+ err = pd.pushCRC32()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ format, err := pd.getInt8()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if format != MESSAGE_FORMAT {
|
|
|
+ return DecodingError{}
|
|
|
+ }
|
|
|
+
|
|
|
+ attribute, err := pd.getInt8()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ m.codec = compressionCodec(attribute & 0x07)
|
|
|
+
|
|
|
+ m.key, err = pd.getBytes()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ m.value, err = pd.getBytes()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ err = pd.pop()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
return nil
|
|
|
}
|