Selaa lähdekoodia

Add basic logging for Broker connection events.

Willem van Bergen 12 vuotta sitten
vanhempi
commit
a5e7b9795e
3 muutettua tiedostoa jossa 38 lisäystä ja 12 poistoa
  1. 31 7
      broker.go
  2. 3 3
      broker_test.go
  3. 4 2
      client.go

+ 31 - 7
broker.go

@@ -5,12 +5,14 @@ import (
 	"io"
 	"net"
 	"sync"
+	"log"
 )
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 type Broker struct {
-	id   int32
-	addr string
+	id     int32
+	addr   string
+	logger *log.Logger
 
 	correlationID int32
 	conn          net.Conn
@@ -29,8 +31,8 @@ type responsePromise struct {
 
 // NewBroker creates and returns a Broker targetting the given host:port address.
 // This does not attempt to actually connect, you have to call Open() for that.
-func NewBroker(addr string) *Broker {
-	return &Broker{id: -1, addr: addr}
+func NewBroker(addr string, logger *log.Logger) *Broker {
+	return &Broker{id: -1, addr: addr, logger: logger}
 }
 
 // Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
@@ -42,6 +44,10 @@ func (b *Broker) Open() error {
 
 	if b.conn != nil {
 		b.lock.Unlock()
+		if b.logger != nil {
+			b.logger.Printf("Failed to connect to broker %s\n", b.addr)
+			b.logger.Println(AlreadyConnected)
+		}
 		return AlreadyConnected
 	}
 
@@ -50,6 +56,10 @@ func (b *Broker) Open() error {
 
 		b.conn, b.connErr = net.Dial("tcp", b.addr)
 		if b.connErr != nil {
+			if b.logger != nil {
+				b.logger.Printf("Failed to connect to broker %s\n", b.addr)
+				b.logger.Println(b.connErr)
+			}
 			return
 		}
 
@@ -58,6 +68,10 @@ func (b *Broker) Open() error {
 		// permit a few outstanding requests before we block waiting for responses
 		b.responses = make(chan responsePromise, 4)
 
+		if b.logger != nil {
+			b.logger.Printf("Connected to broker %s\n", b.addr)
+		}
+
 		go b.responseReceiver()
 	}()
 
@@ -73,9 +87,19 @@ func (b *Broker) Connected() (bool, error) {
 	return b.conn != nil, b.connErr
 }
 
-func (b *Broker) Close() error {
+func (b *Broker) Close() (err error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()
+	if b.logger != nil {
+		defer func() {
+			if err == nil {
+				b.logger.Printf("Closed connection to broker #%d %s\n", b.id, b.addr)
+			} else {
+				b.logger.Printf("Failed to close connection to broker #%d %s.\n", b.id, b.addr)
+				b.logger.Println(err)
+			}
+		}()
+	}
 
 	if b.conn == nil {
 		return NotConnected
@@ -84,14 +108,14 @@ func (b *Broker) Close() error {
 	close(b.responses)
 	<-b.done
 
-	err := b.conn.Close()
+	err = b.conn.Close()
 
 	b.conn = nil
 	b.connErr = nil
 	b.done = nil
 	b.responses = nil
 
-	return err
+	return
 }
 
 // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

+ 3 - 3
broker_test.go

@@ -143,7 +143,7 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
 }
 
 func ExampleBroker() error {
-	broker := NewBroker("localhost:9092")
+	broker := NewBroker("localhost:9092", nil)
 	err := broker.Open()
 	if err != nil {
 		return err
@@ -163,7 +163,7 @@ func ExampleBroker() error {
 
 func TestBrokerAccessors(t *testing.T) {
 
-	broker := NewBroker("abc:123")
+	broker := NewBroker("abc:123", nil)
 
 	if broker.ID() != -1 {
 		t.Error("New broker didn't have an ID of -1.")
@@ -184,7 +184,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 	mockBroker := NewMockBroker(t, responses)
 	defer mockBroker.Close()
 
-	broker := NewBroker(mockBroker.Addr())
+	broker := NewBroker(mockBroker.Addr(), nil)
 	err := broker.Open()
 	if err != nil {
 		t.Fatal(err)

+ 4 - 2
client.go

@@ -4,12 +4,14 @@ import (
 	"sort"
 	"sync"
 	"time"
+	"log"
 )
 
 // ClientConfig is used to pass multiple configuration options to NewClient.
 type ClientConfig struct {
 	MetadataRetries int           // How many times to retry a metadata request when a partition is in the middle of leader election.
 	WaitForElection time.Duration // How long to wait for leader election to finish between retries.
+	Logger          *log.Logger   // The interface to log broker connection events to.
 }
 
 // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
@@ -51,7 +53,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		id:               id,
 		config:           *config,
 		extraBrokerAddrs: addrs,
-		extraBroker:      NewBroker(addrs[0]),
+		extraBroker:      NewBroker(addrs[0], config.Logger),
 		brokers:          make(map[int32]*Broker),
 		leaders:          make(map[string]map[int32]int32),
 	}
@@ -164,7 +166,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	if broker == client.extraBroker {
 		client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
 		if len(client.extraBrokerAddrs) > 0 {
-			client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
+			client.extraBroker = NewBroker(client.extraBrokerAddrs[0], client.config.Logger)
 			client.extraBroker.Open()
 		} else {
 			client.extraBroker = nil