Parcourir la source

Differentiate between decoding errors (like CRC errors) and insufficient data

Evan Huus il y a 12 ans
Parent
commit
8db9b32aa0

+ 13 - 1
protocol/errors.go

@@ -1,6 +1,9 @@
 package protocol
 
-import "errors"
+import (
+	"errors"
+	"fmt"
+)
 
 // The various errors that can be returned by the Kafka server.
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
@@ -75,6 +78,15 @@ func (err EncodingError) Error() string {
 	return "kafka: Could not encode packet. " + string(err)
 }
 
+// InsufficientData is returned when decoding and the packet is truncated. This can be expected
+// when requesting messages, since as an optimization the server is allowed to return a partial message at the end
+// of the message set.
+type InsufficientData int
+
+func (err InsufficientData) Error() string {
+	return fmt.Sprintf("kafka: Insufficient data to decode packet, at least %d more bytes expected.", int(err))
+}
+
 // Returned when there was an error decoding the Kafka server's response. Usually means that you've
 // connected to the wrong address.
 type DecodingError string

+ 1 - 1
protocol/offset_commit_response.go

@@ -2,7 +2,7 @@ package protocol
 
 type OffsetCommitResponse struct {
 	ClientID string
-	Errors map[string]map[int32]KError
+	Errors   map[string]map[int32]KError
 }
 
 func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {

+ 1 - 1
protocol/offset_fetch_response.go

@@ -24,7 +24,7 @@ func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
 
 type OffsetFetchResponse struct {
 	ClientID string
-	Blocks map[string]map[int32]*OffsetFetchResponseBlock
+	Blocks   map[string]map[int32]*OffsetFetchResponseBlock
 }
 
 func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {

+ 16 - 14
protocol/real_decoder.go

@@ -19,7 +19,7 @@ func (rd *realDecoder) remaining() int {
 
 func (rd *realDecoder) getInt8() (int8, error) {
 	if rd.remaining() < 1 {
-		return -1, DecodingError("Insufficient data in getInt8.")
+		return -1, InsufficientData(1)
 	}
 	tmp := int8(rd.raw[rd.off])
 	rd.off += 1
@@ -28,7 +28,7 @@ func (rd *realDecoder) getInt8() (int8, error) {
 
 func (rd *realDecoder) getInt16() (int16, error) {
 	if rd.remaining() < 2 {
-		return -1, DecodingError("Insufficient data in getInt16.")
+		return -1, InsufficientData(2 - rd.remaining())
 	}
 	tmp := int16(binary.BigEndian.Uint16(rd.raw[rd.off:]))
 	rd.off += 2
@@ -37,7 +37,7 @@ func (rd *realDecoder) getInt16() (int16, error) {
 
 func (rd *realDecoder) getInt32() (int32, error) {
 	if rd.remaining() < 4 {
-		return -1, DecodingError("Insufficient data in getInt32.")
+		return -1, InsufficientData(4 - rd.remaining())
 	}
 	tmp := int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
@@ -46,7 +46,7 @@ func (rd *realDecoder) getInt32() (int32, error) {
 
 func (rd *realDecoder) getInt64() (int64, error) {
 	if rd.remaining() < 8 {
-		return -1, DecodingError("Insufficient data in getInt64.")
+		return -1, InsufficientData(8 - rd.remaining())
 	}
 	tmp := int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
 	rd.off += 8
@@ -57,14 +57,14 @@ func (rd *realDecoder) getInt64() (int64, error) {
 
 func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	if rd.remaining() < 4 {
-		return nil, DecodingError("Insufficient data in getInt32Array.")
+		return nil, InsufficientData(4 - rd.remaining())
 	}
 	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 
 	var ret []int32 = nil
 	if rd.remaining() < 4*n {
-		return nil, DecodingError("Insufficient data in getInt32Array.")
+		return nil, InsufficientData(4*n - rd.remaining())
 	} else if n > 0 {
 		ret = make([]int32, n)
 		for i := range ret {
@@ -77,14 +77,14 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
 
 func (rd *realDecoder) getInt64Array() ([]int64, error) {
 	if rd.remaining() < 4 {
-		return nil, DecodingError("Insufficient data in getInt64Array.")
+		return nil, InsufficientData(4 - rd.remaining())
 	}
 	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 
 	var ret []int64 = nil
 	if rd.remaining() < 8*n {
-		return nil, DecodingError("Insufficient data in getInt64Array.")
+		return nil, InsufficientData(8*n - rd.remaining())
 	} else if n > 0 {
 		ret = make([]int64, n)
 		for i := range ret {
@@ -97,11 +97,13 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
 
 func (rd *realDecoder) getArrayCount() (int, error) {
 	if rd.remaining() < 4 {
-		return -1, DecodingError("Insufficient data in getArrayCount.")
+		return -1, InsufficientData(4 - rd.remaining())
 	}
 	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
-	if tmp > rd.remaining() || tmp > 2*math.MaxUint16 {
+	if tmp > rd.remaining() {
+		return -1, InsufficientData(tmp - rd.remaining())
+	} else if tmp > 2*math.MaxUint16 {
 		return -1, DecodingError("Array absurdly long in getArrayCount.")
 	}
 	return tmp, nil
@@ -131,7 +133,7 @@ func (rd *realDecoder) getString() (string, error) {
 	case n == 0:
 		return "", nil
 	case n > rd.remaining():
-		return "", DecodingError("String too long in getString.")
+		return "", InsufficientData(rd.remaining() - n)
 	default:
 		tmp := string(rd.raw[rd.off : rd.off+n])
 		rd.off += n
@@ -156,7 +158,7 @@ func (rd *realDecoder) getBytes() ([]byte, error) {
 	case n == 0:
 		return make([]byte, 0), nil
 	case n > rd.remaining():
-		return nil, DecodingError("Bytes too long in getBytes.")
+		return nil, InsufficientData(rd.remaining() - n)
 	default:
 		tmp := rd.raw[rd.off : rd.off+n]
 		rd.off += n
@@ -165,8 +167,8 @@ func (rd *realDecoder) getBytes() ([]byte, error) {
 }
 
 func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
-	if rd.remaining() < length {
-		return nil, DecodingError("Not enough data for subset.")
+	if length > rd.remaining() {
+		return nil, InsufficientData(length - rd.remaining())
 	}
 
 	return &realDecoder{raw: rd.raw[rd.off : rd.off+length]}, nil