Browse Source

Make PacketEncodingError and PacketDecodingError more similar to each other.

Willem van Bergen 10 years ago
parent
commit
566627d2ad
12 changed files with 46 additions and 38 deletions
  1. 1 3
      broker.go
  2. 2 2
      client.go
  3. 1 1
      crc32_field.go
  4. 4 2
      encoder_decoder.go
  5. 16 10
      errors.go
  6. 1 1
      length_field.go
  7. 6 5
      message.go
  8. 5 4
      prep_encoder.go
  9. 2 2
      producer.go
  10. 5 5
      real_decoder.go
  11. 1 1
      response_header.go
  12. 2 2
      sarama.go

+ 1 - 3
broker.go

@@ -384,9 +384,7 @@ func (b *Broker) responseReceiver() {
 		if decodedHeader.correlationID != response.correlationID {
 		if decodedHeader.correlationID != response.correlationID {
 			// TODO if decoded ID < cur ID, discard until we catch up
 			// TODO if decoded ID < cur ID, discard until we catch up
 			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
 			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
-			response.errors <- DecodingError{
-				Info: fmt.Sprintf("CorrelationID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID),
-			}
+			response.errors <- PacketDecodingError{fmt.Sprintf("CorrelationID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
 			continue
 			continue
 		}
 		}
 
 

+ 2 - 2
client.go

@@ -532,7 +532,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 		}
 		}
 		response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
 		response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
 
 
-		switch err {
+		switch err.(type) {
 		case nil:
 		case nil:
 			// valid response, use it
 			// valid response, use it
 			retry, err := client.update(response)
 			retry, err := client.update(response)
@@ -548,7 +548,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 			}
 			}
 
 
 			return err
 			return err
-		case ErrPacketEncodingFailure:
+		case PacketEncodingError:
 			// didn't even send, return the error
 			// didn't even send, return the error
 			return err
 			return err
 		default:
 		default:

+ 1 - 1
crc32_field.go

@@ -28,7 +28,7 @@ func (c *crc32Field) check(curOffset int, buf []byte) error {
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 
 
 	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
 	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
-		return DecodingError{Info: "CRC didn't match"}
+		return PacketDecodingError{"CRC didn't match"}
 	}
 	}
 
 
 	return nil
 	return nil

+ 4 - 2
encoder_decoder.go

@@ -1,5 +1,7 @@
 package sarama
 package sarama
 
 
+import "fmt"
+
 // Encoder is the interface that wraps the basic Encode method.
 // Encoder is the interface that wraps the basic Encode method.
 // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
 // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
 type encoder interface {
 type encoder interface {
@@ -21,7 +23,7 @@ func encode(in encoder) ([]byte, error) {
 	}
 	}
 
 
 	if prepEnc.length < 0 || uint32(prepEnc.length) > MaxRequestSize {
 	if prepEnc.length < 0 || uint32(prepEnc.length) > MaxRequestSize {
-		return nil, ErrPacketEncodingFailure
+		return nil, PacketEncodingError{fmt.Sprintf("Invalid request size: %d", prepEnc.length)}
 	}
 	}
 
 
 	realEnc.raw = make([]byte, prepEnc.length)
 	realEnc.raw = make([]byte, prepEnc.length)
@@ -53,7 +55,7 @@ func decode(buf []byte, in decoder) error {
 	}
 	}
 
 
 	if helper.off != len(buf) {
 	if helper.off != len(buf) {
-		return DecodingError{Info: "Length was invalid"}
+		return PacketDecodingError{"Length was invalid"}
 	}
 	}
 
 
 	return nil
 	return nil

+ 16 - 10
errors.go

@@ -26,10 +26,6 @@ var ErrAlreadyConnected = errors.New("kafka: broker already connected")
 // ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
 // ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
 var ErrNotConnected = errors.New("kafka: broker not connected")
 var ErrNotConnected = errors.New("kafka: broker not connected")
 
 
-// ErrPacketEncodingFailure is returned from a failure while encoding a Kafka packet. This can happen, for example,
-// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
-var ErrPacketEncodingFailure = errors.New("kafka: Error while encoding packet")
-
 // ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected
 // ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected
 // when requesting messages, since as an optimization the server is allowed to return a partial message at the end
 // when requesting messages, since as an optimization the server is allowed to return a partial message at the end
 // of the message set.
 // of the message set.
@@ -39,19 +35,29 @@ var ErrInsufficientData = errors.New("kafka: Insufficient data to decode packet,
 // ErrShuttingDown is returned when a producer receives a message during shutdown.
 // ErrShuttingDown is returned when a producer receives a message during shutdown.
 var ErrShuttingDown = errors.New("kafka: Message received by producer in process of shutting down")
 var ErrShuttingDown = errors.New("kafka: Message received by producer in process of shutting down")
 
 
-// DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
+// ErrMessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
+var ErrMessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
+
+// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
+// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
+type PacketEncodingError struct {
+	Info string
+}
+
+func (err PacketEncodingError) Error() string {
+	return fmt.Sprintf("kafka: Error while encoding packet: %s", err.Info)
+}
+
+// PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
 // This can be a bad CRC or length field, or any other invalid value.
 // This can be a bad CRC or length field, or any other invalid value.
-type DecodingError struct {
+type PacketDecodingError struct {
 	Info string
 	Info string
 }
 }
 
 
-func (err DecodingError) Error() string {
+func (err PacketDecodingError) Error() string {
 	return fmt.Sprintf("kafka: Error while decoding packet: %s", err.Info)
 	return fmt.Sprintf("kafka: Error while decoding packet: %s", err.Info)
 }
 }
 
 
-// ErrMessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
-var ErrMessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
-
 // ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified
 // ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified
 // configuration is invalid.
 // configuration is invalid.
 type ConfigurationError string
 type ConfigurationError string

+ 1 - 1
length_field.go

@@ -22,7 +22,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {
 
 
 func (l *lengthField) check(curOffset int, buf []byte) error {
 func (l *lengthField) check(curOffset int, buf []byte) error {
 	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
 	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
-		return DecodingError{Info: "Lengthfield check failed"}
+		return PacketDecodingError{"Lengthfield check failed"}
 	}
 	}
 
 
 	return nil
 	return nil

+ 6 - 5
message.go

@@ -3,6 +3,7 @@ package sarama
 import (
 import (
 	"bytes"
 	"bytes"
 	"compress/gzip"
 	"compress/gzip"
+	"fmt"
 	"io/ioutil"
 	"io/ioutil"
 )
 )
 
 
@@ -72,7 +73,7 @@ func (m *Message) encode(pe packetEncoder) error {
 			m.compressedCache = tmp
 			m.compressedCache = tmp
 			payload = m.compressedCache
 			payload = m.compressedCache
 		default:
 		default:
-			return ErrPacketEncodingFailure
+			return PacketEncodingError{fmt.Sprintf("Unsupported compression codec: %d", m.Codec)}
 		}
 		}
 	}
 	}
 
 
@@ -94,7 +95,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 	if format != messageFormat {
 	if format != messageFormat {
-		return DecodingError{Info: "Unexpected messageFormat"}
+		return PacketDecodingError{"Unexpected messageFormat"}
 	}
 	}
 
 
 	attribute, err := pd.getInt8()
 	attribute, err := pd.getInt8()
@@ -118,7 +119,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		// nothing to do
 		// nothing to do
 	case CompressionGZIP:
 	case CompressionGZIP:
 		if m.Value == nil {
 		if m.Value == nil {
-			return DecodingError{Info: "GZIP compression specified, but no data to uncompress"}
+			return PacketDecodingError{"GZIP compression specified, but no data to uncompress"}
 		}
 		}
 		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
 		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
 		if err != nil {
 		if err != nil {
@@ -130,14 +131,14 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return m.decodeSet()
 		return m.decodeSet()
 	case CompressionSnappy:
 	case CompressionSnappy:
 		if m.Value == nil {
 		if m.Value == nil {
-			return DecodingError{Info: "Snappy compression specified, but no data to uncompress"}
+			return PacketDecodingError{"Snappy compression specified, but no data to uncompress"}
 		}
 		}
 		if m.Value, err = snappyDecode(m.Value); err != nil {
 		if m.Value, err = snappyDecode(m.Value); err != nil {
 			return err
 			return err
 		}
 		}
 		return m.decodeSet()
 		return m.decodeSet()
 	default:
 	default:
-		return DecodingError{Info: "Invalid compression specified"}
+		return PacketDecodingError{fmt.Sprintf("Invalid compression specified: %d", m.Codec)}
 	}
 	}
 
 
 	err = pd.pop()
 	err = pd.pop()

+ 5 - 4
prep_encoder.go

@@ -2,6 +2,7 @@ package sarama
 
 
 import (
 import (
 	"encoding/binary"
 	"encoding/binary"
+	"fmt"
 	"math"
 	"math"
 )
 )
 
 
@@ -29,7 +30,7 @@ func (pe *prepEncoder) putInt64(in int64) {
 
 
 func (pe *prepEncoder) putArrayLength(in int) error {
 func (pe *prepEncoder) putArrayLength(in int) error {
 	if in > math.MaxInt32 {
 	if in > math.MaxInt32 {
-		return ErrPacketEncodingFailure
+		return PacketEncodingError{fmt.Sprintf("Array too long: %d", in)}
 	}
 	}
 	pe.length += 4
 	pe.length += 4
 	return nil
 	return nil
@@ -43,7 +44,7 @@ func (pe *prepEncoder) putBytes(in []byte) error {
 		return nil
 		return nil
 	}
 	}
 	if len(in) > math.MaxInt32 {
 	if len(in) > math.MaxInt32 {
-		return ErrPacketEncodingFailure
+		return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))}
 	}
 	}
 	pe.length += len(in)
 	pe.length += len(in)
 	return nil
 	return nil
@@ -51,7 +52,7 @@ func (pe *prepEncoder) putBytes(in []byte) error {
 
 
 func (pe *prepEncoder) putRawBytes(in []byte) error {
 func (pe *prepEncoder) putRawBytes(in []byte) error {
 	if len(in) > math.MaxInt32 {
 	if len(in) > math.MaxInt32 {
-		return ErrPacketEncodingFailure
+		return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))}
 	}
 	}
 	pe.length += len(in)
 	pe.length += len(in)
 	return nil
 	return nil
@@ -60,7 +61,7 @@ func (pe *prepEncoder) putRawBytes(in []byte) error {
 func (pe *prepEncoder) putString(in string) error {
 func (pe *prepEncoder) putString(in string) error {
 	pe.length += 2
 	pe.length += 2
 	if len(in) > math.MaxInt16 {
 	if len(in) > math.MaxInt16 {
-		return ErrPacketEncodingFailure
+		return PacketEncodingError{fmt.Sprintf("String too long: %d", len(in))}
 	}
 	}
 	pe.length += len(in)
 	pe.length += len(in)
 	return nil
 	return nil

+ 2 - 2
producer.go

@@ -583,10 +583,10 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 
 
 		response, err := broker.Produce(p.client.id, request)
 		response, err := broker.Produce(p.client.id, request)
 
 
-		switch err {
+		switch err.(type) {
 		case nil:
 		case nil:
 			break
 			break
-		case ErrPacketEncodingFailure:
+		case PacketEncodingError:
 			p.returnErrors(batch, err)
 			p.returnErrors(batch, err)
 			continue
 			continue
 		default:
 		default:

+ 5 - 5
real_decoder.go

@@ -64,7 +64,7 @@ func (rd *realDecoder) getArrayLength() (int, error) {
 		rd.off = len(rd.raw)
 		rd.off = len(rd.raw)
 		return -1, ErrInsufficientData
 		return -1, ErrInsufficientData
 	} else if tmp > 2*math.MaxUint16 {
 	} else if tmp > 2*math.MaxUint16 {
-		return -1, DecodingError{Info: "getArrayLength failed: Invalid array length"}
+		return -1, PacketDecodingError{"getArrayLength failed: Invalid array length"}
 	}
 	}
 	return tmp, nil
 	return tmp, nil
 }
 }
@@ -82,7 +82,7 @@ func (rd *realDecoder) getBytes() ([]byte, error) {
 
 
 	switch {
 	switch {
 	case n < -1:
 	case n < -1:
-		return nil, DecodingError{Info: "getBytes failed"}
+		return nil, PacketDecodingError{"getBytes failed: Invalid length"}
 	case n == -1:
 	case n == -1:
 		return nil, nil
 		return nil, nil
 	case n == 0:
 	case n == 0:
@@ -108,7 +108,7 @@ func (rd *realDecoder) getString() (string, error) {
 
 
 	switch {
 	switch {
 	case n < -1:
 	case n < -1:
-		return "", DecodingError{Info: "getString failed"}
+		return "", PacketDecodingError{"getString failed: invalid length"}
 	case n == -1:
 	case n == -1:
 		return "", nil
 		return "", nil
 	case n == 0:
 	case n == 0:
@@ -141,7 +141,7 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	}
 	}
 
 
 	if n < 0 {
 	if n < 0 {
-		return nil, DecodingError{Info: "getInt32Array failed"}
+		return nil, PacketDecodingError{"getInt32Array failed: invalid length"}
 	}
 	}
 
 
 	ret := make([]int32, n)
 	ret := make([]int32, n)
@@ -170,7 +170,7 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
 	}
 	}
 
 
 	if n < 0 {
 	if n < 0 {
-		return nil, DecodingError{Info: "getInt64Array failed"}
+		return nil, PacketDecodingError{"getInt64Array failed: invalid length"}
 	}
 	}
 
 
 	ret := make([]int64, n)
 	ret := make([]int64, n)

+ 1 - 1
response_header.go

@@ -13,7 +13,7 @@ func (r *responseHeader) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 	if r.length <= 4 || r.length > MaxResponseSize {
 	if r.length <= 4 || r.length > MaxResponseSize {
-		return DecodingError{Info: fmt.Sprintf("Message too large or too small. Got %d", r.length)}
+		return PacketDecodingError{fmt.Sprintf("Message of length %d too large or too small", r.length)}
 	}
 	}
 
 
 	r.correlationID, err = pd.getInt32()
 	r.correlationID, err = pd.getInt32()

+ 2 - 2
sarama.go

@@ -28,13 +28,13 @@ type StdLogger interface {
 var PanicHandler func(interface{})
 var PanicHandler func(interface{})
 
 
 // MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
 // MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
-// to send a request larger than this will result in an ErrPacketEncodingFailure. The default of 100 MiB is aligned
// to send a request larger than this will result in a PacketEncodingError. The default of 100 MiB is aligned
 // with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
 // with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
 // to process.
 // to process.
 var MaxRequestSize uint32 = 100 * 1024 * 1024
 var MaxRequestSize uint32 = 100 * 1024 * 1024
 
 
 // MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
 // MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
-// a broker returns a response message larger than this value, Sarama will return a DecodingError. The
+// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError. The
 // default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest
 // default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest
 // request the broker will attempt to process.
 // request the broker will attempt to process.
 var MaxResponseSize int32 = 100 * 1024 * 1024
 var MaxResponseSize int32 = 100 * 1024 * 1024