
Merge pull request #336 from Shopify/configuration_tweaks

Configuration tweaks
Willem van Bergen, 10 years ago
Commit 35543a89bc
9 changed files with 132 additions and 55 deletions
  1. config.go (+20 -5)
  2. consumer.go (+24 -20)
  3. consumer_test.go (+48 -3)
  4. functional_test.go (+4 -2)
  5. mocks/producer.go (+5 -3)
  6. mocks/producer_test.go (+1 -1)
  7. producer.go (+18 -10)
  8. producer_test.go (+10 -10)
  9. sync_producer.go (+2 -1)

config.go (+20 -5)

@@ -42,7 +42,9 @@ type Config struct {
 		// Similar to the `partitioner.class` setting for the JVM producer.
 		Partitioner PartitionerConstructor
 		// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
-		AckSuccesses bool
+		ReturnSuccesses bool
+		// If enabled, messages that failed to deliver will be returned on the Errors channel, including the error that caused them (default enabled).
+		ReturnErrors bool
 
 		// The following config options control how often messages are batched up and sent to the broker. By default,
 		// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
@@ -68,6 +70,11 @@ type Config struct {
 
 	// Consumer is the namespace for configuration related to consuming messages, used by the Consumer.
 	Consumer struct {
+		Retry struct {
+			// How long to wait after failing to read from a partition before trying again (default 2s).
+			Backoff time.Duration
+		}
+
 		// Fetch is the namespace for controlling how many bytes are retrieved by any given request.
 		Fetch struct {
 			// The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
@@ -87,6 +94,9 @@ type Config struct {
 		// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
+
+		// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
+		ReturnErrors bool
 	}
 
 	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
@@ -117,10 +127,13 @@ func NewConfig() *Config {
 	c.Producer.Partitioner = NewHashPartitioner
 	c.Producer.Retry.Max = 3
 	c.Producer.Retry.Backoff = 100 * time.Millisecond
+	c.Producer.ReturnErrors = true
 
 	c.Consumer.Fetch.Min = 1
 	c.Consumer.Fetch.Default = 32768
+	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
+	c.Consumer.ReturnErrors = false
 
 	c.ChannelBufferSize = 256
 
@@ -175,7 +188,7 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
 	}
 
-	// validate the Produce values
+	// validate the Producer values
 	switch {
 	case c.Producer.MaxMessageBytes <= 0:
 		return ConfigurationError("Invalid Producer.MaxMessageBytes, must be > 0")
@@ -196,12 +209,12 @@ func (c *Config) Validate() error {
 	case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
 		return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= Producer.Flush.Messages when set")
 	case c.Producer.Retry.Max < 0:
-		return ConfigurationError("Invalid Producer.MaxRetries, must be >= 0")
+		return ConfigurationError("Invalid Producer.Retry.Max, must be >= 0")
 	case c.Producer.Retry.Backoff < 0:
-		return ConfigurationError("Invalid Producer.RetryBackoff, must be >= 0")
+		return ConfigurationError("Invalid Producer.Retry.Backoff, must be >= 0")
 	}
 
-	// validate the Consume values
+	// validate the Consumer values
 	switch {
 	case c.Consumer.Fetch.Min <= 0:
 		return ConfigurationError("Invalid Consumer.Fetch.Min, must be > 0")
@@ -211,6 +224,8 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0")
 	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
 		return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms")
+	case c.Consumer.Retry.Backoff < 0:
+		return ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0")
 	}
 
 	// validate misc shared values
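
Taken together, the renamed and new options are plain Config fields. A minimal sketch of setting them from outside the package (the values chosen are illustrative, not recommendations):

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.ReturnSuccesses = true                 // was AckSuccesses; still disabled by default
	config.Producer.ReturnErrors = true                    // new flag; enabled by default
	config.Consumer.Retry.Backoff = 500 * time.Millisecond // new knob; defaults to 2s
	config.Consumer.ReturnErrors = true                    // new flag; disabled by default

	// Validate catches out-of-range values, e.g. a negative Consumer.Retry.Backoff.
	if err := config.Validate(); err != nil {
		log.Fatalln(err)
	}
}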

consumer.go (+24 -20)

@@ -207,9 +207,16 @@ func (c *consumer) unrefBrokerConsumer(broker *Broker) {
 // PartitionConsumer
 
 // PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
-// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
-// scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
-// You have to read from both the Messages and Errors channels to prevent the consumer from locking eventually.
+// or AsyncClose() on a consumer to avoid leaks, it will not be garbage-collected automatically when
+// it passes out of scope (this is in addition to calling Close on the underlying consumer's client,
+// which is still necessary).
+//
+// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
+// loop. The PartitionConsumer will under no circumstances stop by itself once it is started. It will
+// just keep retrying if it encounters errors. By default, it just logs these errors to sarama.Logger;
+// if you want to handle errors yourself, set your config's Consumer.ReturnErrors to true, and read
+// from the Errors channel as well, using a select statement or in a separate goroutine. Check out
+// the Consumer examples to see these different approaches.
 type PartitionConsumer interface {
 
 	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
@@ -224,15 +231,13 @@ type PartitionConsumer interface {
 	// call this before calling Close on the underlying client.
 	Close() error
 
-	// Errors returns the read channel for any errors that occurred while consuming the partition.
-	// You have to read this channel to prevent the consumer from deadlock. Under no circumstances,
-	// the partition consumer will shut down by itself. It will just wait until it is able to continue
-	// consuming messages. If you want to shut down your consumer, you will have trigger it yourself
-	// by consuming this channel and calling Close or AsyncClose when appropriate.
-	Errors() <-chan *ConsumerError
-
-	// Messages returns the read channel for the messages that are returned by the broker
+	// Messages returns the read channel for the messages that are returned by the broker.
 	Messages() <-chan *ConsumerMessage
+
+	// Errors returns a read channel of errors that occurred during consuming, if enabled. By default,
+	// errors are logged and not returned over this channel. If you want to implement any custom error
+	// handling, set your config's Consumer.ReturnErrors setting to true, and read from this channel.
+	Errors() <-chan *ConsumerError
 }
 
 type partitionConsumer struct {
@@ -251,11 +256,17 @@ type partitionConsumer struct {
 }
 
 func (child *partitionConsumer) sendError(err error) {
-	child.errors <- &ConsumerError{
+	cErr := &ConsumerError{
 		Topic:     child.topic,
 		Partition: child.partition,
 		Err:       err,
 	}
+
+	if child.conf.Consumer.ReturnErrors {
+		child.errors <- cErr
+	} else {
+		Logger.Println(cErr)
+	}
 }
 
 func (child *partitionConsumer) dispatcher() {
@@ -263,7 +274,7 @@ func (child *partitionConsumer) dispatcher() {
 		select {
 		case <-child.dying:
 			close(child.trigger)
-		default:
+		case <-time.After(child.conf.Consumer.Retry.Backoff):
 			if child.broker != nil {
 				child.consumer.unrefBrokerConsumer(child.broker)
 				child.broker = nil
@@ -272,13 +283,6 @@ func (child *partitionConsumer) dispatcher() {
 			if err := child.dispatch(); err != nil {
 				child.sendError(err)
 				child.trigger <- none{}
-
-				// there's no point in trying again *right* away
-				select {
-				case <-child.dying:
-					close(child.trigger)
-				case <-time.After(10 * time.Second):
-				}
 			}
 		}
 	}
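
The dispatcher change above swaps the unconditional default: branch for a time.After case, so a failed partition is retried only after Consumer.Retry.Backoff has elapsed, while shutdown via the dying channel still takes effect immediately. A standalone sketch of that shutdown-aware backoff pattern (the names retryLoop, dying, and work are illustrative, not Sarama's):

package main

import (
	"fmt"
	"time"
)

// retryLoop retries work() every backoff interval until dying is closed.
func retryLoop(dying <-chan struct{}, backoff time.Duration, work func() error) {
	for {
		select {
		case <-dying:
			return // shut down promptly, even mid-backoff
		case <-time.After(backoff):
			if err := work(); err != nil {
				fmt.Println("retrying after error:", err)
			}
		}
	}
}

func main() {
	dying := make(chan struct{})
	go retryLoop(dying, 100*time.Millisecond, func() error {
		return fmt.Errorf("transient failure")
	})
	time.Sleep(350 * time.Millisecond) // let it retry a few times
	close(dying)
	time.Sleep(50 * time.Millisecond) // give the goroutine time to exit
}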

consumer_test.go (+48 -3)

@@ -142,7 +142,9 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	seedBroker.Returns(metadataResponse)
 
 	// launch test goroutines
-	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	config := NewConfig()
+	config.Consumer.Retry.Backoff = 0
+	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -290,10 +292,50 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	seedBroker.Close()
 }
 
+// This example shows the simplest use case of the consumer: it simply
+// iterates over the Messages channel using a for/range loop. Because
+// a consumer never stops unless requested, a signal handler is registered
+// so we can trigger a clean shutdown of the consumer.
+func ExampleConsumer_for_loop() {
+	master, err := NewConsumer([]string{"localhost:9092"}, nil)
+	if err != nil {
+		log.Fatalln(err)
+	}
+	defer func() {
+		if err := master.Close(); err != nil {
+			log.Fatalln(err)
+		}
+	}()
+
+	consumer, err := master.ConsumePartition("my_topic", 0, 0)
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	go func() {
+		// By default, the consumer will always keep going, unless we tell it to stop.
+		// In this case, we capture the SIGINT signal so we can tell the consumer to stop.
+		signals := make(chan os.Signal, 1)
+		signal.Notify(signals, os.Interrupt)
+		<-signals
+		consumer.AsyncClose()
+	}()
+
+	msgCount := 0
+	for message := range consumer.Messages() {
+		log.Println(string(message.Value))
+		msgCount++
+	}
+	log.Println("Processed", msgCount, "messages.")
+}
+
 // This example shows how to use a consumer with a select statement
 // dealing with the different channels.
 func ExampleConsumer_select() {
-	master, err := NewConsumer([]string{"localhost:9092"}, nil)
+	config := NewConfig()
+	config.Consumer.ReturnErrors = true // Handle errors manually instead of letting Sarama log them.
+
+	master, err := NewConsumer([]string{"localhost:9092"}, config)
 	if err != nil {
 		log.Fatalln(err)
 	}
@@ -336,7 +378,10 @@ consumerLoop:
 // This example shows how to use a consumer with different goroutines
 // to read from the Messages and Errors channels.
 func ExampleConsumer_goroutines() {
-	master, err := NewConsumer([]string{"localhost:9092"}, nil)
+	config := NewConfig()
+	config.Consumer.ReturnErrors = true // Handle errors manually instead of letting Sarama log them.
+
+	master, err := NewConsumer([]string{"localhost:9092"}, config)
 	if err != nil {
 		log.Fatalln(err)
 	}

functional_test.go (+4 -2)

@@ -90,7 +90,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	producer, err := NewProducer([]string{kafkaAddr}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -122,7 +122,9 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 func testProducingMessages(t *testing.T, config *Config) {
 	checkKafkaAvailability(t)
 
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
+	config.Consumer.ReturnErrors = true
+
 	client, err := NewClient([]string{kafkaAddr}, config)
 	if err != nil {
 		t.Fatal(err)

mocks/producer.go (+5 -3)

@@ -52,11 +52,13 @@ func NewProducer(t ErrorReporter, config *sarama.Config) *Producer {
 				expectation := mp.expectations[0]
 				mp.expectations = mp.expectations[1:]
 				if expectation.Result == errProduceSuccess {
-					if config.Producer.AckSuccesses {
+					if config.Producer.ReturnSuccesses {
 						mp.successes <- msg
 					}
 				} else {
-					mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
+					if config.Producer.ReturnErrors {
+						mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
+					}
 				}
 			}
 			mp.l.Unlock()
@@ -119,7 +121,7 @@ func (mp *Producer) Errors() <-chan *sarama.ProducerError {
 
 // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
 // on the input channel. The mock producer will handle the message as if it is produced successfully,
-// i.e. it will make it available on the Successes channel if the Producer.AckSuccesses setting
+// i.e. it will make it available on the Successes channel if the Producer.ReturnSuccesses setting
 // is set to true.
 func (mp *Producer) ExpectInputAndSucceed() {
 	mp.l.Lock()
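
Test code drives the mock through the same renamed flags as the real producer. A minimal test sketch, assuming the mock fully implements the sarama.Producer interface as its own tests assert (the topic and payload are made up for illustration):

package kafka_test

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/mocks"
)

func TestProducesGreeting(t *testing.T) {
	config := sarama.NewConfig()
	config.Producer.ReturnSuccesses = true

	mp := mocks.NewProducer(t, config)
	mp.ExpectInputAndSucceed()

	mp.Input() <- &sarama.ProducerMessage{Topic: "greetings", Value: sarama.StringEncoder("hello")}
	msg := <-mp.Successes() // delivered here because ReturnSuccesses is true

	if msg.Topic != "greetings" {
		t.Errorf("unexpected topic: %s", msg.Topic)
	}

	if err := mp.Close(); err != nil {
		t.Error(err)
	}
}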

mocks/producer_test.go (+1 -1)

@@ -28,7 +28,7 @@ func TestMockProducerImplementsProducerInterface(t *testing.T) {
 
 func TestProducerReturnsExpectationsToChannels(t *testing.T) {
 	config := sarama.NewConfig()
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	mp := NewProducer(t, config)
 
 	mp.ExpectInputAndSucceed()

producer.go (+18 -10)

@@ -37,12 +37,13 @@ type Producer interface {
 	Input() chan<- *ProducerMessage
 
 	// Successes is the success output channel back to the user when ReturnSuccesses is configured.
-	// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
+	// If ReturnSuccesses is true, you MUST read from this channel or the Producer will deadlock.
 	// It is suggested that you send and read messages together in a single select statement.
 	Successes() <-chan *ProducerMessage
 
-	// Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
-	// It is suggested that you send messages and read errors together in a single select statement.
+	// Errors is the error output channel back to the user. You MUST read from this channel
+	// or the Producer will deadlock when the channel is full. Alternatively, you can set
+	// Producer.ReturnErrors in your config to false, which prevents errors from being returned.
 	Errors() <-chan *ProducerError
 }
 
@@ -178,7 +179,7 @@ func (p *producer) Input() chan<- *ProducerMessage {
 func (p *producer) Close() error {
 	p.AsyncClose()
 
-	if p.conf.Producer.AckSuccesses {
+	if p.conf.Producer.ReturnSuccesses {
 		go withRecover(func() {
 			for _ = range p.successes {
 			}
@@ -186,8 +187,10 @@ func (p *producer) Close() error {
 	}
 
 	var errors ProducerErrors
-	for event := range p.errors {
-		errors = append(errors, event)
+	if p.conf.Producer.ReturnErrors {
+		for event := range p.errors {
+			errors = append(errors, event)
+		}
 	}
 
 	if len(errors) > 0 {
@@ -258,7 +261,7 @@ func (p *producer) topicDispatcher() {
 	if p.ownClient {
 		err := p.client.Close()
 		if err != nil {
-			p.errors <- &ProducerError{Err: err}
+			Logger.Println("producer/shutdown failed to close the embedded client:", err)
 		}
 	}
 	close(p.errors)
@@ -538,7 +541,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			if p.conf.Producer.AckSuccesses {
+			if p.conf.Producer.ReturnSuccesses {
 				p.returnSuccesses(batch)
 			}
 			continue
@@ -558,7 +561,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 				switch block.Err {
 				case ErrNoError:
 					// All the messages for this topic-partition were delivered successfully!
-					if p.conf.Producer.AckSuccesses {
+					if p.conf.Producer.ReturnSuccesses {
 						for i := range msgs {
 							msgs[i].offset = block.Offset + int64(i)
 						}
@@ -731,7 +734,12 @@ func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 func (p *producer) returnError(msg *ProducerMessage, err error) {
 	msg.flags = 0
 	msg.retries = 0
-	p.errors <- &ProducerError{Msg: msg, Err: err}
+	pErr := &ProducerError{Msg: msg, Err: err}
+	if p.conf.Producer.ReturnErrors {
+		p.errors <- pErr
+	} else {
+		Logger.Println(pErr)
+	}
 }
 
 func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
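
With ReturnErrors now configurable, a fire-and-forget producer no longer has to drain the Errors channel; failures go to sarama.Logger instead. A minimal sketch (the broker address and topic are placeholders):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.ReturnErrors = false // delivery failures are logged, not returned

	producer, err := sarama.NewProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	// Close flushes pending messages before shutting down.
	defer producer.Close()

	// No select on Errors() is needed: with ReturnErrors disabled, the
	// producer logs failures rather than blocking on a full channel.
	producer.Input() <- &sarama.ProducerMessage{
		Topic: "my_topic",
		Value: sarama.StringEncoder("fire and forget"),
	}
}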

producer_test.go (+10 -10)

@@ -125,7 +125,7 @@ func TestProducer(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -173,7 +173,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 5
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -225,7 +225,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 5
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Partitioner = NewRoundRobinPartitioner
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
@@ -267,7 +267,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
@@ -339,7 +339,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
@@ -387,7 +387,7 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Max = 3
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
@@ -442,7 +442,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Max = 4
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
@@ -522,7 +522,7 @@ func TestProducerOutOfRetries(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Backoff = 0
 	config.Producer.Retry.Max = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
@@ -610,10 +610,10 @@ ProducerLoop:
 // This example shows how to use the producer with separate goroutines
 // reading from the Successes and Errors channels. Note that in order
 // for the Successes channel to be populated, you have to set
-// config.Producer.AckSuccesses to true.
+// config.Producer.ReturnSuccesses to true.
 func ExampleProducer_goroutines() {
 	config := NewConfig()
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	producer, err := NewProducer([]string{"localhost:9092"}, config)
 	if err != nil {
 		panic(err)

sync_producer.go (+2 -1)

@@ -40,7 +40,8 @@ func NewSyncProducerFromClient(client *Client) (SyncProducer, error) {
 }
 
 func newSyncProducerFromProducer(p *producer) *syncProducer {
-	p.conf.Producer.AckSuccesses = true
+	p.conf.Producer.ReturnSuccesses = true
+	p.conf.Producer.ReturnErrors = true
 	sp := &syncProducer{producer: p}
 
 	sp.wg.Add(2)
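
The sync producer blocks until each message is either acknowledged or fails, so it needs both channels populated; that is why newSyncProducerFromProducer now forces ReturnErrors as well as ReturnSuccesses. A usage sketch, assuming the NewSyncProducer constructor (this diff only shows NewSyncProducerFromClient) and the SendMessage signature from later Sarama releases:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Assumed constructor; a nil config gets the defaults from NewConfig.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	// Assumed signature (as in Sarama v1.0): SendMessage blocks until the
	// broker acks the message or the producer gives up and returns the error.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "my_topic",
		Value: sarama.StringEncoder("hello"),
	})
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("message stored at partition %d, offset %d", partition, offset)
}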