Selaa lähdekoodia

Optionally use a ticker instead of a timer to detect timeouts

Vagrant User 8 vuotta sitten
vanhempi
commit
87e4033a66
3 muutettua tiedostoa jossa 25 lisäystä ja 60 poistoa
  1. 9 13
      config.go
  2. 14 45
      consumer.go
  3. 2 2
      consumer_test.go

+ 9 - 13
config.go

@@ -197,22 +197,20 @@ type Config struct {
 		// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
 		MaxProcessingTime time.Duration
 
-		// The time interval between ticks of the fast checker. A value of 0
-		// turns off the fast checker.
-		// If this is set to a non-zero value, then there will be periodic
-		// checks to see if messages have been written to the Messages channel.
-		// If a message has not been written to the Messages channel since the
-		// last tick of the fast checker, then the timer will be set.
+		// Whether or not to use the fast checker. The fast checker uses a
+		// ticker instead of a timer to implement the timeout functionality in
+		// (*partitionConsumer).responseFeeder.
+		// If a message is not written to the Messages channel between two ticks
+		// of the fast checker then a timeout is detected.
 		// Using the fast checker should typically result in many fewer calls to
 		// Timer functions resulting in a significant performance improvement if
 		// many messages are being sent and timeouts are infrequent.
 		// The disadvantage of using the fast checker is that timeouts will be
 		// less accurate. That is, the effective timeout could be between
-		// `MaxProcessingTime` and `MaxProcessingTime + FastCheckerInterval`.
-		// For example, if `MaxProcessingTime` is 100ms and
-		// `FastCheckerInterval` is 10ms, then a delay of 108ms between two
+		// `MaxProcessingTime` and `2 * MaxProcessingTime`. For example, if
+		// `MaxProcessingTime` is 100ms then a delay of 180ms between two
 		// messages being sent may not be recognized as a timeout.
-		FastCheckerInterval time.Duration
+		UseFastChecker bool
 
 		// Return specifies what channels will be populated. If they are set to true,
 		// you must read from them to prevent deadlock.
@@ -294,7 +292,7 @@ func NewConfig() *Config {
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
 	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
-	c.Consumer.FastCheckerInterval = 0
+	c.Consumer.UseFastChecker = false
 	c.Consumer.Return.Errors = false
 	c.Consumer.Offsets.CommitInterval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest
@@ -420,8 +418,6 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
 	case c.Consumer.MaxProcessingTime <= 0:
 		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
-	case c.Consumer.FastCheckerInterval < 0:
-		return ConfigurationError("Consumer.FastCheckerInterval must be >= 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
 	case c.Consumer.Offsets.CommitInterval <= 0:

+ 14 - 45
consumer.go

@@ -441,67 +441,36 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
 	msgSent := false
-	// Initialize timer without a pending send on its channel
-	expiryTimer := time.NewTimer(0)
-	<-expiryTimer.C
-	expiryTimerSet := false
-
-	var fastCheckerChan <-chan (time.Time)
-	if child.conf.Consumer.FastCheckerInterval > 0 {
-		fastChecker := time.NewTicker(child.conf.Consumer.FastCheckerInterval)
-		defer fastChecker.Stop()
-		fastCheckerChan = fastChecker.C
-	}
 
 feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
+		expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
 
 		for i, msg := range msgs {
-			if child.conf.Consumer.FastCheckerInterval <= 0 {
-				expiryTimerSet = true
-				expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
-			}
-
 		messageSelect:
 			select {
 			case child.messages <- msg:
 				msgSent = true
-				if expiryTimerSet {
-					// The timer was set and a message was sent, stop the
-					// timer and resume using the fast checker
-					if !expiryTimer.Stop() {
-						<-expiryTimer.C
+			case <-expiryTicker.C:
+				if !msgSent {
+					child.responseResult = errTimedOut
+					child.broker.acks.Done()
+					for _, msg = range msgs[i:] {
+						child.messages <- msg
 					}
-					expiryTimerSet = false
-				}
-			// Periodically check if messages have been sent
-			case <-fastCheckerChan:
-				if msgSent {
+					child.broker.input <- child
+					continue feederLoop
+				} else {
+					// current message has not been sent, return to select
+					// statement
 					msgSent = false
-				} else if !expiryTimerSet {
-					// No messages have been sent since the last tick,
-					// start the timer
-					expiryTimerSet = true
-					// If the fast checker is being used, then at least
-					// the time between two fast checker ticks has already
-					// passed since the last message was sent.
-					expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime - child.conf.Consumer.FastCheckerInterval)
-				}
-				// message has not been sent, return to select statement
-				goto messageSelect
-			case <-expiryTimer.C:
-				expiryTimerSet = false
-				child.responseResult = errTimedOut
-				child.broker.acks.Done()
-				for _, msg = range msgs[i:] {
-					child.messages <- msg
+					goto messageSelect
 				}
-				child.broker.input <- child
-				continue feederLoop
 			}
 		}
 
+		expiryTicker.Stop()
 		child.broker.acks.Done()
 	}
 

+ 2 - 2
consumer_test.go

@@ -822,7 +822,7 @@ func TestConsumerFastCheckerOff(t *testing.T) {
 
 	config := NewConfig()
 	config.ChannelBufferSize = 0
-	config.Consumer.FastCheckerInterval = 0
+	config.Consumer.UseFastChecker = false
 	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
 	master, err := NewConsumer([]string{broker0.Addr()}, config)
 	if err != nil {
@@ -865,7 +865,7 @@ func TestConsumerFastCheckerOn(t *testing.T) {
 
 	config := NewConfig()
 	config.ChannelBufferSize = 0
-	config.Consumer.FastCheckerInterval = 1 * time.Millisecond
+	config.Consumer.UseFastChecker = true
 	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
 	master, err := NewConsumer([]string{broker0.Addr()}, config)
 	if err != nil {