|
|
@@ -37,6 +37,8 @@ type Client struct {
|
|
|
// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
|
|
|
// be retrieved from any of the given broker addresses, the client is not created.
|
|
|
func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error) {
|
|
|
+ Logger.Println("Initializing new client")
|
|
|
+
|
|
|
if config == nil {
|
|
|
config = new(ClientConfig)
|
|
|
}
|
|
|
@@ -74,6 +76,8 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
+ Logger.Println("Successfully initialized new client")
|
|
|
+
|
|
|
return client, nil
|
|
|
}
|
|
|
|
|
|
@@ -83,6 +87,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
|
|
|
func (client *Client) Close() error {
|
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
|
+ Logger.Println("Closing Client")
|
|
|
|
|
|
for _, broker := range client.brokers {
|
|
|
myBroker := broker // NB: block-local prevents clobbering
|
|
|
@@ -171,6 +176,7 @@ func (client *Client) RefreshAllMetadata() error {
|
|
|
func (client *Client) disconnectBroker(broker *Broker) {
|
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
|
+ Logger.Printf("Disconnecting Broker %d\n", broker.ID())
|
|
|
|
|
|
client.deadBrokerAddrs = append(client.deadBrokerAddrs, broker.addr)
|
|
|
|