浏览代码

Potentially useful state of broker communication

still more to do
Evan Huus 12 年之前
父节点
当前提交
d47b9a1312
共有 3 个文件被更改,包括 111 次插入11 次删除
  1. 29 6
      broker.go
  2. 71 5
      brokerManager.go
  3. 11 0
      metadata.go

+ 29 - 6
broker.go

@@ -12,11 +12,13 @@ type broker struct {
 	host   *string
 	port   int32
 
-	client         *Client
 	correlation_id int32
-	conn           net.Conn
-	requests       chan reqResPair
-	responses      chan reqResPair
+
+	conn net.Conn
+	addr net.TCPAddr
+
+	requests  chan reqResPair
+	responses chan reqResPair
 }
 
 type reqResPair struct {
@@ -37,7 +39,23 @@ func newBroker(host string, port int32) (b *broker, err error) {
 }
 
 func (b *broker) connect() (err error) {
-	// TODO
+	addr, err := net.ResolveIPAddr("ip", *b.host)
+	if err != nil {
+		return err
+	}
+
+	b.addr.IP = addr.IP
+	b.addr.Zone = addr.Zone
+	b.addr.Port = int(b.port)
+
+	b.conn, err = net.DialTCP("tcp", nil, &b.addr)
+	if err != nil {
+		return err
+	}
+
+	go b.sendRequestLoop()
+	go b.rcvResponseLoop()
+
 	return nil
 }
 
@@ -63,6 +81,11 @@ func (b *broker) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
+	err = b.connect()
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -129,7 +152,7 @@ func (b *broker) sendRequest(api API, body encoder) (chan []byte, error) {
 	var prepEnc prepEncoder
 	var realEnc realEncoder
 
-	req := request{api, b.correlation_id, b.client.id, body}
+	req := request{api, b.correlation_id, nil, body}
 
 	req.encode(&prepEnc)
 	if prepEnc.err {

+ 71 - 5
brokerManager.go

@@ -1,6 +1,9 @@
 package kafka
 
-import "sync"
+import (
+	"errors"
+	"sync"
+)
 
 type brokerKey struct {
 	topic     string
@@ -15,20 +18,83 @@ type brokerManager struct {
 
 func newBrokerManager(host string, port int32) (bm *brokerManager, err error) {
 	bm = new(brokerManager)
+
 	bm.defaultBroker, err = newBroker(host, port)
 	if err != nil {
 		return nil, err
 	}
+
 	bm.leaders = make(map[brokerKey]*broker)
+	err = bm.refreshAllTopics()
+	if err != nil {
+		return nil, err
+	}
+
 	return bm, nil
 }
 
 func (bm *brokerManager) lookupLeader(topic string, partition int32) *broker {
 	bm.leadersLock.RLock()
-	tmp := bm.leaders[brokerKey{topic, partition}]
-	bm.leadersLock.RUnlock()
-	return tmp
+	defer bm.leadersLock.RUnlock()
+	return bm.leaders[brokerKey{topic, partition}]
+}
+
+func (bm *brokerManager) getDefault() *broker {
+
+	if bm.defaultBroker == nil {
+		bm.leadersLock.RLock()
+		defer bm.leadersLock.RUnlock()
+		for _, bm.defaultBroker = range bm.leaders {
+			break
+		}
+	}
+
+	return bm.defaultBroker
+}
+
+func (bm *brokerManager) refreshTopics(topics []*string) error {
+	b := bm.getDefault()
+	if b == nil {
+		return errors.New("kafka: lost all broker connections")
+	}
+
+	responseChan, err := b.sendRequest(REQUEST_METADATA, &metadataRequest{topics})
+	if err != nil {
+		// TODO
+	}
+	decoder := realDecoder{raw: <-responseChan}
+	response := new(metadata)
+	err = response.decode(&decoder)
+	if err != nil {
+		// how badly should we blow up here ?
+	}
+
+	bm.leadersLock.Lock()
+	defer bm.leadersLock.Unlock()
+	for i := range response.topics {
+		topic := &response.topics[i]
+		if topic.err != NO_ERROR {
+			return topic.err
+		}
+		for j := range topic.partitions {
+			partition := &topic.partitions[j]
+			if partition.err != NO_ERROR {
+				return partition.err
+			}
+			bm.leaders[brokerKey{*topic.name, partition.id}] = response.brokerById(partition.leader)
+		}
+	}
+
+	return nil
+}
+
+func (bm *brokerManager) refreshTopic(topic string) error {
+	tmp := make([]*string, 1)
+	tmp[0] = &topic
+	return bm.refreshTopics(tmp)
 }
 
-func (bm *brokerManager) refreshTopic(topic string) {
+func (bm *brokerManager) refreshAllTopics() error {
+	tmp := make([]*string, 0)
+	return bm.refreshTopics(tmp)
 }

+ 11 - 0
metadata.go

@@ -3,6 +3,8 @@ package kafka
 type metadata struct {
 	brokers []broker
 	topics  []topicMetadata
+
+	brokerMap map[int32]*broker
 }
 
 func (m *metadata) encode(pe packetEncoder) {
@@ -23,11 +25,13 @@ func (m *metadata) decode(pd packetDecoder) (err error) {
 	}
 
 	m.brokers = make([]broker, n)
+	m.brokerMap = make(map[int32]*broker, n)
 	for i := 0; i < n; i++ {
 		err = (&m.brokers[i]).decode(pd)
 		if err != nil {
 			return err
 		}
+		m.brokerMap[m.brokers[i].nodeId] = &m.brokers[i]
 	}
 
 	n, err = pd.getArrayCount()
@@ -45,3 +49,10 @@ func (m *metadata) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (m *metadata) brokerById(id int32) *broker {
+	if m.brokerMap == nil {
+		return nil
+	}
+	return m.brokerMap[id]
+}