浏览代码

Error handling, documentation

Evan Huus 12 年之前
父节点
当前提交
7284dce946
共有 3 个文件被更改,包括 24 次插入6 次删除
  1. 14 0
      broker.go
  2. 9 5
      errors.go
  3. 1 1
      metadata_cache.go

+ 14 - 0
broker.go

@@ -36,10 +36,15 @@ func NewBroker(host string, port int32) *Broker {
 	return b
 }
 
+// Opens a connection to the remote broker.
 func (b *Broker) Connect() error {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 
+	if b.conn != nil {
+		return AlreadyConnected
+	}
+
 	addr, err := net.ResolveIPAddr("ip", *b.host)
 	if err != nil {
 		return err
@@ -60,10 +65,15 @@ func (b *Broker) Connect() error {
 	return nil
 }
 
+// Closes the connection to the remote broker.
 func (b *Broker) Close() error {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 
+	if b.conn == nil {
+		return NotConnected
+	}
+
 	close(b.responses)
 	<-b.done
 
@@ -136,6 +146,10 @@ func (b *Broker) send(clientID *string, req requestEncoder, promiseResponse bool
 	b.lock.Lock()
 	defer b.lock.Unlock()
 
+	if b.conn == nil {
+		return nil, NotConnected
+	}
+
 	fullRequest := request{b.correlation_id, clientID, req}
 	buf, err := encode(&fullRequest)
 	if err != nil {

+ 9 - 5
errors.go

@@ -1,5 +1,7 @@
 package kafka
 
+import "errors"
+
 // The various errors that can be returned by the Kafka server.
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
 type KError int16
@@ -58,12 +60,14 @@ func (err KError) Error() string {
 	}
 }
 
-type OutOfBrokers struct {
-}
+// Error returned when the client has run out of brokers to talk to (none of them are responding).
+var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
 
-func (err OutOfBrokers) Error() string {
-	return "kafka: Client has run out of available brokers to talk to. Is your cluster reachable?"
-}
+// Error returned when calling Connect() on a Broker that is already connected.
+var AlreadyConnected = errors.New("kafka: broker: already connected")
+
+// Error returned when trying to send or call Close() on a Broker that is not connected.
+var NotConnected = errors.New("kafka: broker: not connected")
 
 // Returned when there was an error encoding a Kafka packet. This can happen, for example,
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do

+ 1 - 1
metadata_cache.go

@@ -148,7 +148,7 @@ func (mc *metadataCache) refreshTopics(topics []*string) error {
 
 	}
 
-	return OutOfBrokers{}
+	return OutOfBrokers
 }
 
 func (mc *metadataCache) refreshTopic(topic string) error {