Browse Source

Client.Close(), RoundRobinPartitioner, documentation

Evan Huus 12 years ago
parent
commit
46b3d6e171
3 changed files with 40 additions and 8 deletions
  1. 14 0
      kafka/client.go
  2. 18 0
      kafka/partitioner.go
  3. 8 8
      kafka/producer.go

+ 14 - 0
kafka/client.go

@@ -2,11 +2,18 @@ package kafka
 
 
 import k "sarama/protocol"
 import k "sarama/protocol"
 
 
+// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
+// You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
+// automatically when it passes out of scope. A single client can be safely shared by
+// multiple concurrent Producers and Consumers.
 type Client struct {
 type Client struct {
 	id    string
 	id    string
 	cache *metadataCache
 	cache *metadataCache
 }
 }
 
 
+// NewClient creates a new Client with the given client ID. It connects to the broker at the given
+// host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
+// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
 func NewClient(id string, host string, port int32) (client *Client, err error) {
 func NewClient(id string, host string, port int32) (client *Client, err error) {
 	client = new(Client)
 	client = new(Client)
 	client.id = id
 	client.id = id
@@ -17,6 +24,13 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
 	return client, nil
 	return client, nil
 }
 }
 
 
+// Close shuts down all broker connections managed by this client. It is required to call this function before
+// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
+// using a client before you close the client.
+func (client *Client) Close() {
+	client.cache.closeAll()
+}
+
 func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
 func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
 	leader := client.cache.leader(topic, partition_id)
 	leader := client.cache.leader(topic, partition_id)
 
 

+ 18 - 0
kafka/partitioner.go

@@ -2,13 +2,31 @@ package kafka
 
 
 import "math/rand"
 import "math/rand"
 
 
+// Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1],
+// decides to which partition to send the message. RandomPartitioner and RoundRobinPartitioner are the
+// two simple default implementations.
 type Partitioner interface {
 type Partitioner interface {
 	Partition(key Encoder, numPartitions int) int
 	Partition(key Encoder, numPartitions int) int
 }
 }
 
 
+// RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
 type RandomPartitioner struct {
 type RandomPartitioner struct {
 }
 }
 
 
 func (p RandomPartitioner) Partition(key Encoder, numPartitions int) int {
 func (p RandomPartitioner) Partition(key Encoder, numPartitions int) int {
 	return rand.Intn(numPartitions)
 	return rand.Intn(numPartitions)
 }
 }
+
+// RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
+type RoundRobinPartitioner struct {
+	partition int
+}
+
+func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int) int {
+	if p.partition >= numPartitions {
+		p.partition = 0
+	}
+	ret := p.partition
+	p.partition++
+	return ret
+}

+ 8 - 8
kafka/producer.go

@@ -19,6 +19,14 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
 	return NewProducer(client, topic, RandomPartitioner{}, k.WAIT_FOR_LOCAL, 0)
 	return NewProducer(client, topic, RandomPartitioner{}, k.WAIT_FOR_LOCAL, 0)
 }
 }
 
 
+func (p *Producer) SendMessage(key, value Encoder) error {
+	return p.safeSendMessage(key, value, 1)
+}
+
+func (p *Producer) SendSimpleMessage(msg string) error {
+	return p.safeSendMessage(nil, encodableString(msg), 1)
+}
+
 func (p *Producer) choosePartition(key Encoder) (int32, error) {
 func (p *Producer) choosePartition(key Encoder) (int32, error) {
 	partitions, err := p.client.partitions(p.topic)
 	partitions, err := p.client.partitions(p.topic)
 	if err != nil {
 	if err != nil {
@@ -104,11 +112,3 @@ func (p *Producer) safeSendMessage(key, value Encoder, retries int) error {
 		return block.Err
 		return block.Err
 	}
 	}
 }
 }
-
-func (p *Producer) SendMessage(key, value Encoder) error {
-	return p.safeSendMessage(key, value, 1)
-}
-
-func (p *Producer) SendSimpleMessage(msg string) error {
-	return p.safeSendMessage(nil, encodableString(msg), 1)
-}