Explorar el Código

Use a ticker instead of a timer to detect timeouts

Vagrant User hace 7 años
padre
commit
9112d76399
Se han modificado 2 ficheros con 16 adiciones y 64 borrados
  1. 15 19
      config.go
  2. 1 45
      consumer_test.go

+ 15 - 19
config.go

@@ -190,27 +190,24 @@ type Config struct {
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
 
-		// The maximum amount of time the consumer expects a message takes to process
-		// for the user. If writing to the Messages channel takes longer than this,
-		// that partition will stop fetching more messages until it can proceed again.
+		// The maximum amount of time the consumer expects a message takes to
+		// process for the user. If writing to the Messages channel takes longer
+		// than this, that partition will stop fetching more messages until it
+		// can proceed again.
 		// Note that, since the Messages channel is buffered, the actual grace time is
 		// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
-		MaxProcessingTime time.Duration
-
-		// Whether or not to use the fast checker. The fast checker uses a
-		// ticker instead of a timer to implement the timeout functionality in
-		// (*partitionConsumer).responseFeeder.
 		// If a message is not written to the Messages channel between two ticks
-		// of the fast checker then a timeout is detected.
-		// Using the fast checker should typically result in many fewer calls to
-		// Timer functions resulting in a significant performance improvement if
-		// many messages are being sent and timeouts are infrequent.
-		// The disadvantage of using the fast checker is that timeouts will be
-		// less accurate. That is, the effective timeout could be between
-		// `MaxProcessingTime` and `2 * MaxProcessingTime`. For example, if
-		// `MaxProcessingTime` is 100ms then a delay of 180ms between two
-		// messages being sent may not be recognized as a timeout.
-		UseFastChecker bool
+		// of the expiryTicker then a timeout is detected.
+		// Using a ticker instead of a timer to detect timeouts should typically
+		// result in many fewer calls to Timer functions which may result in a
+		// significant performance improvement if many messages are being sent
+		// and timeouts are infrequent.
+		// The disadvantage of using a ticker instead of a timer is that
+		// timeouts will be less accurate. That is, the effective timeout could
+		// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
+		// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
+		// between two messages being sent may not be recognized as a timeout.
+		MaxProcessingTime time.Duration
 
 		// Return specifies what channels will be populated. If they are set to true,
 		// you must read from them to prevent deadlock.
@@ -292,7 +289,6 @@ func NewConfig() *Config {
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
 	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
-	c.Consumer.UseFastChecker = false
 	c.Consumer.Return.Errors = false
 	c.Consumer.Offsets.CommitInterval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest

+ 1 - 45
consumer_test.go

@@ -803,7 +803,7 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
 	broker0.Close()
 }
 
-func TestConsumerFastCheckerOff(t *testing.T) {
+func TestConsumerExpiryTicker(t *testing.T) {
 	// Given
 	broker0 := NewMockBroker(t, 0)
 	fetchResponse1 := &FetchResponse{}
@@ -822,50 +822,6 @@ func TestConsumerFastCheckerOff(t *testing.T) {
 
 	config := NewConfig()
 	config.ChannelBufferSize = 0
-	config.Consumer.UseFastChecker = false
-	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
-	master, err := NewConsumer([]string{broker0.Addr()}, config)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// When
-	consumer, err := master.ConsumePartition("my_topic", 0, 1)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Then: messages with offsets 1 through 8 are read
-	for i := 1; i <= 8; i++ {
-		assertMessageOffset(t, <-consumer.Messages(), int64(i))
-		time.Sleep(2 * time.Millisecond)
-	}
-
-	safeClose(t, consumer)
-	safeClose(t, master)
-	broker0.Close()
-}
-
-func TestConsumerFastCheckerOn(t *testing.T) {
-	// Given
-	broker0 := NewMockBroker(t, 0)
-	fetchResponse1 := &FetchResponse{}
-	for i := 1; i <= 8; i++ {
-		fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
-	}
-	broker0.SetHandlerByMap(map[string]MockResponse{
-		"MetadataRequest": NewMockMetadataResponse(t).
-			SetBroker(broker0.Addr(), broker0.BrokerID()).
-			SetLeader("my_topic", 0, broker0.BrokerID()),
-		"OffsetRequest": NewMockOffsetResponse(t).
-			SetOffset("my_topic", 0, OffsetNewest, 1234).
-			SetOffset("my_topic", 0, OffsetOldest, 1),
-		"FetchRequest": NewMockSequence(fetchResponse1),
-	})
-
-	config := NewConfig()
-	config.ChannelBufferSize = 0
-	config.Consumer.UseFastChecker = true
 	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
 	master, err := NewConsumer([]string{broker0.Addr()}, config)
 	if err != nil {