Browse Source

copied transaction logic from java client
uh, it works?

FrancoisPoinsot 6 years ago
parent
commit
10d673ff22
2 changed files with 33 additions and 7 deletions
  1. 29 7
      consumer.go
  2. 4 0
      record_batch.go

+ 29 - 7
consumer.go

@@ -3,6 +3,7 @@ package sarama
 import (
 	"errors"
 	"fmt"
+	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -595,6 +596,16 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 	child.fetchSize = child.conf.Consumer.Fetch.Default
 	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
 
+	abortedProducerIDs := make(map[int64]none, len(block.AbortedTransactions))
+
+	// Load aborted transaction in separate var because we are going to depile this one
+	abortedTransactions := make([]*AbortedTransaction, len(block.AbortedTransactions))
+	copy(abortedTransactions, block.AbortedTransactions)
+	sort.Slice(
+		abortedTransactions,
+		func(i, j int) bool { return abortedTransactions[i].FirstOffset < abortedTransactions[j].FirstOffset },
+	)
+
 	messages := []*ConsumerMessage{}
 	for _, records := range block.RecordsSet {
 		switch records.recordsType {
@@ -606,6 +617,15 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 
 			messages = append(messages, messageSetMessages...)
 		case defaultRecords:
+			for _, abortedTransaction := range abortedTransactions {
+				if abortedTransaction.FirstOffset > records.RecordBatch.LastOffset() {
+					break
+				}
+				// add aborted transaction to abortedProducer list and depile abortedTransactions
+				abortedProducerIDs[abortedTransaction.ProducerID] = none{}
+				abortedTransactions = abortedTransactions[1:]
+			}
+
 			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
 			if err != nil {
 				return nil, err
@@ -618,26 +638,28 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 			// control record
 			isControl, err := records.isControl()
 			if err != nil {
+				//TODO maybe we should handle this ? a log at least
 				continue
 			}
 			if isControl {
-				//// TODO do not commit
-				//spew.Dump(records)
-
 				controlRecord, err := records.getControlRecord()
 				if err != nil {
 					return nil, err
 				}
 
-				//TODO do ont commit
-				fmt.Println(controlRecord)
+				if controlRecord.Type == ControlRecordAbort {
+					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
+				}
+				continue
 			}
 
-			// aborted transactions
+			// filter aborted transactions
 			if child.conf.Consumer.IsolationLevel == ReadCommitted {
 				committedMessages := make([]*ConsumerMessage, 0, len(recordBatchMessages))
 				for _, message := range recordBatchMessages {
-					if !block.IsAborted(message.Offset) {
+					_, exist := abortedProducerIDs[records.RecordBatch.ProducerID]
+					if !(records.RecordBatch.IsTransactional && exist) {
+						// as long as this is not a transactional message that is part of aborted transaction, let it pass
 						committedMessages = append(committedMessages, message)
 					}
 				}

+ 4 - 0
record_batch.go

@@ -51,6 +51,10 @@ type RecordBatch struct {
 	recordsLen        int // uncompressed records size
 }
 
+func (b *RecordBatch) LastOffset() int64 {
+	return b.FirstOffset + int64(b.LastOffsetDelta)
+}
+
 func (b *RecordBatch) encode(pe packetEncoder) error {
 	if b.Version != 2 {
 		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}