Переглянути джерело

simplified code for abandoned connection check, attempt to fix racy sync producer test

Steve van Loben Sels 7 роки тому
батько
коміт
459e548c02
3 змінених файлів з 13 додано та 20 видалено
  1. 11 18
      async_producer.go
  2. 1 1
      async_producer_test.go
  3. 1 1
      sync_producer_test.go

+ 11 - 18
async_producer.go

@@ -499,25 +499,18 @@ func (pp *partitionProducer) dispatch() {
 		}
 	}()
 
-	for {
-		var abandoned chan struct{}
-		if pp.brokerProducer != nil {
-			abandoned = pp.brokerProducer.abandoned
-		}
+	for msg := range pp.input {
 
-		var msg *ProducerMessage
-		var ok bool
-		select {
-		case <-abandoned:
-			// a message on the abandoned channel means that our current broker selection is out of date
-			Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
-			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
-			pp.brokerProducer = nil
-			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
-			continue
-		case msg, ok = <-pp.input:
-			if !ok {
-				return
+		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
+			select {
+			case <-pp.brokerProducer.abandoned:
+				// a message on the abandoned channel means that our current broker selection is out of date
+				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+				pp.brokerProducer = nil
+				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+			default:
+				// producer connection is still open.
 			}
 		}
 

+ 1 - 1
async_producer_test.go

@@ -345,9 +345,9 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
 	metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, ErrNoError)
 	leader1.Returns(metadataLeader2)
+	leader1.Returns(metadataLeader2)
 
 	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
-	leader1.Returns(metadataLeader2)
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)

+ 1 - 1
sync_producer_test.go

@@ -177,7 +177,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) {
 	broker.Close()
 }
 
-func TestSyncProducerRefreshMetadata(t *testing.T) {
+func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	leader1 := NewMockBroker(t, 2)
 	leader2 := NewMockBroker(t, 3)