Ver código fonte

not working, handle only single message transaction

FrancoisPoinsot 6 anos atrás
pai
commit
559a3ceebb
3 arquivos alterados com 32 adições e 1 exclusões
  1. 5 0
      config.go
  2. 18 1
      consumer.go
  3. 9 0
      fetch_response.go

+ 5 - 0
config.go

@@ -338,6 +338,11 @@ type Config struct {
 				Max int
 				Max int
 			}
 			}
 		}
 		}
+
+		// IsolationLevel support 2 mode:
+		// 	- use `ReadUncommitted` (default) to consume and return all messages in message channel
+		//	- use `ReadCommitted` to hide messages that are part of an aborted transaction
+		IsolationLevel IsolationLevel
 	}
 	}
 
 
 	// A user-provided string sent with every request to the brokers for logging,
 	// A user-provided string sent with every request to the brokers for logging,

+ 18 - 1
consumer.go

@@ -610,10 +610,27 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 			if err != nil {
 			if err != nil {
 				return nil, err
 				return nil, err
 			}
 			}
+
+			// Parse and commit offset but do not expose messages that are:
+			// - control records
+			// - part of an aborted transaction when set to `ReadCommitted`
+
+			// control record
 			if control, err := records.isControl(); err != nil || control {
 			if control, err := records.isControl(); err != nil || control {
 				continue
 				continue
 			}
 			}
 
 
+			// aborted transactions
+			if child.conf.Consumer.IsolationLevel == ReadCommitted {
+				committedMessages := make([]*ConsumerMessage, 0, len(recordBatchMessages))
+				for _, message := range recordBatchMessages {
+					if !block.IsAborted(message.Offset) {
+						committedMessages = append(committedMessages, message)
+					}
+				}
+				recordBatchMessages = committedMessages
+			}
+
 			messages = append(messages, recordBatchMessages...)
 			messages = append(messages, recordBatchMessages...)
 		default:
 		default:
 			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
 			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
@@ -815,7 +832,7 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	}
 	}
 	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
 	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
 		request.Version = 4
 		request.Version = 4
-		request.Isolation = ReadUncommitted // We don't support yet transactions.
+		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
 	}
 	}
 
 
 	for child := range bc.subscriptions {
 	for child := range bc.subscriptions {

+ 9 - 0
fetch_response.go

@@ -185,6 +185,15 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
 	return pe.pop()
 	return pe.pop()
 }
 }
 
 
+func (r *FetchResponseBlock) IsAborted(offset int64) bool {
+	for _, abortedTransaction := range r.AbortedTransactions {
+		if abortedTransaction != nil && abortedTransaction.FirstOffset == offset {
+			return true
+		}
+	}
+	return false
+}
+
 type FetchResponse struct {
 type FetchResponse struct {
 	Blocks        map[string]map[int32]*FetchResponseBlock
 	Blocks        map[string]map[int32]*FetchResponseBlock
 	ThrottleTime  time.Duration
 	ThrottleTime  time.Duration