Browse Source

rm unnecessary logic: abort message on batch level

francois 6 years ago
parent
commit
d2e4a4d587
1 changed files with 3 additions and 10 deletions
  1. 3 10
      consumer.go

+ 3 - 10
consumer.go

@@ -47,7 +47,6 @@ func (ce ConsumerErrors) Error() string {
 // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
 // scope.
 type Consumer interface {
-
 	// Topics returns the set of available topics as retrieved from the cluster
 	// metadata. This method is the same as Client.Topics(), and is provided for
 	// convenience.
@@ -260,7 +259,6 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
 // Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
 // also drain the Messages channel, harvest all errors & return them once cleanup has completed.
 type PartitionConsumer interface {
-
 	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
 	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
 	// function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
@@ -667,15 +665,10 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 
 			// filter aborted transactions
 			if child.conf.Consumer.IsolationLevel == ReadCommitted {
-				committedMessages := make([]*ConsumerMessage, 0, len(recordBatchMessages))
-				for _, message := range recordBatchMessages {
-					_, exist := abortedProducerIDs[records.RecordBatch.ProducerID]
-					if !(records.RecordBatch.IsTransactional && exist) {
-						// as long as this is not a transactional message that is part of aborted transaction, let it pass
-						committedMessages = append(committedMessages, message)
-					}
+				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
+				if records.RecordBatch.IsTransactional && isAborted {
+					continue
 				}
-				recordBatchMessages = committedMessages
 			}
 
 			messages = append(messages, recordBatchMessages...)