|
@@ -96,16 +96,9 @@ func (b *Broker) Connected() (bool, error) {
|
|
|
return b.conn != nil, b.connErr
|
|
return b.conn != nil, b.connErr
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (b *Broker) Close() (err error) {
|
|
|
|
|
|
|
+func (b *Broker) Close() error {
|
|
|
b.lock.Lock()
|
|
b.lock.Lock()
|
|
|
defer b.lock.Unlock()
|
|
defer b.lock.Unlock()
|
|
|
- defer func() {
|
|
|
|
|
- if err == nil {
|
|
|
|
|
- Logger.Printf("Closed connection to broker %s\n", b.addr)
|
|
|
|
|
- } else {
|
|
|
|
|
- Logger.Printf("Failed to close connection to broker %s: %s\n", b.addr, err)
|
|
|
|
|
- }
|
|
|
|
|
- }()
|
|
|
|
|
|
|
|
|
|
if b.conn == nil {
|
|
if b.conn == nil {
|
|
|
return ErrNotConnected
|
|
return ErrNotConnected
|
|
@@ -114,7 +107,7 @@ func (b *Broker) Close() (err error) {
|
|
|
close(b.responses)
|
|
close(b.responses)
|
|
|
<-b.done
|
|
<-b.done
|
|
|
|
|
|
|
|
- err = b.conn.Close()
|
|
|
|
|
|
|
+ err := b.conn.Close()
|
|
|
|
|
|
|
|
b.conn = nil
|
|
b.conn = nil
|
|
|
b.connErr = nil
|
|
b.connErr = nil
|
|
@@ -123,7 +116,13 @@ func (b *Broker) Close() (err error) {
|
|
|
|
|
|
|
|
atomic.StoreInt32(&b.opened, 0)
|
|
atomic.StoreInt32(&b.opened, 0)
|
|
|
|
|
|
|
|
- return
|
|
|
|
|
|
|
+ if err == nil {
|
|
|
|
|
+ Logger.Printf("Closed connection to broker %s\n", b.addr)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
|
|
// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
|