|
|
@@ -11,14 +11,17 @@ type brokerKey struct {
|
|
|
}
|
|
|
|
|
|
type brokerManager struct {
|
|
|
+ client *Client
|
|
|
defaultBroker *broker
|
|
|
leaders map[brokerKey]*broker
|
|
|
leadersLock sync.RWMutex
|
|
|
}
|
|
|
|
|
|
-func newBrokerManager(host string, port int32) (bm *brokerManager, err error) {
|
|
|
+func newBrokerManager(client *Client, host string, port int32) (bm *brokerManager, err error) {
|
|
|
bm = new(brokerManager)
|
|
|
|
|
|
+ bm.client = client
|
|
|
+
|
|
|
bm.defaultBroker, err = newBroker(host, port)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -58,7 +61,7 @@ func (bm *brokerManager) refreshTopics(topics []*string) error {
|
|
|
return errors.New("kafka: lost all broker connections")
|
|
|
}
|
|
|
|
|
|
- responseChan, err := b.sendRequest(REQUEST_METADATA, &metadataRequest{topics})
|
|
|
+ responseChan, err := b.sendRequest(bm.client.id, REQUEST_METADATA, &metadataRequest{topics})
|
|
|
if err != nil {
|
|
|
// TODO
|
|
|
}
|