
Merge pull request #933 from wmille/timer_performance

Add an optional fast checker to (*partitionConsumer).responseFeeder
Evan Huus 8 years ago
commit 298679cecd
4 changed files with 77 additions and 21 deletions
  1. config.go (+15 -3)
  2. consumer.go (+19 -17)
  3. consumer_test.go (+42 -0)
  4. message_test.go (+1 -1)

config.go (+15 -3)

@@ -196,11 +196,23 @@ type Config struct {
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
 
-		// The maximum amount of time the consumer expects a message takes to process
-		// for the user. If writing to the Messages channel takes longer than this,
-		// that partition will stop fetching more messages until it can proceed again.
+		// The maximum amount of time the consumer expects a message to take to
+		// process for the user. If writing to the Messages channel takes longer
+		// than this, that partition will stop fetching more messages until it
+		// can proceed again.
 		// Note that, since the Messages channel is buffered, the actual grace time is
 		// (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
+		// If a message is not written to the Messages channel between two ticks
+		// of the expiryTicker then a timeout is detected.
+		// Using a ticker instead of a timer to detect timeouts should typically
+		// result in many fewer calls to Timer functions which may result in a
+		// significant performance improvement if many messages are being sent
+		// and timeouts are infrequent.
+		// The disadvantage of using a ticker instead of a timer is that
+		// timeouts will be less accurate. That is, the effective timeout could
+		// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
+		// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
+		// between two messages being sent may not be recognized as a timeout.
 		MaxProcessingTime time.Duration
 
 		// Return specifies what channels will be populated. If they are set to true,

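As a usage sketch (not part of this diff), the settings described above could be combined as follows; the import path is the repository's canonical one, while the broker address and the chosen values are illustrative assumptions:

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Effective grace time is roughly MaxProcessingTime * ChannelBufferSize
	// (100ms * 256 here). With the ticker-based check, a stalled partition is
	// detected somewhere between MaxProcessingTime and 2*MaxProcessingTime
	// after the last message was written to the Messages channel.
	config.Consumer.MaxProcessingTime = 100 * time.Millisecond
	config.ChannelBufferSize = 256

	// "localhost:9092" is a placeholder broker address.
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
}
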
consumer.go (+19 -17)

@@ -440,35 +440,37 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 
 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
-	expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
-	expireTimedOut := false
+	msgSent := false
 
 feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
+		expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
 
 		for i, msg := range msgs {
-			if !expiryTimer.Stop() && !expireTimedOut {
-				// expiryTimer was expired; clear out the waiting msg
-				<-expiryTimer.C
-			}
-			expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
-			expireTimedOut = false
-
+		messageSelect:
 			select {
 			case child.messages <- msg:
-			case <-expiryTimer.C:
-				expireTimedOut = true
-				child.responseResult = errTimedOut
-				child.broker.acks.Done()
-				for _, msg = range msgs[i:] {
-					child.messages <- msg
+				msgSent = true
+			case <-expiryTicker.C:
+				if !msgSent {
+					child.responseResult = errTimedOut
+					child.broker.acks.Done()
+					for _, msg = range msgs[i:] {
+						child.messages <- msg
+					}
+					child.broker.input <- child
+					continue feederLoop
+				} else {
+					// a message was delivered since the flag was last cleared,
+					// so this tick is not treated as a timeout; clear msgSent
+					// and return to the select to retry the current message
+					msgSent = false
+					goto messageSelect
 				}
-				child.broker.input <- child
-				continue feederLoop
 			}
 		}
 
+		expiryTicker.Stop()
 		child.broker.acks.Done()
 	}
 
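For readers following the control flow above, here is a standalone sketch of the same ticker-based timeout pattern, detached from Sarama; the deliver function, channel names, and the boolean return are illustrative assumptions, not part of this change:

package main

import (
	"fmt"
	"time"
)

// deliver pushes each message to out. Two consecutive ticks with no
// successful send are treated as a timeout, mirroring the ticker-based
// check in responseFeeder; names and the error handling are illustrative.
func deliver(msgs []string, out chan<- string, maxProcessing time.Duration) bool {
	ticker := time.NewTicker(maxProcessing)
	defer ticker.Stop()

	sent := false
	for _, m := range msgs {
	retry:
		select {
		case out <- m:
			sent = true
		case <-ticker.C:
			if !sent {
				return false // no progress between two ticks: timed out
			}
			// a message went out since the last tick, so this is not a
			// timeout; clear the flag and retry the current message
			sent = false
			goto retry
		}
	}
	return true
}

func main() {
	out := make(chan string) // unbuffered, like ChannelBufferSize = 0
	done := make(chan struct{})
	go func() {
		for m := range out {
			time.Sleep(2 * time.Millisecond) // slower than the sends, faster than the tick
			fmt.Println("processed", m)
		}
		close(done)
	}()

	ok := deliver([]string{"a", "b", "c"}, out, 10*time.Millisecond)
	close(out)
	<-done
	fmt.Println("delivered without timeout:", ok)
}
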

consumer_test.go (+42 -0)

@@ -803,6 +803,48 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
 	broker0.Close()
 }
 
+func TestConsumerExpiryTicker(t *testing.T) {
+	// Given
+	broker0 := NewMockBroker(t, 0)
+	fetchResponse1 := &FetchResponse{}
+	for i := 1; i <= 8; i++ {
+		fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
+	}
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetNewest, 1234).
+			SetOffset("my_topic", 0, OffsetOldest, 1),
+		"FetchRequest": NewMockSequence(fetchResponse1),
+	})
+
+	config := NewConfig()
+	config.ChannelBufferSize = 0
+	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
+	master, err := NewConsumer([]string{broker0.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// When
+	consumer, err := master.ConsumePartition("my_topic", 0, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Then: messages with offsets 1 through 8 are read
+	for i := 1; i <= 8; i++ {
+		assertMessageOffset(t, <-consumer.Messages(), int64(i))
+		time.Sleep(2 * time.Millisecond)
+	}
+
+	safeClose(t, consumer)
+	safeClose(t, master)
+	broker0.Close()
+}
+
 func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
 	if msg.Offset != expectedOffset {
 		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)

message_test.go (+1 -1)

@@ -106,7 +106,7 @@ func TestMessageEncoding(t *testing.T) {
 
 	message.Value = []byte{}
 	message.Codec = CompressionGZIP
-	if runtime.Version() == "go1.8" {
+	if runtime.Version() == "go1.8" || runtime.Version() == "go1.8.1" {
 		testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
 	} else {
 		testEncodable(t, "empty gzip", &message, emptyGzipMessage)