فهرست منبع

Merge pull request #512 from Shopify/cleanup-flusher

Clean up the flusher via helper methods
Evan Huus 10 سال پیش
والد
کامیت
85c76803a0
1فایلهای تغییر یافته به همراه74 افزوده شده و 58 حذف شده
  1. 74 58
      async_producer.go

+ 74 - 58
async_producer.go

@@ -504,9 +504,10 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag
 	go withRecover(a.run)
 
 	f := &flusher{
-		parent: p,
-		broker: broker,
-		input:  bridge,
+		parent:         p,
+		broker:         broker,
+		input:          bridge,
+		currentRetries: make(map[string]map[int32]error),
 	}
 	go withRecover(f.run)
 
@@ -582,17 +583,18 @@ shutdown:
 	close(a.output)
 }
 
-// one per broker
-// takes a batch at a time from the messageAggregator and sends to the broker
+// takes a batch at a time from the aggregator and sends to the broker
 type flusher struct {
 	parent *asyncProducer
 	broker *Broker
 	input  <-chan []*ProducerMessage
+
+	currentRetries map[string]map[int32]error
 }
 
 func (f *flusher) run() {
 	var closing error
-	currentRetries := make(map[string]map[int32]error)
+
 	Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())
 
 	for batch := range f.input {
@@ -601,30 +603,7 @@ func (f *flusher) run() {
 			continue
 		}
 
-		// group messages by topic/partition
-		msgSets := make(map[string]map[int32][]*ProducerMessage)
-		for i, msg := range batch {
-			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.Partition] != nil {
-				if msg.flags&chaser == chaser {
-					// we can start processing this topic/partition again
-					Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-						f.broker.ID(), msg.Topic, msg.Partition)
-					currentRetries[msg.Topic][msg.Partition] = nil
-				}
-				f.parent.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
-				batch[i] = nil // to prevent it being returned/retried twice
-				continue
-			}
-
-			partitionSet := msgSets[msg.Topic]
-			if partitionSet == nil {
-				partitionSet = make(map[int32][]*ProducerMessage)
-				msgSets[msg.Topic] = partitionSet
-			}
-
-			partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
-		}
-
+		msgSets := f.groupAndFilter(batch)
 		request := f.parent.buildRequest(msgSets)
 		if request == nil {
 			continue
@@ -653,40 +632,77 @@ func (f *flusher) run() {
 			continue
 		}
 
-		// we iterate through the blocks in the request, not the response, so that we notice
-		// if the response is missing a block completely
-		for topic, partitionSet := range msgSets {
-			for partition, msgs := range partitionSet {
+		f.parseResponse(msgSets, response)
+	}
+	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
+}
 
-				block := response.GetBlock(topic, partition)
-				if block == nil {
-					f.parent.returnErrors(msgs, ErrIncompleteResponse)
-					continue
-				}
+func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
+	msgSets := make(map[string]map[int32][]*ProducerMessage)
 
-				switch block.Err {
-				case ErrNoError:
-					// All the messages for this topic-partition were delivered successfully!
-					for i := range msgs {
-						msgs[i].Offset = block.Offset + int64(i)
-					}
-					f.parent.returnSuccesses(msgs)
-				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
-					ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
-					Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
-						f.broker.ID(), topic, partition, block.Err)
-					if currentRetries[topic] == nil {
-						currentRetries[topic] = make(map[int32]error)
-					}
-					currentRetries[topic][partition] = block.Err
-					f.parent.retryMessages(msgs, block.Err)
-				default:
-					f.parent.returnErrors(msgs, block.Err)
+	for i, msg := range batch {
+
+		if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
+			// we're currently retrying this partition so we need to filter out this message
+			f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
+			batch[i] = nil
+
+			if msg.flags&chaser == chaser {
+				// ...but now we can start processing future messages again
+				Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
+					f.broker.ID(), msg.Topic, msg.Partition)
+				delete(f.currentRetries[msg.Topic], msg.Partition)
+			}
+
+			continue
+		}
+
+		partitionSet := msgSets[msg.Topic]
+		if partitionSet == nil {
+			partitionSet = make(map[int32][]*ProducerMessage)
+			msgSets[msg.Topic] = partitionSet
+		}
+
+		partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
+	}
+
+	return msgSets
+}
+
+func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
+	// we iterate through the blocks in the request set, not the response, so that we notice
+	// if the response is missing a block completely
+	for topic, partitionSet := range msgSets {
+		for partition, msgs := range partitionSet {
+			block := response.GetBlock(topic, partition)
+			if block == nil {
+				f.parent.returnErrors(msgs, ErrIncompleteResponse)
+				continue
+			}
+
+			switch block.Err {
+			// Success
+			case ErrNoError:
+				for i := range msgs {
+					msgs[i].Offset = block.Offset + int64(i)
 				}
+				f.parent.returnSuccesses(msgs)
+			// Retriable errors
+			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
+				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
+				Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
+					f.broker.ID(), topic, partition, block.Err)
+				if f.currentRetries[topic] == nil {
+					f.currentRetries[topic] = make(map[int32]error)
+				}
+				f.currentRetries[topic][partition] = block.Err
+				f.parent.retryMessages(msgs, block.Err)
+			// Other non-retriable errors
+			default:
+				f.parent.returnErrors(msgs, block.Err)
 			}
 		}
 	}
-	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
 }
 
 // singleton