Browse Source

Fix partial messages handling

Kafka 2.0 introduced chunked down-conversion of messages, which might
produce a partial message at the end of a message set. Such partial messages
are not well formed beyond their offset and length fields, so they might cause
strange decoding errors down the line.
This fix makes `lengthField` a `dynamicPushDecoder`, so that right after reading the
length it checks that the `packetDecoder` still has at least that many bytes remaining.
It fixes the problem above and I think it's a more robust check in general.
Vlad Hanciuta 7 years ago
parent
commit
682468dcc7
2 changed files with 90 additions and 1 deletions
  1. 76 0
      fetch_response_test.go
  2. 14 1
      length_field.go

+ 76 - 0
fetch_response_test.go

@@ -27,6 +27,29 @@ var (
 		0xFF, 0xFF, 0xFF, 0xFF,
 		0xFF, 0xFF, 0xFF, 0xFF,
 		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
 		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
 
 
+	overflowMessageFetchResponse = []byte{
+		0x00, 0x00, 0x00, 0x01, // number of topics
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // topic name with a 2-byte length prefix
+		0x00, 0x00, 0x00, 0x01, // number of partitions
+		0x00, 0x00, 0x00, 0x05, // partition ID
+		0x00, 0x01, // error code (the test expects ErrOffsetOutOfRange)
+		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // high water mark offset
+		0x00, 0x00, 0x00, 0x30, // message set size: the 48 bytes that follow
+		// messageSet
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, // message offset
+		0x00, 0x00, 0x00, 0x10, // message size: the 16 bytes of the message below
+		// message
+		0x23, 0x96, 0x4a, 0xf7, // CRC
+		0x00, // presumably the magic byte; confirm against the message format
+		0x00, // presumably the attributes byte (the test expects CompressionNone); confirm
+		0xFF, 0xFF, 0xFF, 0xFF, // key length of -1 (the test expects a nil key)
+		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value length 2 followed by the value bytes
+		// overflow messageSet
+		0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // offset field of the partial trailing message
+		0x00, 0x00, 0x00, 0xFF, // declared size 255, but only 8 bytes follow
+		// overflow bytes: the truncated remainder of the partial message
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+
 	oneRecordFetchResponse = []byte{
 	oneRecordFetchResponse = []byte{
 		0x00, 0x00, 0x00, 0x00, // ThrottleTime
 		0x00, 0x00, 0x00, 0x00, // ThrottleTime
 		0x00, 0x00, 0x00, 0x01, // Number of Topics
 		0x00, 0x00, 0x00, 0x01, // Number of Topics
@@ -148,6 +171,59 @@ func TestOneMessageFetchResponse(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestOverflowMessageFetchResponse(t *testing.T) { // a trailing message whose declared size exceeds the bytes left must yield a partial set, not a decode error
+	response := FetchResponse{}
+	testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)
+
+	if len(response.Blocks) != 1 {
+		t.Fatal("Decoding produced incorrect number of topic blocks.")
+	}
+
+	if len(response.Blocks["topic"]) != 1 {
+		t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
+	}
+
+	block := response.GetBlock("topic", 5)
+	if block == nil {
+		t.Fatal("GetBlock didn't return block.")
+	}
+	if block.Err != ErrOffsetOutOfRange {
+		t.Error("Decoding didn't produce correct error code.")
+	}
+	if block.HighWaterMarkOffset != 0x10101010 {
+		t.Error("Decoding didn't produce correct high water mark offset.")
+	}
+	partial, err := block.Records.isPartial() // the fixture ends with a message declaring 0xFF bytes while only 8 remain
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if !partial {
+		t.Error("Overflow messages should be partial.")
+	}
+
+	n, err := block.Records.numRecords() // only the complete first message is expected to be counted
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if n != 1 {
+		t.Fatal("Decoding produced incorrect number of messages.")
+	}
+	msgBlock := block.Records.msgSet.Messages[0] // safe to index: n == 1 was checked with Fatal above
+	if msgBlock.Offset != 0x550000 {
+		t.Error("Decoding produced incorrect message offset.")
+	}
+	msg := msgBlock.Msg
+	if msg.Codec != CompressionNone {
+		t.Error("Decoding produced incorrect message compression.")
+	}
+	if msg.Key != nil { // the fixture encodes the key length as 0xFFFFFFFF
+		t.Error("Decoding produced message key where there was none.")
+	}
+	if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
+		t.Error("Decoding produced incorrect message value.")
+	}
+}
+
 func TestOneRecordFetchResponse(t *testing.T) {
 func TestOneRecordFetchResponse(t *testing.T) {
 	response := FetchResponse{}
 	response := FetchResponse{}
 	testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
 	testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)

+ 14 - 1
length_field.go

@@ -5,6 +5,19 @@ import "encoding/binary"
 // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
 // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
 type lengthField struct {
 type lengthField struct {
 	startOffset int
 	startOffset int
+	length      int32 // value read by decode; check compares it with the byte count measured from startOffset
+}
+
+func (l *lengthField) decode(pd packetDecoder) error { // decode reads the 4-byte length and verifies that pd still holds that many bytes
+	var err error
+	l.length, err = pd.getInt32() // kept on the struct so check can compare against it later
+	if err != nil {
+		return err
+	}
+	if int64(l.length) > int64(pd.remaining()) { // widen both sides: narrowing remaining() to int32 could wrap if it is wider than 32 bits
+		return ErrInsufficientData // the declared length runs past the available data, e.g. a truncated trailing message
+	}
+	return nil
 }
 }
 
 
 func (l *lengthField) saveOffset(in int) {
 func (l *lengthField) saveOffset(in int) {
@@ -21,7 +34,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {
 }
 }
 
 
 func (l *lengthField) check(curOffset int, buf []byte) error {
 func (l *lengthField) check(curOffset int, buf []byte) error {
-	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
+	if int32(curOffset-l.startOffset-4) != l.length {
 		return PacketDecodingError{"length field invalid"}
 		return PacketDecodingError{"length field invalid"}
 	}
 	}