Browse Source

Merge pull request #14 from Shopify/more_configuration

More configuration
Evan Huus 12 years ago
parent
commit
b7d535890d
2 changed files with 12 additions and 8 deletions
  1. 7 4
      consumer.go
  2. 5 4
      producer.go

+ 7 - 4
consumer.go

@@ -13,6 +13,8 @@ type ConsumerConfig struct {
 	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
 	// returns fewer than that anyways. The default of 0 is treated as no limit.
 	MaxWaitTime int32
+	// The offset to start fetching messages from
+	StartingOffset int64
 }
 
 // Consumer processes Kafka messages from a given topic and partition.
@@ -60,6 +62,10 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, ConfigurationError("Invalid MaxWaitTime")
 	}
 
+	if config.StartingOffset < 0 {
+		return nil, ConfigurationError("Invalid StartingOffset")
+	}
+
 	broker, err := client.leader(topic, partition)
 	if err != nil {
 		return nil, err
@@ -71,10 +77,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 	c.partition = partition
 	c.group = group
 	c.config = *config
-
-	// We should really be sending an OffsetFetchRequest, but that doesn't seem to
-	// work in kafka yet. Hopefully will in beta 2...
-	c.offset = 0
+	c.offset = config.StartingOffset
 	c.broker = broker
 	c.stopper = make(chan bool)
 	c.done = make(chan bool)

+ 5 - 4
producer.go

@@ -2,9 +2,10 @@ package sarama
 
 // ProducerConfig is used to pass multiple configuration options to NewProducer.
 type ProducerConfig struct {
-	Partitioner  Partitioner  // Chooses the partition to send messages to, or randomly if this is nil.
-	RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
-	Timeout      int32        // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
+	Partitioner  Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
+	RequiredAcks RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
+	Timeout      int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
+	Compression  CompressionCodec // The type of compression to use on messages (defaults to no compression).
 }
 
 // Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
@@ -98,7 +99,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 	}
 
 	request := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
-	request.AddMessage(p.topic, partition, &Message{Key: keyBytes, Value: valBytes})
+	request.AddMessage(p.topic, partition, &Message{Codec: p.config.Compression, Key: keyBytes, Value: valBytes})
 
 	response, err := broker.Produce(p.client.id, request)
 	switch err {