Browse Source

Replace broker's host/port with just addr

Simplifies much of the splitting/joining logic: the only join is now on decoding
a broker in a metadata response.
Evan Huus 11 years ago
parent
commit
dae9785f7e
7 changed files with 36 additions and 49 deletions
  1. 11 16
      broker.go
  2. 11 13
      broker_test.go
  3. 2 2
      client.go
  4. 4 4
      client_test.go
  5. 2 2
      consumer_test.go
  6. 4 10
      metadata_response_test.go
  7. 2 2
      producer_test.go

+ 11 - 16
broker.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"fmt"
 	"io"
 	"net"
 	"sync"
@@ -9,8 +10,7 @@ import (
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 type Broker struct {
 	id   int32
-	host string
-	port int32
+	addr string
 
 	correlation_id int32
 	conn           net.Conn
@@ -29,11 +29,10 @@ type responsePromise struct {
 
 // NewBroker creates and returns a Broker targetting the given host:port address.
 // This does not attempt to actually connect, you have to call Open() for that.
-func NewBroker(host string, port int32) *Broker {
+func NewBroker(addr string) *Broker {
 	b := new(Broker)
 	b.id = -1 // don't know it yet
-	b.host = host
-	b.port = port
+	b.addr = addr
 	return b
 }
 
@@ -52,13 +51,7 @@ func (b *Broker) Open() error {
 	go func() {
 		defer b.lock.Unlock()
 
-		var addr *net.IPAddr
-		addr, b.conn_err = net.ResolveIPAddr("ip", b.host)
-		if b.conn_err != nil {
-			return
-		}
-
-		b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
+		b.conn, b.conn_err = net.Dial("tcp", b.addr)
 		if b.conn_err != nil {
 			return
 		}
@@ -109,7 +102,7 @@ func (b *Broker) ID() int32 {
 	return b.id
 }
 
-// Equals compares two brokers. Two brokers are considered equal if they have the same host, port, and id,
+// Equals compares two brokers. Two brokers are considered equal if they have the same address and id,
 // or if they are both nil.
 func (b *Broker) Equals(a *Broker) bool {
 	switch {
@@ -118,7 +111,7 @@ func (b *Broker) Equals(a *Broker) bool {
 	case (a == nil && b != nil) || (a != nil && b == nil):
 		return false
 	}
-	return a.id == b.id && a.host == b.host && a.port == b.port
+	return a.id == b.id && a.addr == b.addr
 }
 
 func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error) {
@@ -260,16 +253,18 @@ func (b *Broker) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	b.host, err = pd.getString()
+	host, err := pd.getString()
 	if err != nil {
 		return err
 	}
 
-	b.port, err = pd.getInt32()
+	port, err := pd.getInt32()
 	if err != nil {
 		return err
 	}
 
+	b.addr = fmt.Sprint(host, ":", port)
+
 	return nil
 }
 

+ 11 - 13
broker_test.go

@@ -28,11 +28,14 @@ type MockBroker struct {
 	t         *testing.T
 }
 
-// Port is the kernel-select TCP port the broker is listening on.
 func (b *MockBroker) Port() int32 {
 	return b.port
 }
 
+func (b *MockBroker) Addr() string {
+	return b.listener.Addr().String()
+}
+
 // Close closes the response channel originally provided, then waits to make sure
 // that all requests/responses matched up before exiting.
 func (b *MockBroker) Close() {
@@ -140,7 +143,7 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
 }
 
 func ExampleBroker() error {
-	broker := NewBroker("localhost", 9092)
+	broker := NewBroker("localhost:9092")
 	err := broker.Open()
 	if err != nil {
 		return err
@@ -168,7 +171,7 @@ func TestBrokerEquals(t *testing.T) {
 		t.Error("Two nil brokers didn't compare equal.")
 	}
 
-	b1 = NewBroker("abc", 123)
+	b1 = NewBroker("abc:123")
 
 	if b1.Equals(b2) {
 		t.Error("Non-nil and nil brokers compared equal.")
@@ -177,17 +180,12 @@ func TestBrokerEquals(t *testing.T) {
 		t.Error("Nil and non-nil brokers compared equal.")
 	}
 
-	b2 = NewBroker("abc", 1234)
-	if b1.Equals(b2) || b2.Equals(b1) {
-		t.Error("Brokers with different ports compared equal.")
-	}
-
-	b2 = NewBroker("abcd", 123)
+	b2 = NewBroker("abc:1234")
 	if b1.Equals(b2) || b2.Equals(b1) {
-		t.Error("Brokers with different hosts compared equal.")
+		t.Error("Brokers with different addrs compared equal.")
 	}
 
-	b2 = NewBroker("abc", 123)
+	b2 = NewBroker("abc:123")
 	b2.id = -2
 	if b1.Equals(b2) || b2.Equals(b1) {
 		t.Error("Brokers with different ids compared equal.")
@@ -201,7 +199,7 @@ func TestBrokerEquals(t *testing.T) {
 
 func TestBrokerID(t *testing.T) {
 
-	broker := NewBroker("abc", 123)
+	broker := NewBroker("abc:123")
 
 	if broker.ID() != -1 {
 		t.Error("New broker didn't have an ID of -1.")
@@ -218,7 +216,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 	mockBroker := NewMockBroker(t, responses)
 	defer mockBroker.Close()
 
-	broker := NewBroker("localhost", mockBroker.Port())
+	broker := NewBroker(mockBroker.Addr())
 	err := broker.Open()
 	if err != nil {
 		t.Fatal(err)

+ 2 - 2
client.go

@@ -27,7 +27,7 @@ type Client struct {
 // NewClient creates a new Client with the given client ID. It connects to the broker at the given
 // host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
 // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
-func NewClient(id string, host string, port int32, config *ClientConfig) (client *Client, err error) {
+func NewClient(id string, addr string, config *ClientConfig) (client *Client, err error) {
 	if config == nil {
 		config = new(ClientConfig)
 	}
@@ -36,7 +36,7 @@ func NewClient(id string, host string, port int32, config *ClientConfig) (client
 		return nil, ConfigurationError("Invalid MetadataRetries")
 	}
 
-	tmp := NewBroker(host, port)
+	tmp := NewBroker(addr)
 	err = tmp.Open()
 	if err != nil {
 		return nil, err

+ 4 - 4
client_test.go

@@ -13,7 +13,7 @@ func TestSimpleClient(t *testing.T) {
 	// Only one response needed, an empty metadata response
 	responses <- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
+	client, err := NewClient("clientID", mockBroker.Addr(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -37,7 +37,7 @@ func TestClientExtraBrokers(t *testing.T) {
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
+	client, err := NewClient("clientID", mockBroker.Addr(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -70,7 +70,7 @@ func TestClientMetadata(t *testing.T) {
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
+	client, err := NewClient("clientID", mockBroker.Addr(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -130,7 +130,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x00}
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), &ClientConfig{MetadataRetries: 1})
+	client, err := NewClient("clientID", mockBroker.Addr(), &ClientConfig{MetadataRetries: 1})
 	if err != nil {
 		t.Fatal(err)
 	}

+ 2 - 2
consumer_test.go

@@ -64,7 +64,7 @@ func TestSimpleConsumer(t *testing.T) {
 			0x00, 0x00, 0x00, 0x00}
 	}()
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
+	client, err := NewClient("clientID", mockBroker.Addr(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -89,7 +89,7 @@ func TestSimpleConsumer(t *testing.T) {
 }
 
 func ExampleConsumer() {
-	client, err := NewClient("myClient", "localhost", 9092, nil)
+	client, err := NewClient("myClient", "localhost:9092", nil)
 	if err != nil {
 		panic(err)
 	} else {

+ 4 - 10
metadata_response_test.go

@@ -58,20 +58,14 @@ func TestMetadataResponseWithBrokers(t *testing.T) {
 		if response.Brokers[0].id != 0xabff {
 			t.Error("Decoding produced invalid broker 0 id.")
 		}
-		if response.Brokers[0].host != "localhost" {
-			t.Error("Decoding produced invalid broker 0 host.")
-		}
-		if response.Brokers[0].port != 0x33 {
-			t.Error("Decoding produced invalid broker 0 port.")
+		if response.Brokers[0].addr != "localhost:51" {
+			t.Error("Decoding produced invalid broker 0 address.")
 		}
 		if response.Brokers[1].id != 0x010203 {
 			t.Error("Decoding produced invalid broker 1 id.")
 		}
-		if response.Brokers[1].host != "google.com" {
-			t.Error("Decoding produced invalid broker 1 host.")
-		}
-		if response.Brokers[1].port != 0x111 {
-			t.Error("Decoding produced invalid broker 1 port.")
+		if response.Brokers[1].addr != "google.com:273" {
+			t.Error("Decoding produced invalid broker 1 address.")
 		}
 	} else {
 		t.Error("Decoding produced", len(response.Brokers), "brokers where there were two!")

+ 2 - 2
producer_test.go

@@ -45,7 +45,7 @@ func TestSimpleProducer(t *testing.T) {
 		}
 	}()
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
+	client, err := NewClient("clientID", mockBroker.Addr(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -66,7 +66,7 @@ func TestSimpleProducer(t *testing.T) {
 }
 
 func ExampleProducer() {
-	client, err := NewClient("myClient", "localhost", 9092, nil)
+	client, err := NewClient("myClient", "localhost:9092", nil)
 	if err != nil {
 		panic(err)
 	} else {