Explorar o código

Consumer configuration.

Evan Huus hai 12 anos
pai
achega
eef2be2e6c
Modificáronse 4 ficheiros con 69 adicións e 12 borrados
  1. 59 5
      consumer.go
  2. 2 2
      consumer_test.go
  3. 4 1
      errors.go
  4. 4 4
      producer.go

+ 59 - 5
consumer.go

@@ -1,5 +1,20 @@
 package sarama
 
+// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
+type ConsumerConfig struct {
+	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
+	DefaultFetchSize int32
+	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
+	// The default of 0 is treated as 'at least one' to prevent the consumer from spinning when no messages are available.
+	MinFetchSize int32
+	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
+	// treated as no limit.
+	MaxMessageSize int32
+	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
+	// returns fewer than that anyways. The default of 0 is treated as no limit.
+	MaxWaitTime int32
+}
+
 // Consumer processes Kafka messages from a given topic and partition.
 // You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
@@ -9,6 +24,7 @@ type Consumer struct {
 	topic     string
 	partition int32
 	group     string
+	config    ConsumerConfig
 
 	offset        int64
 	broker        *Broker
@@ -19,7 +35,27 @@ type Consumer struct {
 
 // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
 // part of the named consumer group.
-func NewConsumer(client *Client, topic string, partition int32, group string) (*Consumer, error) {
+func NewConsumer(client *Client, topic string, partition int32, group string, config ConsumerConfig) (*Consumer, error) {
+	if config.DefaultFetchSize < 0 {
+		return nil, ConfigurationError("Invalid DefaultFetchSize")
+	} else if config.DefaultFetchSize == 0 {
+		config.DefaultFetchSize = 1024
+	}
+
+	if config.MinFetchSize < 0 {
+		return nil, ConfigurationError("Invalid MinFetchSize")
+	} else if config.MinFetchSize == 0 {
+		config.MinFetchSize = 1
+	}
+
+	if config.MaxMessageSize < 0 {
+		return nil, ConfigurationError("Invalid MaxMessageSize")
+	}
+
+	if config.MaxWaitTime < 0 {
+		return nil, ConfigurationError("Invalid MaxWaitTime")
+	}
+
 	broker, err := client.leader(topic, partition)
 	if err != nil {
 		return nil, err
@@ -30,6 +66,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string) (*
 	c.topic = topic
 	c.partition = partition
 	c.group = group
+	c.config = config
 
 	// We should really be sending an OffsetFetchRequest, but that doesn't seem to
 	// work in kafka yet. Hopefully will in beta 2...
@@ -86,12 +123,12 @@ func (c *Consumer) sendError(err error) bool {
 
 func (c *Consumer) fetchMessages() {
 
-	var fetchSize int32 = 1024
+	var fetchSize int32 = c.config.DefaultFetchSize
 
 	for {
 		request := new(FetchRequest)
-		request.MinBytes = 1
-		request.MaxWaitTime = 1000
+		request.MinBytes = c.config.MinFetchSize
+		request.MaxWaitTime = c.config.MaxWaitTime
 		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
 
 		response, err := c.broker.Fetch(c.client.id, request)
@@ -149,7 +186,22 @@ func (c *Consumer) fetchMessages() {
 			// We got no messages. If we got a trailing one then we need to ask for more data.
 			// Otherwise we just poll again and wait for one to be produced...
 			if block.MsgSet.PartialTrailingMessage {
-				fetchSize *= 2
+				if c.config.MaxMessageSize == 0 {
+					fetchSize *= 2
+				} else {
+					if fetchSize == c.config.MaxMessageSize {
+						if c.sendError(MessageTooLarge) {
+							continue
+						} else {
+							return
+						}
+					} else {
+						fetchSize *= 2
+						if fetchSize > c.config.MaxMessageSize {
+							fetchSize = c.config.MaxMessageSize
+						}
+					}
+				}
 			}
 			select {
 			case <-c.stopper:
@@ -160,6 +212,8 @@ func (c *Consumer) fetchMessages() {
 			default:
 				continue
 			}
+		} else {
+			fetchSize = c.config.DefaultFetchSize
 		}
 
 		for _, msgBlock := range block.MsgSet.Messages {

+ 2 - 2
consumer_test.go

@@ -69,7 +69,7 @@ func TestSimpleConsumer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup")
+	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", ConsumerConfig{})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -97,7 +97,7 @@ func ExampleConsumer() {
 		fmt.Println("> connected")
 	}
 
-	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup")
+	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", ConsumerConfig{})
 	if err != nil {
 		panic(err)
 	} else {

+ 4 - 1
errors.go

@@ -36,12 +36,15 @@ var InsufficientData = errors.New("kafka: Insufficient data to decode packet, mo
 // This can be a bad CRC or length field, or any other invalid value.
 var DecodingError = errors.New("kafka: Error while decoding packet.")
 
+// MessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
+var MessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
+
 // ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified
 // configuration is invalid.
 type ConfigurationError string
 
 func (err ConfigurationError) Error() string {
-	return "Invalid Configuration: " + string(err)
+	return "kafka: Invalid Configuration: " + string(err)
 }
 
 // KError is the type of error that can be returned directly by the Kafka broker.

+ 4 - 4
producer.go

@@ -26,15 +26,15 @@ func NewProducer(client *Client, topic string, config ProducerConfig) (*Producer
 		return nil, ConfigurationError("Invalid Timeout")
 	}
 
+	if config.Partitioner == nil {
+		config.Partitioner = RandomPartitioner{}
+	}
+
 	p := new(Producer)
 	p.client = client
 	p.topic = topic
 	p.config = config
 
-	if p.config.Partitioner == nil {
-		p.config.Partitioner = RandomPartitioner{}
-	}
-
 	return p, nil
 }