|
|
@@ -5,14 +5,12 @@ import (
|
|
|
"io"
|
|
|
"net"
|
|
|
"sync"
|
|
|
- "log"
|
|
|
)
|
|
|
|
|
|
// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
|
|
|
type Broker struct {
|
|
|
id int32
|
|
|
addr string
|
|
|
- logger *log.Logger
|
|
|
|
|
|
correlationID int32
|
|
|
conn net.Conn
|
|
|
@@ -31,8 +29,8 @@ type responsePromise struct {
|
|
|
|
|
|
// NewBroker creates and returns a Broker targetting the given host:port address.
|
|
|
// This does not attempt to actually connect, you have to call Open() for that.
|
|
|
-func NewBroker(addr string, logger *log.Logger) *Broker {
|
|
|
- return &Broker{id: -1, addr: addr, logger: logger}
|
|
|
+func NewBroker(addr string) *Broker {
|
|
|
+ return &Broker{id: -1, addr: addr}
|
|
|
}
|
|
|
|
|
|
// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
|
|
|
@@ -44,10 +42,8 @@ func (b *Broker) Open() error {
|
|
|
|
|
|
if b.conn != nil {
|
|
|
b.lock.Unlock()
|
|
|
- if b.logger != nil {
|
|
|
- b.logger.Printf("Failed to connect to broker %s\n", b.addr)
|
|
|
- b.logger.Println(AlreadyConnected)
|
|
|
- }
|
|
|
+ Logger.Printf("Failed to connect to broker %s\n", b.addr)
|
|
|
+ Logger.Println(AlreadyConnected)
|
|
|
return AlreadyConnected
|
|
|
}
|
|
|
|
|
|
@@ -56,10 +52,8 @@ func (b *Broker) Open() error {
|
|
|
|
|
|
b.conn, b.connErr = net.Dial("tcp", b.addr)
|
|
|
if b.connErr != nil {
|
|
|
- if b.logger != nil {
|
|
|
- b.logger.Printf("Failed to connect to broker %s\n", b.addr)
|
|
|
- b.logger.Println(b.connErr)
|
|
|
- }
|
|
|
+ Logger.Printf("Failed to connect to broker %s\n", b.addr)
|
|
|
+ Logger.Println(b.connErr)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -68,10 +62,7 @@ func (b *Broker) Open() error {
|
|
|
// permit a few outstanding requests before we block waiting for responses
|
|
|
b.responses = make(chan responsePromise, 4)
|
|
|
|
|
|
- if b.logger != nil {
|
|
|
- b.logger.Printf("Connected to broker %s\n", b.addr)
|
|
|
- }
|
|
|
-
|
|
|
+ Logger.Printf("Connected to broker %s\n", b.addr)
|
|
|
go b.responseReceiver()
|
|
|
}()
|
|
|
|
|
|
@@ -90,16 +81,14 @@ func (b *Broker) Connected() (bool, error) {
|
|
|
func (b *Broker) Close() (err error) {
|
|
|
b.lock.Lock()
|
|
|
defer b.lock.Unlock()
|
|
|
- if b.logger != nil {
|
|
|
- defer func() {
|
|
|
- if err == nil {
|
|
|
- b.logger.Printf("Closed connection to broker #%d %s\n", b.id, b.addr)
|
|
|
- } else {
|
|
|
- b.logger.Printf("Failed to close connection to broker #%d %s.\n", b.id, b.addr)
|
|
|
- b.logger.Println(err)
|
|
|
- }
|
|
|
- }()
|
|
|
- }
|
|
|
+ defer func() {
|
|
|
+ if err == nil {
|
|
|
+ Logger.Printf("Closed connection to broker #%d %s\n", b.id, b.addr)
|
|
|
+ } else {
|
|
|
+ Logger.Printf("Failed to close connection to broker #%d %s.\n", b.id, b.addr)
|
|
|
+ Logger.Println(err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
|
|
|
if b.conn == nil {
|
|
|
return NotConnected
|