Browse Source

Add an optional fast checker to (*partitionConsumer).responseFeeder

Add a ticker in (*partitionConsumer).responseFeeder to frequently check for
messages before starting the timeout timer. Using the fast checker significantly
reduces the number of timer function calls when timeouts are infrequent.
Vagrant User 8 years ago
parent
commit
7bbb175476
4 changed files with 148 additions and 9 deletions
  1. 20 0
      config.go
  2. 41 8
      consumer.go
  3. 86 0
      consumer_test.go
  4. 1 1
      message_test.go

+ 20 - 0
config.go

@@ -197,6 +197,23 @@ type Config struct {
 		// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
 		MaxProcessingTime time.Duration
 
+		// The time interval between ticks of the fast checker. A value of 0
+		// turns off the fast checker.
+		// If this is set to a non-zero value, then there will be periodic
+		// checks to see if messages have been written to the Messages channel.
+		// If a message has not been written to the Messages channel since the
+		// last tick of the fast checker, then the timer will be set.
+		// Using the fast checker should typically result in many fewer calls to
+		// Timer functions resulting in a significant performance improvement if
+		// many messages are being sent and timeouts are infrequent.
+		// The disadvantage of using the fast checker is that timeouts will be
+		// less accurate. That is, the effective timeout could be between
+		// `MaxProcessingTime` and `MaxProcessingTime + FastCheckerInterval`.
+		// For example, if `MaxProcessingTime` is 100ms and
+		// `FastCheckerInterval` is 10ms, then a delay of 108ms between two
+		// messages being sent may not be recognized as a timeout.
+		FastCheckerInterval time.Duration
+
 		// Return specifies what channels will be populated. If they are set to true,
 		// you must read from them to prevent deadlock.
 		Return struct {
@@ -277,6 +294,7 @@ func NewConfig() *Config {
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
 	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
+	c.Consumer.FastCheckerInterval = 0
 	c.Consumer.Return.Errors = false
 	c.Consumer.Offsets.CommitInterval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest
@@ -402,6 +420,8 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
 	case c.Consumer.MaxProcessingTime <= 0:
 		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
+	case c.Consumer.FastCheckerInterval < 0:
+		return ConfigurationError("Consumer.FastCheckerInterval must be >= 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
 	case c.Consumer.Offsets.CommitInterval <= 0:

+ 41 - 8
consumer.go

@@ -440,25 +440,58 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 
 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
-	expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
-	expireTimedOut := false
+	msgSent := false
+	// Initialize timer without a pending send on its channel
+	expiryTimer := time.NewTimer(0)
+	<-expiryTimer.C
+	expiryTimerSet := false
+
+	var fastCheckerChan <-chan (time.Time)
+	if child.conf.Consumer.FastCheckerInterval > 0 {
+		fastChecker := time.NewTicker(child.conf.Consumer.FastCheckerInterval)
+		defer fastChecker.Stop()
+		fastCheckerChan = fastChecker.C
+	}
 
 feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
 
 		for i, msg := range msgs {
-			if !expiryTimer.Stop() && !expireTimedOut {
-				// expiryTimer was expired; clear out the waiting msg
-				<-expiryTimer.C
+			if child.conf.Consumer.FastCheckerInterval <= 0 {
+				expiryTimerSet = true
+				expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
 			}
-			expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
-			expireTimedOut = false
 
+		messageSelect:
 			select {
 			case child.messages <- msg:
+				msgSent = true
+				if expiryTimerSet {
+					// The timer was set and a message was sent, stop the
+					// timer and resume using the fast checker
+					if !expiryTimer.Stop() {
+						<-expiryTimer.C
+					}
+					expiryTimerSet = false
+				}
+			// Periodically check if messages have been sent
+			case <-fastCheckerChan:
+				if msgSent {
+					msgSent = false
+				} else if !expiryTimerSet {
+					// No messages have been sent since the last tick,
+					// start the timer
+					expiryTimerSet = true
+					// If the fast checker is being used, then at least
+					// the time between two fast checker ticks has already
+					// passed since the last message was sent.
+					expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime - child.conf.Consumer.FastCheckerInterval)
+				}
+				// message has not been sent, return to select statement
+				goto messageSelect
 			case <-expiryTimer.C:
-				expireTimedOut = true
+				expiryTimerSet = false
 				child.responseResult = errTimedOut
 				child.broker.acks.Done()
 				for _, msg = range msgs[i:] {

+ 86 - 0
consumer_test.go

@@ -803,6 +803,92 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
 	broker0.Close()
 }
 
+func TestConsumerFastCheckerOff(t *testing.T) {
+	// Given
+	broker0 := NewMockBroker(t, 0)
+	fetchResponse1 := &FetchResponse{}
+	for i := 1; i <= 8; i++ {
+		fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
+	}
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetNewest, 1234).
+			SetOffset("my_topic", 0, OffsetOldest, 1),
+		"FetchRequest": NewMockSequence(fetchResponse1),
+	})
+
+	config := NewConfig()
+	config.ChannelBufferSize = 0
+	config.Consumer.FastCheckerInterval = 0
+	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
+	master, err := NewConsumer([]string{broker0.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// When
+	consumer, err := master.ConsumePartition("my_topic", 0, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Then: messages with offsets 1 through 8 are read
+	for i := 1; i <= 8; i++ {
+		assertMessageOffset(t, <-consumer.Messages(), int64(i))
+		time.Sleep(2 * time.Millisecond)
+	}
+
+	safeClose(t, consumer)
+	safeClose(t, master)
+	broker0.Close()
+}
+
+func TestConsumerFastCheckerOn(t *testing.T) {
+	// Given
+	broker0 := NewMockBroker(t, 0)
+	fetchResponse1 := &FetchResponse{}
+	for i := 1; i <= 8; i++ {
+		fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
+	}
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetNewest, 1234).
+			SetOffset("my_topic", 0, OffsetOldest, 1),
+		"FetchRequest": NewMockSequence(fetchResponse1),
+	})
+
+	config := NewConfig()
+	config.ChannelBufferSize = 0
+	config.Consumer.FastCheckerInterval = 1 * time.Millisecond
+	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
+	master, err := NewConsumer([]string{broker0.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// When
+	consumer, err := master.ConsumePartition("my_topic", 0, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Then: messages with offsets 1 through 8 are read
+	for i := 1; i <= 8; i++ {
+		assertMessageOffset(t, <-consumer.Messages(), int64(i))
+		time.Sleep(2 * time.Millisecond)
+	}
+
+	safeClose(t, consumer)
+	safeClose(t, master)
+	broker0.Close()
+}
+
 func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
 	if msg.Offset != expectedOffset {
 		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)

+ 1 - 1
message_test.go

@@ -91,7 +91,7 @@ func TestMessageEncoding(t *testing.T) {
 
 	message.Value = []byte{}
 	message.Codec = CompressionGZIP
-	if runtime.Version() == "go1.8" {
+	if runtime.Version() == "go1.8" || runtime.Version() == "go1.8.1" {
 		testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
 	} else {
 		testEncodable(t, "empty gzip", &message, emptyGzipMessage)