Browse Source

Simplify dispatcher loop

Evan Huus 10 years ago
parent
commit
01eada75a8
2 changed files with 4 additions and 9 deletions
  1. 1 8
      consumer.go
  2. 3 1
      consumer_test.go

+ 1 - 8
consumer.go

@@ -268,7 +268,7 @@ func (child *partitionConsumer) dispatcher() {
 		select {
 		case <-child.dying:
 			close(child.trigger)
-		default:
+		case <-time.After(child.conf.Consumer.Retry.Backoff):
 			if child.broker != nil {
 				child.consumer.unrefBrokerConsumer(child.broker)
 				child.broker = nil
@@ -277,13 +277,6 @@ func (child *partitionConsumer) dispatcher() {
 			if err := child.dispatch(); err != nil {
 				child.sendError(err)
 				child.trigger <- none{}
-
-				// there's no point in trying again *right* away
-				select {
-				case <-child.dying:
-					close(child.trigger)
-				case <-time.After(child.conf.Consumer.Retry.Backoff):
-				}
 			}
 		}
 	}

+ 3 - 1
consumer_test.go

@@ -142,7 +142,9 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	seedBroker.Returns(metadataResponse)
 
 	// launch test goroutines
-	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	config := NewConfig()
+	config.Consumer.Retry.Backoff = 0
+	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}