Browse Source

Publish broker

Evan Huus 11 years ago
parent
commit
e34ddd043d
5 changed files with 21 additions and 21 deletions
  1. 10 10
      broker.go
  2. 2 2
      client.go
  3. 5 5
      metadata_cache.go
  4. 2 2
      metadata_response.go
  5. 2 2
      producer.go

+ 10 - 10
broker.go

@@ -6,7 +6,7 @@ import (
 	"net"
 )
 
-type broker struct {
+type Broker struct {
 	id   int32
 	host *string
 	port int32
@@ -32,8 +32,8 @@ type requestToSend struct {
 	expectResponse bool
 }
 
-func newBroker(host string, port int32) (b *broker, err error) {
-	b = new(broker)
+func NewBroker(host string, port int32) (b *Broker, err error) {
+	b = new(Broker)
 	b.id = -1 // don't know it yet
 	b.host = &host
 	b.port = port
@@ -44,7 +44,7 @@ func newBroker(host string, port int32) (b *broker, err error) {
 	return b, nil
 }
 
-func (b *broker) connect() (err error) {
+func (b *Broker) connect() (err error) {
 	addr, err := net.ResolveIPAddr("ip", *b.host)
 	if err != nil {
 		return err
@@ -68,7 +68,7 @@ func (b *broker) connect() (err error) {
 	return nil
 }
 
-func (b *broker) forceDisconnect(reqRes *responsePromise, err error) {
+func (b *Broker) forceDisconnect(reqRes *responsePromise, err error) {
 	reqRes.errors <- err
 	close(reqRes.errors)
 	close(reqRes.packets)
@@ -79,13 +79,13 @@ func (b *broker) forceDisconnect(reqRes *responsePromise, err error) {
 	b.conn.Close()
 }
 
-func (b *broker) encode(pe packetEncoder) {
+func (b *Broker) encode(pe packetEncoder) {
 	pe.putInt32(b.id)
 	pe.putString(b.host)
 	pe.putInt32(b.port)
 }
 
-func (b *broker) decode(pd packetDecoder) (err error) {
+func (b *Broker) decode(pd packetDecoder) (err error) {
 	b.id, err = pd.getInt32()
 	if err != nil {
 		return err
@@ -112,7 +112,7 @@ func (b *broker) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-func (b *broker) sendRequestLoop() {
+func (b *Broker) sendRequestLoop() {
 	for request := range b.requests {
 		buf := <-request.response.packets
 		_, err := b.conn.Write(buf)
@@ -129,7 +129,7 @@ func (b *broker) sendRequestLoop() {
 	}
 }
 
-func (b *broker) rcvResponseLoop() {
+func (b *Broker) rcvResponseLoop() {
 	header := make([]byte, 8)
 	for response := range b.responses {
 		_, err := io.ReadFull(b.conn, header)
@@ -164,7 +164,7 @@ func (b *broker) rcvResponseLoop() {
 	}
 }
 
-func (b *broker) SendAndReceive(clientID *string, req requestEncoder) (decoder, error) {
+func (b *Broker) SendAndReceive(clientID *string, req requestEncoder) (decoder, error) {
 	fullRequest := request{b.correlation_id, clientID, req}
 	packet, err := buildBytes(&fullRequest)
 	if err != nil {

+ 2 - 2
client.go

@@ -15,7 +15,7 @@ func NewClient(id *string, host string, port int32) (client *Client, err error)
 	return client, nil
 }
 
-func (client *Client) Leader(topic string, partition_id int32) (*broker, error) {
+func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
 	leader := client.cache.leader(topic, partition_id)
 
 	if leader == nil {
@@ -34,7 +34,7 @@ func (client *Client) Leader(topic string, partition_id int32) (*broker, error)
 	return leader, nil
 }
 
-func (client *Client) Partitions(topic string) ([]int32, error) {
+func (client *Client) partitions(topic string) ([]int32, error) {
 	partitions := client.cache.partitions(topic)
 
 	if partitions == nil {

+ 5 - 5
metadata_cache.go

@@ -7,7 +7,7 @@ import (
 
 type metadataCache struct {
 	client  *Client
-	brokers map[int32]*broker          // maps broker ids to brokers
+	brokers map[int32]*Broker          // maps broker ids to brokers
 	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
 	lock    sync.RWMutex               // protects access to the maps, only one since they're always accessed together
 }
@@ -15,13 +15,13 @@ type metadataCache struct {
 func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
 	mc := new(metadataCache)
 
-	starter, err := newBroker(host, port)
+	starter, err := NewBroker(host, port)
 	if err != nil {
 		return nil, err
 	}
 
 	mc.client = client
-	mc.brokers = make(map[int32]*broker)
+	mc.brokers = make(map[int32]*Broker)
 	mc.leaders = make(map[string]map[int32]int32)
 
 	mc.brokers[starter.id] = starter
@@ -35,7 +35,7 @@ func newMetadataCache(client *Client, host string, port int32) (*metadataCache,
 	return mc, nil
 }
 
-func (mc *metadataCache) leader(topic string, partition_id int32) *broker {
+func (mc *metadataCache) leader(topic string, partition_id int32) *Broker {
 	mc.lock.RLock()
 	defer mc.lock.RUnlock()
 
@@ -52,7 +52,7 @@ func (mc *metadataCache) leader(topic string, partition_id int32) *broker {
 	return nil
 }
 
-func (mc *metadataCache) any() *broker {
+func (mc *metadataCache) any() *Broker {
 	mc.lock.RLock()
 	defer mc.lock.RUnlock()
 

+ 2 - 2
metadata_response.go

@@ -1,7 +1,7 @@
 package kafka
 
 type metadataResponse struct {
-	brokers []broker
+	brokers []Broker
 	topics  []topicMetadata
 }
 
@@ -22,7 +22,7 @@ func (m *metadataResponse) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	m.brokers = make([]broker, n)
+	m.brokers = make([]Broker, n)
 	for i := 0; i < n; i++ {
 		err = (&m.brokers[i]).decode(pd)
 		if err != nil {

+ 2 - 2
producer.go

@@ -17,7 +17,7 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
 }
 
 func (p *Producer) choosePartition(key encoder) (int32, error) {
-	partitions, err := p.Partitions(p.topic)
+	partitions, err := p.partitions(p.topic)
 	if err != nil {
 		return -1, err
 	}
@@ -43,7 +43,7 @@ func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
 		return nil, err
 	}
 
-	broker, err := p.Leader(p.topic, partition)
+	broker, err := p.leader(p.topic, partition)
 	if err != nil {
 		return nil, err
 	}