瀏覽代碼

More error case fixes

Evan Huus 12 年之前
父節點
當前提交
e9be1161e9
共有 2 個文件被更改,包括 10 次插入11 次删除
  1. 8 9
      brokerManager.go
  2. 2 2
      errors.go

+ 8 - 9
brokerManager.go

@@ -27,7 +27,9 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManage
 	}
 
 	bm.leaders = make(map[brokerKey]*broker)
-	err = bm.refreshAllTopics()
+
+	// do an initial fetch of all cluster metadata by specifing an empty list of topics
+	err = bm.refreshTopics(make([]*string, 0))
 	if err != nil {
 		return nil, err
 	}
@@ -57,22 +59,24 @@ func (bm *brokerManager) getDefault() *broker {
 func (bm *brokerManager) refreshTopics(topics []*string) error {
 	b := bm.getDefault()
 	if b == nil {
-		return outOfBrokers{}
+		return OutOfBrokers{}
 	}
 
 	responseChan, err := b.sendRequest(bm.client.id, REQUEST_METADATA, &metadataRequest{topics})
 	if err != nil {
-		// TODO
+		return err
 	}
+
 	decoder := realDecoder{raw: <-responseChan}
 	response := new(metadata)
 	err = response.decode(&decoder)
 	if err != nil {
-		// how badly should we blow up here ?
+		return err
 	}
 
 	bm.leadersLock.Lock()
 	defer bm.leadersLock.Unlock()
+
 	for i := range response.topics {
 		topic := &response.topics[i]
 		if topic.err != NO_ERROR {
@@ -95,8 +99,3 @@ func (bm *brokerManager) refreshTopic(topic string) error {
 	tmp[0] = &topic
 	return bm.refreshTopics(tmp)
 }
-
-func (bm *brokerManager) refreshAllTopics() error {
-	tmp := make([]*string, 0)
-	return bm.refreshTopics(tmp)
-}

+ 2 - 2
errors.go

@@ -56,10 +56,10 @@ func (err KError) Error() string {
 	}
 }
 
-type outOfBrokers struct {
+type OutOfBrokers struct {
 }
 
-func (err outOfBrokers) Error() string {
+func (err OutOfBrokers) Error() string {
 	return "kafka: Client has run out of available brokers to talk to. Is your cluster reachable?"
 }