Browse Source

Merge pull request #1149 from wladh/master

Fix partial messages handling
Vlad Gorodetsky 7 years ago
parent
commit
647feef69a
8 changed files with 147 additions and 11 deletions
  1. 1 1
      .travis.yml
  2. 1 1
      dev.yml
  3. 17 6
      fetch_response.go
  4. 83 0
      fetch_response_test.go
  5. 14 1
      length_field.go
  6. 7 1
      message_set.go
  7. 21 0
      records.go
  8. 3 1
      utils.go

+ 1 - 1
.travis.yml

@@ -12,9 +12,9 @@ env:
   - KAFKA_HOSTNAME=localhost
   - DEBUG=true
   matrix:
-  - KAFKA_VERSION=0.11.0.2
   - KAFKA_VERSION=1.0.0
   - KAFKA_VERSION=1.1.0
+  - KAFKA_VERSION=2.0.0
 
 before_install:
 - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}

+ 1 - 1
dev.yml

@@ -2,7 +2,7 @@ name: sarama
 
 up:
   - go:
-      version: '1.9'
+      version: '1.10'
 
 commands:
   test:

+ 17 - 6
fetch_response.go

@@ -104,15 +104,26 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
 			return err
 		}
 
-		// If we have at least one full records, we skip incomplete ones
-		if partial && len(b.RecordsSet) > 0 {
-			break
+		n, err := records.numRecords()
+		if err != nil {
+			return err
 		}
 
-		b.RecordsSet = append(b.RecordsSet, records)
+		if n > 0 || (partial && len(b.RecordsSet) == 0) {
+			b.RecordsSet = append(b.RecordsSet, records)
+
+			if b.Records == nil {
+				b.Records = records
+			}
+		}
 
-		if b.Records == nil {
-			b.Records = records
+		overflow, err := records.isOverflow()
+		if err != nil {
+			return err
+		}
+
+		if partial || overflow {
+			break
 		}
 	}
 

+ 83 - 0
fetch_response_test.go

@@ -27,6 +27,29 @@ var (
 		0xFF, 0xFF, 0xFF, 0xFF,
 		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
 
+	overflowMessageFetchResponse = []byte{
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x05,
+		0x00, 0x01,
+		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
+		0x00, 0x00, 0x00, 0x30,
+		// messageSet
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x10,
+		// message
+		0x23, 0x96, 0x4a, 0xf7, // CRC
+		0x00,
+		0x00,
+		0xFF, 0xFF, 0xFF, 0xFF,
+		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,
+		// overflow messageSet
+		0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+		0x00, 0x00, 0x00, 0xFF,
+		// overflow bytes
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+
 	oneRecordFetchResponse = []byte{
 		0x00, 0x00, 0x00, 0x00, // ThrottleTime
 		0x00, 0x00, 0x00, 0x01, // Number of Topics
@@ -148,6 +171,66 @@ func TestOneMessageFetchResponse(t *testing.T) {
 	}
 }
 
+func TestOverflowMessageFetchResponse(t *testing.T) {
+	response := FetchResponse{}
+	testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)
+
+	if len(response.Blocks) != 1 {
+		t.Fatal("Decoding produced incorrect number of topic blocks.")
+	}
+
+	if len(response.Blocks["topic"]) != 1 {
+		t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
+	}
+
+	block := response.GetBlock("topic", 5)
+	if block == nil {
+		t.Fatal("GetBlock didn't return block.")
+	}
+	if block.Err != ErrOffsetOutOfRange {
+		t.Error("Decoding didn't produce correct error code.")
+	}
+	if block.HighWaterMarkOffset != 0x10101010 {
+		t.Error("Decoding didn't produce correct high water mark offset.")
+	}
+	partial, err := block.Records.isPartial()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if partial {
+		t.Error("Decoding detected a partial trailing message where there wasn't one.")
+	}
+	overflow, err := block.Records.isOverflow()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if !overflow {
+		t.Error("Decoding detected a partial trailing message where there wasn't one.")
+	}
+
+	n, err := block.Records.numRecords()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if n != 1 {
+		t.Fatal("Decoding produced incorrect number of messages.")
+	}
+	msgBlock := block.Records.MsgSet.Messages[0]
+	if msgBlock.Offset != 0x550000 {
+		t.Error("Decoding produced incorrect message offset.")
+	}
+	msg := msgBlock.Msg
+	if msg.Codec != CompressionNone {
+		t.Error("Decoding produced incorrect message compression.")
+	}
+	if msg.Key != nil {
+		t.Error("Decoding produced message key where there was none.")
+	}
+	if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
+		t.Error("Decoding produced incorrect message value.")
+	}
+}
+
 func TestOneRecordFetchResponse(t *testing.T) {
 	response := FetchResponse{}
 	testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)

+ 14 - 1
length_field.go

@@ -5,6 +5,19 @@ import "encoding/binary"
 // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
 type lengthField struct {
 	startOffset int
+	length      int32
+}
+
+func (l *lengthField) decode(pd packetDecoder) error {
+	var err error
+	l.length, err = pd.getInt32()
+	if err != nil {
+		return err
+	}
+	if l.length > int32(pd.remaining()) {
+		return ErrInsufficientData
+	}
+	return nil
 }
 
 func (l *lengthField) saveOffset(in int) {
@@ -21,7 +34,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {
 }
 
 func (l *lengthField) check(curOffset int, buf []byte) error {
-	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
+	if int32(curOffset-l.startOffset-4) != l.length {
 		return PacketDecodingError{"length field invalid"}
 	}
 

+ 7 - 1
message_set.go

@@ -47,6 +47,7 @@ func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
 
 type MessageSet struct {
 	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
+	OverflowMessage        bool // whether the set on the wire contained an overflow message
 	Messages               []*MessageBlock
 }
 
@@ -85,7 +86,12 @@ func (ms *MessageSet) decode(pd packetDecoder) (err error) {
 		case ErrInsufficientData:
 			// As an optimization the server is allowed to return a partial message at the
 			// end of the message set. Clients should handle this case. So we just ignore such things.
-			ms.PartialTrailingMessage = true
+			if msb.Offset == -1 {
+				// This is an overflow message caused by chunked down conversion
+				ms.OverflowMessage = true
+			} else {
+				ms.PartialTrailingMessage = true
+			}
 			return nil
 		default:
 			return err

+ 21 - 0
records.go

@@ -163,6 +163,27 @@ func (r *Records) isControl() (bool, error) {
 	return false, fmt.Errorf("unknown records type: %v", r.recordsType)
 }
 
+func (r *Records) isOverflow() (bool, error) {
+	if r.recordsType == unknownRecords {
+		if empty, err := r.setTypeFromFields(); err != nil || empty {
+			return false, err
+		}
+	}
+
+	switch r.recordsType {
+	case unknownRecords:
+		return false, nil
+	case legacyRecords:
+		if r.MsgSet == nil {
+			return false, nil
+		}
+		return r.MsgSet.OverflowMessage, nil
+	case defaultRecords:
+		return false, nil
+	}
+	return false, fmt.Errorf("unknown records type: %v", r.recordsType)
+}
+
 func magicValue(pd packetDecoder) (int8, error) {
 	dec, err := pd.peek(magicOffset, magicLength)
 	if err != nil {

+ 3 - 1
utils.go

@@ -155,6 +155,7 @@ var (
 	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
 	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
 	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
+	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
 
 	SupportedVersions = []KafkaVersion{
 		V0_8_2_0,
@@ -173,9 +174,10 @@ var (
 		V0_11_0_2,
 		V1_0_0_0,
 		V1_1_0_0,
+		V2_0_0_0,
 	}
 	MinVersion = V0_8_2_0
-	MaxVersion = V1_1_0_0
+	MaxVersion = V2_0_0_0
 )
 
 func ParseKafkaVersion(s string) (KafkaVersion, error) {