Browse Source

Prefix error variables with Err

Willem van Bergen 10 years ago
parent
commit
2018fd405e
12 changed files with 69 additions and 68 deletions
  1. 4 4
      broker.go
  2. 10 10
      client.go
  3. 5 5
      consumer.go
  4. 1 1
      encoder_decoder.go
  5. 21 20
      errors.go
  6. 2 2
      functional_test.go
  7. 1 1
      message.go
  8. 1 1
      message_set.go
  9. 4 4
      prep_encoder.go
  10. 5 5
      producer.go
  11. 14 14
      real_decoder.go
  12. 1 1
      sarama.go

+ 4 - 4
broker.go

@@ -97,8 +97,8 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 
 	if b.conn != nil {
 		b.lock.Unlock()
-		Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, AlreadyConnected)
-		return AlreadyConnected
+		Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, ErrAlreadyConnected)
+		return ErrAlreadyConnected
 	}
 
 	go withRecover(func() {
@@ -143,7 +143,7 @@ func (b *Broker) Close() (err error) {
 	}()
 
 	if b.conn == nil {
-		return NotConnected
+		return ErrNotConnected
 	}
 
 	close(b.responses)
@@ -267,7 +267,7 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 		if b.connErr != nil {
 			return nil, b.connErr
 		}
-		return nil, NotConnected
+		return nil, ErrNotConnected
 	}
 
 	fullRequest := request{b.correlationID, clientID, req}

+ 10 - 10
client.go

@@ -131,7 +131,7 @@ func (client *Client) Close() error {
 		// Chances are this is being called from a defer() and the error will go unobserved
 		// so we go ahead and log the event in this case.
 		Logger.Printf("Close() called on already closed client")
-		return ClosedClient
+		return ErrClosedClient
 	}
 
 	client.lock.Lock()
@@ -162,7 +162,7 @@ func (client *Client) Closed() bool {
 func (client *Client) Topics() ([]string, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	client.lock.RLock()
@@ -180,7 +180,7 @@ func (client *Client) Topics() ([]string, error) {
 func (client *Client) Partitions(topic string) ([]int32, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	partitions := client.cachedPartitions(topic, allPartitions)
@@ -205,7 +205,7 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 func (client *Client) WritablePartitions(topic string) ([]int32, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	partitions := client.cachedPartitions(topic, writablePartitions)
@@ -234,7 +234,7 @@ func (client *Client) WritablePartitions(topic string) ([]int32, error) {
 // Replicas returns the set of all replica IDs for the given partition.
 func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error) {
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	metadata, err := client.getMetadata(topic, partitionID)
@@ -254,7 +254,7 @@ func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error)
 // This method should be considered effectively deprecated.
 func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error) {
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	metadata, err := client.getMetadata(topic, partitionID)
@@ -315,7 +315,7 @@ func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTim
 
 	block := response.GetBlock(topic, partitionID)
 	if block == nil {
-		return -1, IncompleteResponse
+		return -1, ErrIncompleteResponse
 	}
 	if block.Err != NoError {
 		return -1, block.Err
@@ -512,7 +512,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 	// resources.  Check to see if we're dealing with a closed Client and error
 	// out immediately if so.
 	if client.Closed() {
-		return ClosedClient
+		return ErrClosedClient
 	}
 
 	// Kafka will throw exceptions on an empty topic and not return a proper
@@ -548,7 +548,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 			}
 
 			return err
-		case EncodingError:
+		case ErrPacketEncodingFailure:
 			// didn't even send, return the error
 			return err
 		default:
@@ -567,7 +567,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 		return client.refreshMetadata(topics, retriesRemaining-1)
 	}
 
-	return OutOfBrokers
+	return ErrOutOfBrokers
 }
 
 // if no fatal error, returns a list of topics that need retrying due to LeaderNotAvailable

+ 5 - 5
consumer.go

@@ -62,7 +62,7 @@ func (config *ConsumerConfig) Validate() error {
 type PartitionConsumerConfig struct {
 	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
 	DefaultFetchSize int32
-	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
+	// The maximum permittable message size - messages larger than this will return ErrMessageTooLarge. The default of 0 is
 	// treated as no limit.
 	MaxMessageSize int32
 	// The method used to determine at which offset to begin consuming messages. The default is to start at the most recent message.
@@ -134,7 +134,7 @@ type Consumer struct {
 func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error) {
 	// Check that we are not dealing with a closed Client before processing any other arguments
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	if config == nil {
@@ -466,7 +466,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
 		for child := range w.subscriptions {
 			block := response.GetBlock(child.topic, child.partition)
 			if block == nil {
-				child.sendError(IncompleteResponse)
+				child.sendError(ErrIncompleteResponse)
 				child.trigger <- none{}
 				delete(w.subscriptions, child)
 				continue
@@ -543,7 +543,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 		if block.MsgSet.PartialTrailingMessage {
 			if child.config.MaxMessageSize > 0 && child.fetchSize == child.config.MaxMessageSize {
 				// we can't ask for more data, we've hit the configured limit
-				child.sendError(MessageTooLarge)
+				child.sendError(ErrMessageTooLarge)
 				child.offset++ // skip this one so we can keep processing future messages
 			} else {
 				child.fetchSize *= 2
@@ -588,7 +588,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 	}
 
 	if incomplete || !atLeastOne {
-		child.sendError(IncompleteResponse)
+		child.sendError(ErrIncompleteResponse)
 		child.trigger <- none{}
 		delete(w.subscriptions, child)
 	}

+ 1 - 1
encoder_decoder.go

@@ -21,7 +21,7 @@ func encode(in encoder) ([]byte, error) {
 	}
 
 	if prepEnc.length < 0 || uint32(prepEnc.length) > MaxRequestSize {
-		return nil, EncodingError
+		return nil, ErrPacketEncodingFailure
 	}
 
 	realEnc.raw = make([]byte, prepEnc.length)

+ 21 - 20
errors.go

@@ -5,38 +5,39 @@ import (
 	"fmt"
 )
 
-// OutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
+// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
 // or otherwise failed to respond.
-var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
+var ErrOutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
 
-// ClosedClient is the error returned when a method is called on a client that has been closed.
-var ClosedClient = errors.New("kafka: Tried to use a client that was closed")
+// ErrClosedClient is the error returned when a method is called on a client that has been closed.
+var ErrClosedClient = errors.New("kafka: Tried to use a client that was closed")
 
-// IncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
+// ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
 // not contain the expected information.
-var IncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks")
+var ErrIncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks")
 
-// InvalidPartition is the error returned when a partitioner returns an invalid partition index
+// ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index
 // (meaning one outside of the range [0...numPartitions-1]).
-var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index")
+var ErrInvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index")
 
-// AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
-var AlreadyConnected = errors.New("kafka: broker: already connected")
+// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
+var ErrAlreadyConnected = errors.New("kafka: broker already connected")
 
-// NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
-var NotConnected = errors.New("kafka: broker: not connected")
+// ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
+var ErrNotConnected = errors.New("kafka: broker not connected")
 
-// EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
+// ErrPacketEncodingFailure is returned from a failure while encoding a Kafka packet. This can happen, for example,
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
-var EncodingError = errors.New("kafka: Error while encoding packet")
+var ErrPacketEncodingFailure = errors.New("kafka: Error while encoding packet")
 
-// InsufficientData is returned when decoding and the packet is truncated. This can be expected
+// ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected
 // when requesting messages, since as an optimization the server is allowed to return a partial message at the end
 // of the message set.
-var InsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected")
 
-// ShuttingDown is returned when a producer receives a message during shutdown.
-var ShuttingDown = errors.New("kafka: Message received by producer in process of shutting down")
+var ErrInsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected")
+
+// ErrShuttingDown is returned when a producer receives a message during shutdown.
+var ErrShuttingDown = errors.New("kafka: Message received by producer in process of shutting down")
 
 // DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
 // This can be a bad CRC or length field, or any other invalid value.
@@ -48,8 +49,8 @@ func (err DecodingError) Error() string {
 	return fmt.Sprintf("kafka: Error while decoding packet: %s", err.Info)
 }
 
-// MessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
-var MessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
+// ErrMessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
+var ErrMessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
 
 // ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified
 // configuration is invalid.

+ 2 - 2
functional_test.go

@@ -48,8 +48,8 @@ func TestFuncConnectionFailure(t *testing.T) {
 	config.MetadataRetries = 1
 
 	_, err := NewClient("test", []string{"localhost:9000"}, config)
-	if err != OutOfBrokers {
-		t.Fatal("Expected returned error to be OutOfBrokers, but was: ", err)
+	if err != ErrOutOfBrokers {
+		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
 	}
 }
 

+ 1 - 1
message.go

@@ -72,7 +72,7 @@ func (m *Message) encode(pe packetEncoder) error {
 			m.compressedCache = tmp
 			payload = m.compressedCache
 		default:
-			return EncodingError
+			return ErrPacketEncodingFailure
 		}
 	}
 

+ 1 - 1
message_set.go

@@ -69,7 +69,7 @@ func (ms *MessageSet) decode(pd packetDecoder) (err error) {
 		switch err {
 		case nil:
 			ms.Messages = append(ms.Messages, msb)
-		case InsufficientData:
+		case ErrInsufficientData:
 			// As an optimization the server is allowed to return a partial message at the
 			// end of the message set. Clients should handle this case. So we just ignore such things.
 			ms.PartialTrailingMessage = true

+ 4 - 4
prep_encoder.go

@@ -29,7 +29,7 @@ func (pe *prepEncoder) putInt64(in int64) {
 
 func (pe *prepEncoder) putArrayLength(in int) error {
 	if in > math.MaxInt32 {
-		return EncodingError
+		return ErrPacketEncodingFailure
 	}
 	pe.length += 4
 	return nil
@@ -43,7 +43,7 @@ func (pe *prepEncoder) putBytes(in []byte) error {
 		return nil
 	}
 	if len(in) > math.MaxInt32 {
-		return EncodingError
+		return ErrPacketEncodingFailure
 	}
 	pe.length += len(in)
 	return nil
@@ -51,7 +51,7 @@ func (pe *prepEncoder) putBytes(in []byte) error {
 
 func (pe *prepEncoder) putRawBytes(in []byte) error {
 	if len(in) > math.MaxInt32 {
-		return EncodingError
+		return ErrPacketEncodingFailure
 	}
 	pe.length += len(in)
 	return nil
@@ -60,7 +60,7 @@ func (pe *prepEncoder) putRawBytes(in []byte) error {
 func (pe *prepEncoder) putString(in string) error {
 	pe.length += 2
 	if len(in) > math.MaxInt16 {
-		return EncodingError
+		return ErrPacketEncodingFailure
 	}
 	pe.length += len(in)
 	return nil

+ 5 - 5
producer.go

@@ -126,7 +126,7 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 	// Check that we are not dealing with a closed Client before processing
 	// any other arguments
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	if config == nil {
@@ -319,7 +319,7 @@ func (p *Producer) topicDispatcher() {
 	p.retries <- &MessageToSend{flags: shutdown}
 
 	for msg := range p.input {
-		p.returnError(msg, ShuttingDown)
+		p.returnError(msg, ErrShuttingDown)
 	}
 
 	close(p.errors)
@@ -586,7 +586,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 		switch err {
 		case nil:
 			break
-		case EncodingError:
+		case ErrPacketEncodingFailure:
 			p.returnErrors(batch, err)
 			continue
 		default:
@@ -612,7 +612,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 
 				block := response.GetBlock(topic, partition)
 				if block == nil {
-					p.returnErrors(msgs, IncompleteResponse)
+					p.returnErrors(msgs, ErrIncompleteResponse)
 					continue
 				}
 
@@ -718,7 +718,7 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend)
 	if err != nil {
 		return err
 	} else if choice < 0 || choice >= numPartitions {
-		return InvalidPartition
+		return ErrInvalidPartition
 	}
 
 	msg.partition = partitions[choice]

+ 14 - 14
real_decoder.go

@@ -16,7 +16,7 @@ type realDecoder struct {
 func (rd *realDecoder) getInt8() (int8, error) {
 	if rd.remaining() < 1 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int8(rd.raw[rd.off])
 	rd.off += binary.Size(tmp)
@@ -26,7 +26,7 @@ func (rd *realDecoder) getInt8() (int8, error) {
 func (rd *realDecoder) getInt16() (int16, error) {
 	if rd.remaining() < 2 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int16(binary.BigEndian.Uint16(rd.raw[rd.off:]))
 	rd.off += binary.Size(tmp)
@@ -36,7 +36,7 @@ func (rd *realDecoder) getInt16() (int16, error) {
 func (rd *realDecoder) getInt32() (int32, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += binary.Size(tmp)
@@ -46,7 +46,7 @@ func (rd *realDecoder) getInt32() (int32, error) {
 func (rd *realDecoder) getInt64() (int64, error) {
 	if rd.remaining() < 8 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
 	rd.off += binary.Size(tmp)
@@ -56,13 +56,13 @@ func (rd *realDecoder) getInt64() (int64, error) {
 func (rd *realDecoder) getArrayLength() (int, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 	if tmp > rd.remaining() {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	} else if tmp > 2*math.MaxUint16 {
 		return -1, DecodingError{Info: "getArrayLength failed: Invalid array length"}
 	}
@@ -89,7 +89,7 @@ func (rd *realDecoder) getBytes() ([]byte, error) {
 		return make([]byte, 0), nil
 	case n > rd.remaining():
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 
 	tmpStr := rd.raw[rd.off : rd.off+n]
@@ -115,7 +115,7 @@ func (rd *realDecoder) getString() (string, error) {
 		return "", nil
 	case n > rd.remaining():
 		rd.off = len(rd.raw)
-		return "", InsufficientData
+		return "", ErrInsufficientData
 	}
 
 	tmpStr := string(rd.raw[rd.off : rd.off+n])
@@ -126,14 +126,14 @@ func (rd *realDecoder) getString() (string, error) {
 func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 
 	if rd.remaining() < 4*n {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 
 	if n == 0 {
@@ -155,14 +155,14 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
 func (rd *realDecoder) getInt64Array() ([]int64, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 
 	if rd.remaining() < 8*n {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 
 	if n == 0 {
@@ -190,7 +190,7 @@ func (rd *realDecoder) remaining() int {
 func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
 	if length > rd.remaining() {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 
 	start := rd.off
@@ -206,7 +206,7 @@ func (rd *realDecoder) push(in pushDecoder) error {
 	reserve := in.reserveLength()
 	if rd.remaining() < reserve {
 		rd.off = len(rd.raw)
-		return InsufficientData
+		return ErrInsufficientData
 	}
 
 	rd.stack = append(rd.stack, in)

+ 1 - 1
sarama.go

@@ -28,7 +28,7 @@ type StdLogger interface {
 var PanicHandler func(interface{})
 
 // MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
-// to send a request larger than this will result in an EncodingError. The default of 100 MiB is aligned
+// to send a request larger than this will result in an ErrPacketEncodingFailure. The default of 100 MiB is aligned
 // with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
 // to process.
 var MaxRequestSize uint32 = 100 * 1024 * 1024