浏览代码

Compression codec is high three bits, not low

And apparently none-zero values for the low three bits mean your messages are
silently dropped by the broker. I've also updated the wiki to be more clear
which bits need setting.

Fixes #32.
Evan Huus 12 年之前
父节点
当前提交
7d6070c06e
共有 2 个文件被更改,包括 5 次插入5 次删除
  1. 3 3
      message.go
  2. 2 2
      message_test.go

+ 3 - 3
message.go

@@ -8,7 +8,7 @@ import (
 )
 
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
-type CompressionCodec int8
+type CompressionCodec uint8
 
 const (
 	CompressionNone   CompressionCodec = 0
@@ -33,7 +33,7 @@ func (m *Message) encode(pe packetEncoder) error {
 
 	pe.putInt8(messageFormat)
 
-	attributes := int8(m.Codec) & 0x07
+	attributes := int8(m.Codec << 5)
 	pe.putInt8(attributes)
 
 	err := pe.putBytes(m.Key)
@@ -95,7 +95,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
-	m.Codec = CompressionCodec(attribute & 0x07)
+	m.Codec = CompressionCodec(attribute >> 5)
 
 	m.Key, err = pd.getBytes()
 	if err != nil {

+ 2 - 2
message_test.go

@@ -11,9 +11,9 @@ var (
 		0xFF, 0xFF, 0xFF, 0xFF} // value
 
 	emptyGzipMessage = []byte{
-		97, 79, 149, 90, //CRC
+		0xAA, 0x27, 0x4D, 0x22, //CRC
 		0x00,                   // magic version byte
-		0x01,                   // attribute flags
+		0x20,                   // attribute flags
 		0xFF, 0xFF, 0xFF, 0xFF, // key
 		// value
 		0x00, 0x00, 0x00, 0x17,