|
|
@@ -42,6 +42,8 @@ func (b *Broker) Open() error {
|
|
|
|
|
|
if b.conn != nil {
|
|
|
b.lock.Unlock()
|
|
|
+ Logger.Printf("Failed to connect to broker %s\n", b.addr)
|
|
|
+ Logger.Println(AlreadyConnected)
|
|
|
return AlreadyConnected
|
|
|
}
|
|
|
|
|
|
@@ -50,6 +52,8 @@ func (b *Broker) Open() error {
|
|
|
|
|
|
b.conn, b.connErr = net.Dial("tcp", b.addr)
|
|
|
if b.connErr != nil {
|
|
|
+ Logger.Printf("Failed to connect to broker %s\n", b.addr)
|
|
|
+ Logger.Println(b.connErr)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -58,6 +62,7 @@ func (b *Broker) Open() error {
|
|
|
// permit a few outstanding requests before we block waiting for responses
|
|
|
b.responses = make(chan responsePromise, 4)
|
|
|
|
|
|
+ Logger.Printf("Connected to broker %s\n", b.addr)
|
|
|
go b.responseReceiver()
|
|
|
}()
|
|
|
|
|
|
@@ -73,9 +78,17 @@ func (b *Broker) Connected() (bool, error) {
|
|
|
return b.conn != nil, b.connErr
|
|
|
}
|
|
|
|
|
|
-func (b *Broker) Close() error {
|
|
|
+func (b *Broker) Close() (err error) {
|
|
|
b.lock.Lock()
|
|
|
defer b.lock.Unlock()
|
|
|
+ defer func() {
|
|
|
+ if err == nil {
|
|
|
+ Logger.Printf("Closed connection to broker %s\n", b.addr)
|
|
|
+ } else {
|
|
|
+ Logger.Printf("Failed to close connection to broker %s.\n", b.addr)
|
|
|
+ Logger.Println(err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
|
|
|
if b.conn == nil {
|
|
|
return NotConnected
|
|
|
@@ -84,14 +97,14 @@ func (b *Broker) Close() error {
|
|
|
close(b.responses)
|
|
|
<-b.done
|
|
|
|
|
|
- err := b.conn.Close()
|
|
|
+ err = b.conn.Close()
|
|
|
|
|
|
b.conn = nil
|
|
|
b.connErr = nil
|
|
|
b.done = nil
|
|
|
b.responses = nil
|
|
|
|
|
|
- return err
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
|