فهرست منبع

Use useful error types

Evan Huus 12 سال پیش
والد
کامیت
ffd8887fc5
4فایلهای تغییر یافته به همراه34 افزوده شده و 16 حذف شده
  1. 1 2
      broker.go
  2. 4 5
      brokerManager.go
  3. 21 0
      kError.go
  4. 8 9
      realDecoder.go

+ 1 - 2
broker.go

@@ -2,7 +2,6 @@ package kafka
 
 import (
 	"encoding/binary"
-	"errors"
 	"math"
 	"net"
 )
@@ -156,7 +155,7 @@ func (b *broker) sendRequest(clientID *string, api API, body encoder) (chan []by
 
 	req.encode(&prepEnc)
 	if prepEnc.err {
-		return nil, errors.New("kafka encoding error")
+		return nil, encodingError{}
 	}
 
 	realEnc.raw = make([]byte, prepEnc.length+4)

+ 4 - 5
brokerManager.go

@@ -1,9 +1,6 @@
 package kafka
 
-import (
-	"errors"
-	"sync"
-)
+import "sync"
 
 type brokerKey struct {
 	topic     string
@@ -22,6 +19,8 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManage
 
 	bm.client = client
 
+	// we create a new broker object as the default 'master' broker
+	// if this broker is also a leader then we will end up with two broker objects for it, but that's not a big deal
 	bm.defaultBroker, err = newBroker(host, port)
 	if err != nil {
 		return nil, err
@@ -58,7 +57,7 @@ func (bm *brokerManager) getDefault() *broker {
 func (bm *brokerManager) refreshTopics(topics []*string) error {
 	b := bm.getDefault()
 	if b == nil {
-		return errors.New("kafka: lost all broker connections")
+		return outOfBrokers{}
 	}
 
 	responseChan, err := b.sendRequest(bm.client.id, REQUEST_METADATA, &metadataRequest{topics})

+ 21 - 0
kError.go

@@ -55,3 +55,24 @@ func (err kError) Error() string {
 		return "Unknown error, how did this happen?"
 	}
 }
+
+type outOfBrokers struct {
+}
+
+func (err outOfBrokers) Error() string {
+	return "kafka: Client has run out of available brokers to talk to. Is your cluster reachable?"
+}
+
+type encodingError struct {
+}
+
+func (err encodingError) Error() string {
+	return "kafka: Could not encode packet."
+}
+
+type decodingError struct {
+}
+
+func (err decodingError) Error() string {
+	return "kafka: Could not decode packet. Is the server really speaking kafka?"
+}

+ 8 - 9
realDecoder.go

@@ -2,7 +2,6 @@ package kafka
 
 import (
 	"encoding/binary"
-	"errors"
 	"math"
 )
 
@@ -17,7 +16,7 @@ func (rd *realDecoder) avail() int {
 
 func (rd *realDecoder) getInt16() (int16, error) {
 	if rd.avail() < 2 {
-		return -1, errors.New("kafka getInt16: not enough data")
+		return -1, decodingError{}
 	}
 	tmp := int16(binary.BigEndian.Uint16(rd.raw[rd.off:]))
 	rd.off += 2
@@ -26,7 +25,7 @@ func (rd *realDecoder) getInt16() (int16, error) {
 
 func (rd *realDecoder) getInt32() (int32, error) {
 	if rd.avail() < 4 {
-		return -1, errors.New("kafka getInt32: not enough data")
+		return -1, decodingError{}
 	}
 	tmp := int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
@@ -49,13 +48,13 @@ func (rd *realDecoder) getString() (*string, error) {
 
 	switch {
 	case n < -1:
-		return nil, errors.New("kafka getString: invalid negative length")
+		return nil, decodingError{}
 	case n == -1:
 		return nil, nil
 	case n == 0:
 		return new(string), nil
 	case n > rd.avail():
-		return nil, errors.New("kafka getString: not enough data")
+		return nil, decodingError{}
 	default:
 		tmp := new(string)
 		*tmp = string(rd.raw[rd.off : rd.off+n])
@@ -74,14 +73,14 @@ func (rd *realDecoder) getBytes() (*[]byte, error) {
 
 	switch {
 	case n < -1:
-		return nil, errors.New("kafka getBytes: invalid negative length")
+		return nil, decodingError{}
 	case n == -1:
 		return nil, nil
 	case n == 0:
 		tmp := make([]byte, 0)
 		return &tmp, nil
 	case n > rd.avail():
-		return nil, errors.New("kafka getString: not enough data")
+		return nil, decodingError{}
 	default:
 		tmp := rd.raw[rd.off : rd.off+n]
 		return &tmp, nil
@@ -90,12 +89,12 @@ func (rd *realDecoder) getBytes() (*[]byte, error) {
 
 func (rd *realDecoder) getArrayCount() (int, error) {
 	if rd.avail() < 4 {
-		return -1, errors.New("kafka getArrayCount: not enough data")
+		return -1, decodingError{}
 	}
 	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 	if tmp > rd.avail() || tmp > 2*math.MaxUint16 {
-		return -1, errors.New("kafka getArrayCount: unreasonably long array")
+		return -1, decodingError{}
 	}
 	return tmp, nil
 }