Browse Source

Merge pull request #11 from Shopify/extendable_configuration

Extendable configuration
Evan Huus 12 years ago
parent
commit
722c9b21ae
7 changed files with 148 additions and 41 deletions
  1. 21 5
      client.go
  2. 4 4
      client_test.go
  3. 63 5
      consumer.go
  4. 4 4
      consumer_test.go
  5. 11 0
      errors.go
  6. 35 19
      producer.go
  7. 10 4
      producer_test.go

+ 21 - 5
client.go

@@ -6,12 +6,19 @@ import (
 	"time"
 )
 
+// ClientConfig is used to pass multiple configuration options to NewClient.
+type ClientConfig struct {
+	MetadataRetries int           // How many times to retry a metadata request when a partition is in the middle of leader election.
+	WaitForElection time.Duration // How long to wait for leader election to finish between retries.
+}
+
 // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
 // You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
 // automatically when it passes out of scope. A single client can be safely shared by
 // multiple concurrent Producers and Consumers.
 type Client struct {
-	id      string                     // client id for broker requests
+	id      string
+	config  ClientConfig
 	brokers map[int32]*Broker          // maps broker ids to brokers
 	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
 	lock    sync.RWMutex               // protects access to the maps, only one since they're always written together
@@ -20,7 +27,15 @@ type Client struct {
 // NewClient creates a new Client with the given client ID. It connects to the broker at the given
 // host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
 // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
-func NewClient(id string, host string, port int32) (client *Client, err error) {
+func NewClient(id string, host string, port int32, config *ClientConfig) (client *Client, err error) {
+	if config == nil {
+		config = new(ClientConfig)
+	}
+
+	if config.MetadataRetries < 0 {
+		return nil, ConfigurationError("Invalid MetadataRetries")
+	}
+
 	tmp := NewBroker(host, port)
 	err = tmp.Open()
 	if err != nil {
@@ -33,6 +48,7 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
 
 	client = new(Client)
 	client.id = id
+	client.config = *config
 
 	client.brokers = make(map[int32]*Broker)
 	client.leaders = make(map[string]map[int32]int32)
@@ -43,7 +59,7 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
 	client.brokers[tmp.ID()] = tmp
 
 	// do an initial fetch of all cluster metadata by specifying an empty list of topics
-	err = client.refreshTopics(make([]string, 0), 3)
+	err = client.refreshTopics(make([]string, 0), client.config.MetadataRetries)
 	if err != nil {
 		client.Close() // this closes tmp, since it's still in the brokers hash
 		return nil, err
@@ -126,7 +142,7 @@ func (client *Client) refreshTopic(topic string) error {
 	tmp := make([]string, 1)
 	tmp[0] = topic
 	// the number of retries permitted comes from the client's MetadataRetries configuration
-	return client.refreshTopics(tmp, 3)
+	return client.refreshTopics(tmp, client.config.MetadataRetries)
 }
 
 // truly private helper functions
@@ -148,7 +164,7 @@ func (client *Client) refreshTopics(topics []string, retries int) error {
 				if retries <= 0 {
 					return LEADER_NOT_AVAILABLE
 				}
-				time.Sleep(250 * time.Millisecond) // wait for leader election
+				time.Sleep(client.config.WaitForElection) // wait for leader election
 				return client.refreshTopics(retry, retries-1)
 			}
 		case EncodingError:

+ 4 - 4
client_test.go

@@ -13,7 +13,7 @@ func TestSimpleClient(t *testing.T) {
 	// Only one response needed, an empty metadata response
 	responses <- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port())
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -37,7 +37,7 @@ func TestClientExtraBrokers(t *testing.T) {
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port())
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -70,7 +70,7 @@ func TestClientMetadata(t *testing.T) {
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port())
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -131,7 +131,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x00}
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port())
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), &ClientConfig{MetadataRetries: 1})
 	if err != nil {
 		t.Fatal(err)
 	}

+ 63 - 5
consumer.go

@@ -1,5 +1,20 @@
 package sarama
 
+// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
+type ConsumerConfig struct {
+	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
+	DefaultFetchSize int32
+	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
+	// The default of 0 is treated as 'at least one' to prevent the consumer from spinning when no messages are available.
+	MinFetchSize int32
+	// The maximum permissible message size - messages larger than this will return MessageTooLarge. The default of 0 is
+	// treated as no limit.
+	MaxMessageSize int32
+	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
+	// returns fewer than that anyways. The default of 0 is treated as no limit.
+	MaxWaitTime int32
+}
+
 // Consumer processes Kafka messages from a given topic and partition.
 // You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
@@ -9,6 +24,7 @@ type Consumer struct {
 	topic     string
 	partition int32
 	group     string
+	config    ConsumerConfig
 
 	offset        int64
 	broker        *Broker
@@ -19,7 +35,31 @@ type Consumer struct {
 
 // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
 // part of the named consumer group.
-func NewConsumer(client *Client, topic string, partition int32, group string) (*Consumer, error) {
+func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
+	if config == nil {
+		config = new(ConsumerConfig)
+	}
+
+	if config.DefaultFetchSize < 0 {
+		return nil, ConfigurationError("Invalid DefaultFetchSize")
+	} else if config.DefaultFetchSize == 0 {
+		config.DefaultFetchSize = 1024
+	}
+
+	if config.MinFetchSize < 0 {
+		return nil, ConfigurationError("Invalid MinFetchSize")
+	} else if config.MinFetchSize == 0 {
+		config.MinFetchSize = 1
+	}
+
+	if config.MaxMessageSize < 0 {
+		return nil, ConfigurationError("Invalid MaxMessageSize")
+	}
+
+	if config.MaxWaitTime < 0 {
+		return nil, ConfigurationError("Invalid MaxWaitTime")
+	}
+
 	broker, err := client.leader(topic, partition)
 	if err != nil {
 		return nil, err
@@ -30,6 +70,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string) (*
 	c.topic = topic
 	c.partition = partition
 	c.group = group
+	c.config = *config
 
 	// We should really be sending an OffsetFetchRequest, but that doesn't seem to
 	// work in kafka yet. Hopefully will in beta 2...
@@ -86,12 +127,12 @@ func (c *Consumer) sendError(err error) bool {
 
 func (c *Consumer) fetchMessages() {
 
-	var fetchSize int32 = 1024
+	var fetchSize int32 = c.config.DefaultFetchSize
 
 	for {
 		request := new(FetchRequest)
-		request.MinBytes = 1
-		request.MaxWaitTime = 1000
+		request.MinBytes = c.config.MinFetchSize
+		request.MaxWaitTime = c.config.MaxWaitTime
 		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
 
 		response, err := c.broker.Fetch(c.client.id, request)
@@ -149,7 +190,22 @@ func (c *Consumer) fetchMessages() {
 			// We got no messages. If we got a trailing one then we need to ask for more data.
 			// Otherwise we just poll again and wait for one to be produced...
 			if block.MsgSet.PartialTrailingMessage {
-				fetchSize *= 2
+				if c.config.MaxMessageSize == 0 {
+					fetchSize *= 2
+				} else {
+					if fetchSize == c.config.MaxMessageSize {
+						if c.sendError(MessageTooLarge) {
+							continue
+						} else {
+							return
+						}
+					} else {
+						fetchSize *= 2
+						if fetchSize > c.config.MaxMessageSize {
+							fetchSize = c.config.MaxMessageSize
+						}
+					}
+				}
 			}
 			select {
 			case <-c.stopper:
@@ -160,6 +216,8 @@ func (c *Consumer) fetchMessages() {
 			default:
 				continue
 			}
+		} else {
+			fetchSize = c.config.DefaultFetchSize
 		}
 
 		for _, msgBlock := range block.MsgSet.Messages {

+ 4 - 4
consumer_test.go

@@ -64,12 +64,12 @@ func TestSimpleConsumer(t *testing.T) {
 			0x00, 0x00, 0x00, 0x00}
 	}()
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port())
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup")
+	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -90,14 +90,14 @@ func TestSimpleConsumer(t *testing.T) {
 }
 
 func ExampleConsumer() {
-	client, err := NewClient("myClient", "localhost", 9092)
+	client, err := NewClient("myClient", "localhost", 9092, nil)
 	if err != nil {
 		panic(err)
 	} else {
 		fmt.Println("> connected")
 	}
 
-	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup")
+	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", nil)
 	if err != nil {
 		panic(err)
 	} else {

+ 11 - 0
errors.go

@@ -36,6 +36,17 @@ var InsufficientData = errors.New("kafka: Insufficient data to decode packet, mo
 // This can be a bad CRC or length field, or any other invalid value.
 var DecodingError = errors.New("kafka: Error while decoding packet.")
 
+// MessageTooLarge is returned when the next message to consume is larger than the configured MaxMessageSize.
+var MessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
+
+// ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified
+// configuration is invalid.
+type ConfigurationError string
+
+func (err ConfigurationError) Error() string {
+	return "kafka: Invalid Configuration: " + string(err)
+}
+
 // KError is the type of error that can be returned directly by the Kafka broker.
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
 type KError int16

+ 35 - 19
producer.go

@@ -1,26 +1,49 @@
 package sarama
 
+// ProducerConfig is used to pass multiple configuration options to NewProducer.
+type ProducerConfig struct {
+	Partitioner  Partitioner  // Chooses the partition to send messages to. If this is nil, a RandomPartitioner is used.
+	RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
+	Timeout      int32        // The maximum time in ms the broker will wait for the receipt of the number of RequiredAcks.
+}
+
 // Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. A Producer itself does not need to be closed (thus no Close method) but you still need to close
 // its underlying Client.
 type Producer struct {
-	client      *Client
-	topic       string
-	partitioner Partitioner
+	client *Client
+	topic  string
+	config ProducerConfig
 }
 
-// NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic,
-// and partition messages using the given partitioner.
-func NewProducer(client *Client, topic string, partitioner Partitioner) *Producer {
+// NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic.
+func NewProducer(client *Client, topic string, config *ProducerConfig) (*Producer, error) {
+	if config == nil {
+		config = new(ProducerConfig)
+	}
+
+	if config.RequiredAcks < -1 {
+		return nil, ConfigurationError("Invalid RequiredAcks")
+	}
+
+	if config.Timeout < 0 {
+		return nil, ConfigurationError("Invalid Timeout")
+	}
+
+	if config.Partitioner == nil {
+		config.Partitioner = RandomPartitioner{}
+	}
+
 	p := new(Producer)
 	p.client = client
 	p.topic = topic
-	p.partitioner = partitioner
-	return p
+	p.config = *config
+
+	return p, nil
 }
 
-// SendMessage sends a message with the given key and value. If key is nil, the partition to send to is selected randomly, otherwise it
-// is selected by the Producer's Partitioner. To send strings as either key or value, see the StringEncoder type.
+// SendMessage sends a message with the given key and value. The partition to send to is selected by the Producer's Partitioner.
+// To send strings as either key or value, see the StringEncoder type.
 func (p *Producer) SendMessage(key, value Encoder) error {
 	return p.safeSendMessage(key, value, true)
 }
@@ -31,14 +54,7 @@ func (p *Producer) choosePartition(key Encoder) (int32, error) {
 		return -1, err
 	}
 
-	var partitioner Partitioner
-	if key == nil {
-		partitioner = RandomPartitioner{}
-	} else {
-		partitioner = p.partitioner
-	}
-
-	choice := partitioner.Partition(key, len(partitions))
+	choice := p.config.Partitioner.Partition(key, len(partitions))
 
 	if choice >= len(partitions) {
 		return -1, InvalidPartition
@@ -72,7 +88,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 		return err
 	}
 
-	request := &ProduceRequest{RequiredAcks: WAIT_FOR_LOCAL, Timeout: 0}
+	request := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
 	request.AddMessage(p.topic, partition, &Message{Key: keyBytes, Value: valBytes})
 
 	response, err := broker.Produce(p.client.id, request)

+ 10 - 4
producer_test.go

@@ -45,12 +45,15 @@ func TestSimpleProducer(t *testing.T) {
 		}
 	}()
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port())
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	producer := NewProducer(client, "myTopic", &RandomPartitioner{})
+	producer, err := NewProducer(client, "myTopic", &ProducerConfig{RequiredAcks: WAIT_FOR_LOCAL})
+	if err != nil {
+		t.Fatal(err)
+	}
 	for i := 0; i < 10; i++ {
 		err = producer.SendMessage(nil, StringEncoder("ABC THE MESSAGE"))
 		if err != nil {
@@ -62,13 +65,16 @@ func TestSimpleProducer(t *testing.T) {
 }
 
 func ExampleProducer() {
-	client, err := NewClient("myClient", "localhost", 9092)
+	client, err := NewClient("myClient", "localhost", 9092, nil)
 	if err != nil {
 		panic(err)
 	} else {
 		fmt.Println("> connected")
 	}
-	producer := NewProducer(client, "myTopic", RandomPartitioner{})
+	producer, err := NewProducer(client, "myTopic", &ProducerConfig{RequiredAcks: WAIT_FOR_LOCAL})
+	if err != nil {
+		panic(err)
+	}
 
 	err = producer.SendMessage(nil, StringEncoder("testing 123"))
 	if err != nil {