Explorar el Código

control record + isTransactional bit parsing

FrancoisPoinsot hace 5 años
padre
commit
b68420a29b
Se han modificado 6 ficheros con 96 adiciones y 5 borrados
  1. 14 1
      consumer.go
  2. 18 0
      control_record.go
  3. 1 0
      record.go
  4. 5 0
      record_batch.go
  5. 49 0
      records.go
  6. 9 4
      tools/uncomitted-messages/kafka-console-consumer.go

+ 14 - 1
consumer.go

@@ -616,9 +616,22 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 			// - part of an aborted transaction when set to `ReadCommitted`
 
 			// control record
-			if control, err := records.isControl(); err != nil || control {
+			isControl, err := records.isControl()
+			if err != nil {
 				continue
 			}
+			if isControl {
+				//// TODO do not commit
+				//spew.Dump(records)
+
+				controlRecord, err := records.getControlRecord()
+				if err != nil {
+					return nil, err
+				}
+
+				//TODO do ont commit
+				fmt.Println(controlRecord)
+			}
 
 			// aborted transactions
 			if child.conf.Consumer.IsolationLevel == ReadCommitted {

+ 18 - 0
control_record.go

@@ -0,0 +1,18 @@
+package sarama
+
+type ControlRecordType int
+
+const (
+	ControlRecordAbort ControlRecordType = iota
+	ControlRecordCommit
+	ControlRecordUnknown
+)
+
+// Control records are returned as a record by fetchRequest
+// However unlike "normal" records, they mean nothing application wise.
+// They only serve internal logic for supporting transactions.
+type ControlRecord struct {
+	Version          int16
+	CoordinatorEpoch int32
+	Type             ControlRecordType
+}

+ 1 - 0
record.go

@@ -6,6 +6,7 @@ import (
 )
 
 const (
+	isTransactionalMask   = 0x10
 	controlMask           = 0x20
 	maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
 )

+ 5 - 0
record_batch.go

@@ -45,6 +45,7 @@ type RecordBatch struct {
 	FirstSequence         int32
 	Records               []*Record
 	PartialTrailingRecord bool
+	IsTransactional       bool
 
 	compressedRecords []byte
 	recordsLen        int // uncompressed records size
@@ -122,6 +123,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
 	b.Control = attributes&controlMask == controlMask
 	b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
+	b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
 
 	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
 		return err
@@ -205,6 +207,9 @@ func (b *RecordBatch) computeAttributes() int16 {
 	if b.LogAppendTime {
 		attr |= timestampTypeMask
 	}
+	if b.IsTransactional {
+		attr |= isTransactionalMask
+	}
 	return attr
 }
 

+ 49 - 0
records.go

@@ -192,3 +192,52 @@ func magicValue(pd packetDecoder) (int8, error) {
 
 	return dec.getInt8()
 }
+
+func (r *Records) getControlRecord() (ControlRecord, error) {
+	if r.RecordBatch == nil || len(r.RecordBatch.Records) <= 0 {
+		return ControlRecord{}, fmt.Errorf("cannot get control record, record batch is empty")
+	}
+
+	firstRecord := r.RecordBatch.Records[0]
+	controlRecord := ControlRecord{}
+	{
+		var err error
+		valueDecoder := realDecoder{raw: firstRecord.Value}
+		controlRecord.Version, err = valueDecoder.getInt16()
+		if err != nil {
+			return ControlRecord{}, err
+		}
+		controlRecord.CoordinatorEpoch, err = valueDecoder.getInt32()
+		if err != nil {
+			return ControlRecord{}, err
+		}
+	}
+	{
+		var err error
+		keyDecoder := realDecoder{raw: firstRecord.Key}
+
+		// There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
+		// Either way, all these version can only be 0 for now
+		controlRecord.Version, err = keyDecoder.getInt16()
+		if err != nil {
+			return ControlRecord{}, err
+		}
+
+		recordType, err := keyDecoder.getInt16()
+		if err != nil {
+			return ControlRecord{}, err
+		}
+		switch recordType {
+		case 0:
+			controlRecord.Type = ControlRecordAbort
+		case 1:
+			controlRecord.Type = ControlRecordCommit
+		default:
+			// from JAVA implementation:
+			// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
+			controlRecord.Type = ControlRecordUnknown
+		}
+	}
+
+	return controlRecord, nil
+}

+ 9 - 4
tools/uncomitted-messages/kafka-console-consumer.go

@@ -24,7 +24,12 @@ func main() {
 		log.Fatal("-topic is required")
 	}
 
-	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
+	config := sarama.NewConfig()
+	config.Version = sarama.V1_1_1_0
+	config.Consumer.IsolationLevel = sarama.ReadCommitted
+	config.Consumer.MaxProcessingTime = 20 * 365 * 24 * time.Hour
+
+	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
 	if err != nil {
 		log.Fatalf("Failed to start consumer: %s", err)
 	}
@@ -49,7 +54,7 @@ func main() {
 		msgChannel := pc.Messages()
 	read1Partition:
 		for {
-			timeout := time.NewTimer(1 * time.Second)
+			//timeout := time.NewTimer(1 * time.Second)
 			select {
 			case msg, open := <-msgChannel:
 				if !open {
@@ -57,8 +62,8 @@ func main() {
 					break read1Partition
 				}
 				log.Println(string(msg.Value))
-			case <-timeout.C:
-				break read1Partition
+				//case <-timeout.C:
+				//	break read1Partition
 			}
 		}
 	}