浏览代码

Merge pull request #63 from Shopify/verbose_decoding_error

More informative logging around DecodingError, and bumped max msg size
Burke Libbey 12 年之前
父节点
当前提交
ac41aba73c
共有 8 个文件被更改,包括 25 次插入17 次删除
  1. 1 1
      broker.go
  2. 1 1
      crc32_field.go
  3. 1 1
      encoder_decoder.go
  4. 7 1
      errors.go
  5. 1 1
      length_field.go
  6. 4 4
      message.go
  7. 5 5
      real_decoder.go
  8. 5 3
      response_header.go

+ 1 - 1
broker.go

@@ -313,7 +313,7 @@ func (b *Broker) responseReceiver() {
 			continue
 			continue
 		}
 		}
 		if decodedHeader.correlationID != response.correlationID {
 		if decodedHeader.correlationID != response.correlationID {
-			response.errors <- DecodingError
+			response.errors <- DecodingError{Info: "CorrelationID didn't match"}
 			continue
 			continue
 		}
 		}
 
 

+ 1 - 1
crc32_field.go

@@ -28,7 +28,7 @@ func (c *crc32Field) check(curOffset int, buf []byte) error {
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 
 
 	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
 	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
-		return DecodingError
+		return DecodingError{Info: "CRC didn't match"}
 	}
 	}
 
 
 	return nil
 	return nil

+ 1 - 1
encoder_decoder.go

@@ -49,7 +49,7 @@ func decode(buf []byte, in decoder) error {
 	}
 	}
 
 
 	if helper.off != len(buf) {
 	if helper.off != len(buf) {
-		return DecodingError
+		return DecodingError{Info: "Length was invalid"}
 	}
 	}
 
 
 	return nil
 	return nil

+ 7 - 1
errors.go

@@ -37,7 +37,13 @@ var InsufficientData = errors.New("kafka: Insufficient data to decode packet, mo
 
 
 // DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
 // DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
 // This can be a bad CRC or length field, or any other invalid value.
 // This can be a bad CRC or length field, or any other invalid value.
-var DecodingError = errors.New("kafka: Error while decoding packet.")
+type DecodingError struct {
+	Info string
+}
+
+func (err DecodingError) Error() string {
+	return fmt.Sprintf("kafka: Error while decoding packet: %s", err.Info)
+}
 
 
 // MessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
 // MessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
 var MessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
 var MessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")

+ 1 - 1
length_field.go

@@ -22,7 +22,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {
 
 
 func (l *lengthField) check(curOffset int, buf []byte) error {
 func (l *lengthField) check(curOffset int, buf []byte) error {
 	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
 	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
-		return DecodingError
+		return DecodingError{Info: "Lengthfield check failed"}
 	}
 	}
 
 
 	return nil
 	return nil

+ 4 - 4
message.go

@@ -91,7 +91,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 	if format != messageFormat {
 	if format != messageFormat {
-		return DecodingError
+		return DecodingError{Info: "Unexpected messageFormat"}
 	}
 	}
 
 
 	attribute, err := pd.getInt8()
 	attribute, err := pd.getInt8()
@@ -115,7 +115,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		// nothing to do
 		// nothing to do
 	case CompressionGZIP:
 	case CompressionGZIP:
 		if m.Value == nil {
 		if m.Value == nil {
-			return DecodingError
+			return DecodingError{Info: "GZIP compression specified, but no data to uncompress"}
 		}
 		}
 		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
 		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
 		if err != nil {
 		if err != nil {
@@ -127,14 +127,14 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		}
 		}
 	case CompressionSnappy:
 	case CompressionSnappy:
 		if m.Value == nil {
 		if m.Value == nil {
-			return DecodingError
+			return DecodingError{Info: "Snappy compression specified, but no data to uncompress"}
 		}
 		}
 		m.Value, err = snappy.Decode(nil, m.Value)
 		m.Value, err = snappy.Decode(nil, m.Value)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
 	default:
 	default:
-		return DecodingError
+		return DecodingError{Info: "Invalid compression specified"}
 	}
 	}
 
 
 	err = pd.pop()
 	err = pd.pop()

+ 5 - 5
real_decoder.go

@@ -64,7 +64,7 @@ func (rd *realDecoder) getArrayLength() (int, error) {
 		rd.off = len(rd.raw)
 		rd.off = len(rd.raw)
 		return -1, InsufficientData
 		return -1, InsufficientData
 	} else if tmp > 2*math.MaxUint16 {
 	} else if tmp > 2*math.MaxUint16 {
-		return -1, DecodingError
+		return -1, DecodingError{Info: "getArrayLength failed: Invalid array length"}
 	}
 	}
 	return tmp, nil
 	return tmp, nil
 }
 }
@@ -82,7 +82,7 @@ func (rd *realDecoder) getBytes() ([]byte, error) {
 
 
 	switch {
 	switch {
 	case n < -1:
 	case n < -1:
-		return nil, DecodingError
+		return nil, DecodingError{Info: "getBytes failed"}
 	case n == -1:
 	case n == -1:
 		return nil, nil
 		return nil, nil
 	case n == 0:
 	case n == 0:
@@ -108,7 +108,7 @@ func (rd *realDecoder) getString() (string, error) {
 
 
 	switch {
 	switch {
 	case n < -1:
 	case n < -1:
-		return "", DecodingError
+		return "", DecodingError{Info: "getString failed"}
 	case n == -1:
 	case n == -1:
 		return "", nil
 		return "", nil
 	case n == 0:
 	case n == 0:
@@ -141,7 +141,7 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	}
 	}
 
 
 	if n < 0 {
 	if n < 0 {
-		return nil, DecodingError
+		return nil, DecodingError{Info: "getInt32Array failed"}
 	}
 	}
 
 
 	ret := make([]int32, n)
 	ret := make([]int32, n)
@@ -170,7 +170,7 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
 	}
 	}
 
 
 	if n < 0 {
 	if n < 0 {
-		return nil, DecodingError
+		return nil, DecodingError{Info: "getInt64Array failed"}
 	}
 	}
 
 
 	ret := make([]int64, n)
 	ret := make([]int64, n)

+ 5 - 3
response_header.go

@@ -1,19 +1,21 @@
 package sarama
 package sarama
 
 
-import "math"
+import "fmt"
 
 
 type responseHeader struct {
 type responseHeader struct {
 	length        int32
 	length        int32
 	correlationID int32
 	correlationID int32
 }
 }
 
 
+const maxMessageSize = 32 * 1024 * 1024 // 32MB
+
 func (r *responseHeader) decode(pd packetDecoder) (err error) {
 func (r *responseHeader) decode(pd packetDecoder) (err error) {
 	r.length, err = pd.getInt32()
 	r.length, err = pd.getInt32()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	if r.length <= 4 || r.length > 2*math.MaxUint16 {
-		return DecodingError
+	if r.length <= 4 || r.length > maxMessageSize {
+		return DecodingError{Info: fmt.Sprintf("Message too large or too small. Got %d", r.length)}
 	}
 	}
 
 
 	r.correlationID, err = pd.getInt32()
 	r.correlationID, err = pd.getInt32()