Browse Source

Merge pull request #115 from Shopify/fixes

Default configuration value fixes
Willem van Bergen 11 years ago
parent
commit
70f3ca2135
7 changed files with 52 additions and 20 deletions
  1. 7 0
      client_test.go
  2. 14 8
      consumer.go
  3. 7 0
      consumer_test.go
  4. 5 2
      produce_message.go
  5. 3 1
      produce_request_test.go
  6. 8 8
      producer.go
  7. 8 1
      producer_test.go

+ 7 - 0
client_test.go

@@ -4,6 +4,13 @@ import (
 	"testing"
 	"testing"
 )
 )
 
 
+func TestDefaultClientConfigValidates(t *testing.T) {
+	config := NewClientConfig()
+	if err := config.Validate(); err != nil {
+		t.Error(err)
+	}
+}
+
 func TestSimpleClient(t *testing.T) {
 func TestSimpleClient(t *testing.T) {
 
 
 	mb := NewMockBroker(t, 1)
 	mb := NewMockBroker(t, 1)

+ 14 - 8
consumer.go

@@ -1,5 +1,9 @@
 package sarama
 package sarama
 
 
+import (
+	"time"
+)
+
 // OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
 // OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
 type OffsetMethod int
 type OffsetMethod int
 
 
@@ -25,10 +29,10 @@ type ConsumerConfig struct {
 	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
 	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
 	// treated as no limit.
 	// treated as no limit.
 	MaxMessageSize int32
 	MaxMessageSize int32
-	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
+	// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
 	// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
 	// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
-	// 100-500ms is a reasonable range for most cases.
-	MaxWaitTime int32
+	// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
+	MaxWaitTime time.Duration
 
 
 	// The method used to determine at which offset to begin consuming messages.
 	// The method used to determine at which offset to begin consuming messages.
 	OffsetMethod OffsetMethod
 	OffsetMethod OffsetMethod
@@ -164,7 +168,7 @@ func (c *Consumer) fetchMessages() {
 	for {
 	for {
 		request := new(FetchRequest)
 		request := new(FetchRequest)
 		request.MinBytes = c.config.MinFetchSize
 		request.MinBytes = c.config.MinFetchSize
-		request.MaxWaitTime = c.config.MaxWaitTime
+		request.MaxWaitTime = int32(c.config.MaxWaitTime / time.Millisecond)
 		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
 		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
 
 
 		response, err := c.broker.Fetch(c.client.id, request)
 		response, err := c.broker.Fetch(c.client.id, request)
@@ -311,7 +315,7 @@ func NewConsumerConfig() *ConsumerConfig {
 	return &ConsumerConfig{
 	return &ConsumerConfig{
 		DefaultFetchSize: 32768,
 		DefaultFetchSize: 32768,
 		MinFetchSize:     1,
 		MinFetchSize:     1,
-		MaxWaitTime:      250,
+		MaxWaitTime:      250 * time.Millisecond,
 		EventBufferSize:  16,
 		EventBufferSize:  16,
 	}
 	}
 }
 }
@@ -331,10 +335,12 @@ func (config *ConsumerConfig) Validate() error {
 		return ConfigurationError("Invalid MaxMessageSize")
 		return ConfigurationError("Invalid MaxMessageSize")
 	}
 	}
 
 
-	if config.MaxWaitTime <= 0 {
-		return ConfigurationError("Invalid MaxWaitTime")
-	} else if config.MaxWaitTime < 100 {
+	if config.MaxWaitTime < 1*time.Millisecond {
+		return ConfigurationError("Invalid MaxWaitTime, it needs to be at least 1ms")
+	} else if config.MaxWaitTime < 100*time.Millisecond {
 		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
 		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
+	} else if config.MaxWaitTime%time.Millisecond != 0 {
+		Logger.Println("ConsumerConfig.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
 	}
 	}
 
 
 	if config.EventBufferSize < 0 {
 	if config.EventBufferSize < 0 {

+ 7 - 0
consumer_test.go

@@ -6,6 +6,13 @@ import (
 	"time"
 	"time"
 )
 )
 
 
+func TestDefaultConsumerConfigValidates(t *testing.T) {
+	config := NewConsumerConfig()
+	if err := config.Validate(); err != nil {
+		t.Error(err)
+	}
+}
+
 func TestSimpleConsumer(t *testing.T) {
 func TestSimpleConsumer(t *testing.T) {
 	mb1 := NewMockBroker(t, 1)
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 	mb2 := NewMockBroker(t, 2)

+ 5 - 2
produce_message.go

@@ -1,6 +1,9 @@
 package sarama
 package sarama
 
 
-import "log"
+import (
+	"log"
+	"time"
+)
 
 
 type produceMessage struct {
 type produceMessage struct {
 	tp         topicPartition
 	tp         topicPartition
@@ -45,7 +48,7 @@ func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool
 }
 }
 
 
 func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
 func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
-	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: config.Timeout}
+	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: int32(config.Timeout / time.Millisecond)}
 
 
 	// If compression is enabled, we need to group messages by topic-partition and
 	// If compression is enabled, we need to group messages by topic-partition and
 	// wrap them in MessageSets. We already discarded that grouping, so we
 	// wrap them in MessageSets. We already discarded that grouping, so we

+ 3 - 1
produce_request_test.go

@@ -1,6 +1,8 @@
 package sarama
 package sarama
 
 
-import "testing"
+import (
+	"testing"
+)
 
 
 var (
 var (
 	produceRequestEmpty = []byte{
 	produceRequestEmpty = []byte{

+ 8 - 8
producer.go

@@ -15,11 +15,11 @@ import (
 // channel.
 // channel.
 type ProducerConfig struct {
 type ProducerConfig struct {
 	Partitioner      Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
 	Partitioner      Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
-	RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
-	Timeout          int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
+	RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
+	Timeout          time.Duration    // The maximum duration the broker will wait the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated.
 	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
 	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
 	MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
 	MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
-	MaxBufferTime    uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
+	MaxBufferTime    time.Duration    // The maximum duration to buffer messages before sending to a broker.
 }
 }
 
 
 // Producer publishes Kafka messages. It routes messages to the correct broker
 // Producer publishes Kafka messages. It routes messages to the correct broker
@@ -207,13 +207,11 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 		hasMessages: make(chan bool, 1),
 		hasMessages: make(chan bool, 1),
 	}
 	}
 
 
-	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond
-
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 	wg.Add(1)
 	wg.Add(1)
 
 
 	go func() {
 	go func() {
-		timer := time.NewTimer(maxBufferTime)
+		timer := time.NewTimer(p.config.MaxBufferTime)
 		var shutdownRequired bool
 		var shutdownRequired bool
 		wg.Done()
 		wg.Done()
 		for {
 		for {
@@ -229,7 +227,7 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 			case <-bp.stopper:
 			case <-bp.stopper:
 				goto shutdown
 				goto shutdown
 			}
 			}
-			timer.Reset(maxBufferTime)
+			timer.Reset(p.config.MaxBufferTime)
 		}
 		}
 	shutdown:
 	shutdown:
 		delete(p.brokerProducers, bp.broker)
 		delete(p.brokerProducers, bp.broker)
@@ -456,7 +454,7 @@ func NewProducerConfig() *ProducerConfig {
 	return &ProducerConfig{
 	return &ProducerConfig{
 		Partitioner:      NewRandomPartitioner(),
 		Partitioner:      NewRandomPartitioner(),
 		RequiredAcks:     WaitForLocal,
 		RequiredAcks:     WaitForLocal,
-		MaxBufferTime:    1,
+		MaxBufferTime:    1 * time.Millisecond,
 		MaxBufferedBytes: 1,
 		MaxBufferedBytes: 1,
 	}
 	}
 }
 }
@@ -470,6 +468,8 @@ func (config *ProducerConfig) Validate() error {
 
 
 	if config.Timeout < 0 {
 	if config.Timeout < 0 {
 		return ConfigurationError("Invalid Timeout")
 		return ConfigurationError("Invalid Timeout")
+	} else if config.Timeout%time.Millisecond != 0 {
+		Logger.Println("ProducerConfig.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
 	}
 	}
 
 
 	if config.MaxBufferedBytes == 0 {
 	if config.MaxBufferedBytes == 0 {

+ 8 - 1
producer_test.go

@@ -10,11 +10,18 @@ const TestMessage = "ABC THE MESSAGE"
 
 
 func defaultProducerConfig() *ProducerConfig {
 func defaultProducerConfig() *ProducerConfig {
 	config := NewProducerConfig()
 	config := NewProducerConfig()
-	config.MaxBufferTime = 1000000                                // don't flush based on time
+	config.MaxBufferTime = 1000000 * time.Millisecond             // don't flush based on time
 	config.MaxBufferedBytes = uint32((len(TestMessage) * 10) - 1) // flush after 10 messages
 	config.MaxBufferedBytes = uint32((len(TestMessage) * 10) - 1) // flush after 10 messages
 	return config
 	return config
 }
 }
 
 
+func TestDefaultProducerConfigValidates(t *testing.T) {
+	config := NewProducerConfig()
+	if err := config.Validate(); err != nil {
+		t.Error(err)
+	}
+}
+
 func TestSimpleProducer(t *testing.T) {
 func TestSimpleProducer(t *testing.T) {
 
 
 	mb1 := NewMockBroker(t, 1)
 	mb1 := NewMockBroker(t, 1)