Browse Source

midway on consumer test

FrancoisPoinsot 6 năm trước cách đây
mục cha
commit
57215a08ac
5 tập tin đã thay đổi với 172 bổ sung37 xóa
  1. 81 0
      consumer_test.go
  2. 52 0
      control_record.go
  3. 1 0
      control_record_test.go
  4. 29 0
      fetch_response.go
  5. 9 37
      records.go

+ 81 - 0
consumer_test.go

@@ -1128,6 +1128,87 @@ func TestConsumerTimestamps(t *testing.T) {
 	}
 }
 
+// When set to ReadCommitted, no uncommitted message should be available in messages channel
+func TestExcludeUncommitted(t *testing.T) {
+	// Given
+	broker0 := NewMockBroker(t, 0)
+
+	// define controlRecord key and value
+	controlRecordAbort := ControlRecord{
+		Version: 0,
+		Type:    ControlRecordAbort,
+	}
+	crKey := &realEncoder{
+		raw: make([]byte, 4),
+	}
+	crValue := &realEncoder{
+		raw: make([]byte, 6),
+	}
+	controlRecordAbort.encode(crKey, crValue)
+
+	fetchResponse := &FetchResponse{
+		Version: 4,
+		Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
+			AbortedTransactions: []*AbortedTransaction{
+				{
+					ProducerID:  7,
+					FirstOffset: 1235,
+				},
+			},
+		}}},
+	}
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 0, 7, true) // committed msg
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1, 7, true) // uncommitted msg
+	//TODO, need its own specific method
+	fetchResponse.AddRecordBatch("my_topic", 0, ByteEncoder(crValue.raw), ByteEncoder(crKey.raw), 2, 7, true) // abort control record
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 3, 7, true)                                     // committed msg
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetVersion(1).
+			SetOffset("my_topic", 0, OffsetOldest, 0).
+			SetOffset("my_topic", 0, OffsetNewest, 1237),
+		"FetchRequest": NewMockSequence(fetchResponse),
+	})
+
+	cfg := NewConfig()
+	cfg.Consumer.Return.Errors = true
+	cfg.Version = V0_11_0_0
+	cfg.Consumer.IsolationLevel = ReadCommitted
+
+	// When
+	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Then: only the 2 committed messages are returned
+	select {
+	case message := <-consumer.Messages():
+		assertMessageOffset(t, message, int64(1234))
+	case err := <-consumer.Errors():
+		t.Error(err)
+	}
+	select {
+	case message := <-consumer.Messages():
+		assertMessageOffset(t, message, int64(1237))
+	case err := <-consumer.Errors():
+		t.Error(err)
+	}
+
+	safeClose(t, consumer)
+	safeClose(t, master)
+	broker0.Close()
+}
+
 func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
 	if msg.Offset != expectedOffset {
 		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)

+ 52 - 0
control_record.go

@@ -16,3 +16,55 @@ type ControlRecord struct {
 	CoordinatorEpoch int32
 	Type             ControlRecordType
 }
+
+func (cr *ControlRecord) decode(key, value packetDecoder) (err error) {
+	{
+		var err error
+		cr.Version, err = value.getInt16()
+		if err != nil {
+			return err
+		}
+		cr.CoordinatorEpoch, err = value.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+	{
+		var err error
+		// There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
+		// Either way, all these version can only be 0 for now
+		cr.Version, err = key.getInt16()
+		if err != nil {
+			return err
+		}
+
+		recordType, err := key.getInt16()
+		if err != nil {
+			return err
+		}
+		switch recordType {
+		case 0:
+			cr.Type = ControlRecordAbort
+		case 1:
+			cr.Type = ControlRecordCommit
+		default:
+			// from JAVA implementation:
+			// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
+			cr.Type = ControlRecordUnknown
+		}
+	}
+	return nil
+}
+
+func (cr *ControlRecord) encode(key, value packetEncoder) {
+	value.putInt16(cr.Version)
+	value.putInt32(cr.CoordinatorEpoch)
+
+	key.putInt16(cr.Version)
+	switch cr.Type {
+	case ControlRecordAbort:
+		key.putInt16(0)
+	case ControlRecordCommit:
+		key.putInt16(1)
+	}
+}

+ 1 - 0
control_record_test.go

@@ -0,0 +1 @@
+package sarama

+ 29 - 0
fetch_response.go

@@ -385,6 +385,31 @@ func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, ke
 	batch.addRecord(rec)
 }
 
+// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
+// But instead of append a record a batch of record it append a new batch of record of size 1 to a set of batch
+// Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
+func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
+	frb := r.getOrCreateBlock(topic, partition)
+	kb, vb := encodeKV(key, value)
+
+	records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
+	batch := &RecordBatch{
+		Version:         2,
+		LogAppendTime:   r.LogAppendTime,
+		FirstTimestamp:  timestamp,
+		MaxTimestamp:    r.Timestamp,
+		FirstOffset:     offset,
+		LastOffsetDelta: 0,
+		ProducerID:      producerID,
+		IsTransactional: isTransactional,
+	}
+	rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
+	batch.addRecord(rec)
+	records.RecordBatch = batch
+
+	frb.RecordsSet = append(frb.RecordsSet, &records)
+}
+
 func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
 	r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
 }
@@ -393,6 +418,10 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco
 	r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
 }
 
+func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
+	r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
+}
+
 func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
 	frb := r.getOrCreateBlock(topic, partition)
 	if len(frb.RecordsSet) == 0 {

+ 9 - 37
records.go

@@ -26,6 +26,11 @@ func newDefaultRecords(batch *RecordBatch) Records {
 	return Records{recordsType: defaultRecords, RecordBatch: batch}
 }
 
+func newControlRecords(cr ControlRecord) Records {
+	//TODO
+	return Records{}
+}
+
 // setTypeFromFields sets type of Records depending on which of MsgSet or RecordBatch is not nil.
 // The first return value indicates whether both fields are nil (and the type is not set).
 // If both fields are not nil, it returns an error.
@@ -145,6 +150,7 @@ func (r *Records) isPartial() (bool, error) {
 }
 
 func (r *Records) isControl() (bool, error) {
+	//TODO i guess there is a type that we should support properly
 	if r.recordsType == unknownRecords {
 		if empty, err := r.setTypeFromFields(); err != nil || empty {
 			return false, err
@@ -200,43 +206,9 @@ func (r *Records) getControlRecord() (ControlRecord, error) {
 
 	firstRecord := r.RecordBatch.Records[0]
 	controlRecord := ControlRecord{}
-	{
-		var err error
-		valueDecoder := realDecoder{raw: firstRecord.Value}
-		controlRecord.Version, err = valueDecoder.getInt16()
-		if err != nil {
-			return ControlRecord{}, err
-		}
-		controlRecord.CoordinatorEpoch, err = valueDecoder.getInt32()
-		if err != nil {
-			return ControlRecord{}, err
-		}
-	}
-	{
-		var err error
-		keyDecoder := realDecoder{raw: firstRecord.Key}
-
-		// There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
-		// Either way, all these version can only be 0 for now
-		controlRecord.Version, err = keyDecoder.getInt16()
-		if err != nil {
-			return ControlRecord{}, err
-		}
-
-		recordType, err := keyDecoder.getInt16()
-		if err != nil {
-			return ControlRecord{}, err
-		}
-		switch recordType {
-		case 0:
-			controlRecord.Type = ControlRecordAbort
-		case 1:
-			controlRecord.Type = ControlRecordCommit
-		default:
-			// from JAVA implementation:
-			// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
-			controlRecord.Type = ControlRecordUnknown
-		}
+	err := controlRecord.decode(&realDecoder{raw: firstRecord.Key}, &realDecoder{raw: firstRecord.Value})
+	if err != nil {
+		return ControlRecord{}, err
 	}
 
 	return controlRecord, nil