Browse Source

make decoding unknown message versions error

If sarama encounters an unknown message version somehow (e.g. due to a
bug in Kafka 0.11's downconversion), right now sarama will carry on
decoding the message, resulting in much harder to understand errors.

Instead, bail out early with a friendly error message.
Tom Crayford 8 năm trước cách đây
mục cha
commit
920e2d0f8d
2 tập tin đã thay đổi với 36 bổ sung1 xóa
  1. 5 1
      message.go
  2. 31 0
      message_test.go

+ 5 - 1
message.go

@@ -122,13 +122,17 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
+	if m.Version > 1 {
+		return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
+	}
+
 	attribute, err := pd.getInt8()
 	if err != nil {
 		return err
 	}
 	m.Codec = CompressionCodec(attribute & compressionCodecMask)
 
-	if m.Version >= 1 {
+	if m.Version == 1 {
 		millis, err := pd.getInt64()
 		if err != nil {
 			return err

+ 31 - 0
message_test.go

@@ -14,6 +14,21 @@ var (
 		0xFF, 0xFF, 0xFF, 0xFF, // key
 		0xFF, 0xFF, 0xFF, 0xFF} // value
 
+	emptyV1Message = []byte{
+		204, 47, 121, 217, // CRC
+		0x01,                                           // magic version byte
+		0x00,                                           // attribute flags
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		0xFF, 0xFF, 0xFF, 0xFF} // value
+
+	emptyV2Message = []byte{
+		167, 236, 104, 3, // CRC
+		0x02,                   // magic version byte
+		0x00,                   // attribute flags
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		0xFF, 0xFF, 0xFF, 0xFF} // value
+
 	emptyGzipMessage = []byte{
 		97, 79, 149, 90, //CRC
 		0x00,                   // magic version byte
@@ -179,3 +194,19 @@ func TestMessageDecodingBulkLZ4(t *testing.T) {
 		t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
 	}
 }
+
+func TestMessageDecodingVersion1(t *testing.T) {
+	message := Message{Version: 1}
+	testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
+}
+
+func TestMessageDecodingUnknownVersions(t *testing.T) {
+	message := Message{Version: 2}
+	err := decode(emptyV2Message, &message)
+	if err == nil {
+		t.Error("Decoding did not produce an error for an unknown magic byte")
+	}
+	if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
+		t.Error("Decoding an unknown magic byte produced an unknown error ", err)
+	}
+}