Browse Source

Support nil compressed messages

Per https://kafka.apache.org/documentation.html#compaction messages with null
payloads are valid when using log compaction. Make this work for all messages,
not just uncompressed ones. Should fix #634.
Evan Huus 9 years ago
parent
commit
87fb5e3465
1 changed files with 3 additions and 3 deletions
  1. 3 3
      message.go

+ 3 - 3
message.go

@@ -50,7 +50,7 @@ func (m *Message) encode(pe packetEncoder) error {
 	if m.compressedCache != nil {
 	if m.compressedCache != nil {
 		payload = m.compressedCache
 		payload = m.compressedCache
 		m.compressedCache = nil
 		m.compressedCache = nil
-	} else {
+	} else if m.Value != nil {
 		switch m.Codec {
 		switch m.Codec {
 		case CompressionNone:
 		case CompressionNone:
 			payload = m.Value
 			payload = m.Value
@@ -116,7 +116,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		// nothing to do
 		// nothing to do
 	case CompressionGZIP:
 	case CompressionGZIP:
 		if m.Value == nil {
 		if m.Value == nil {
-			return PacketDecodingError{"GZIP compression specified, but no data to uncompress"}
+			break
 		}
 		}
 		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
 		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
 		if err != nil {
 		if err != nil {
@@ -130,7 +130,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		}
 		}
 	case CompressionSnappy:
 	case CompressionSnappy:
 		if m.Value == nil {
 		if m.Value == nil {
-			return PacketDecodingError{"Snappy compression specified, but no data to uncompress"}
+			break
 		}
 		}
 		if m.Value, err = snappyDecode(m.Value); err != nil {
 		if m.Value, err = snappyDecode(m.Value); err != nil {
 			return err
 			return err