Browse Source

test consumer

FrancoisPoinsot 6 years ago
parent
commit
0432240e61
3 changed files with 47 additions and 32 deletions
  1. 8 26
      consumer_test.go
  2. 39 0
      fetch_response.go
  3. 0 6
      records.go

+ 8 - 26
consumer_test.go

@@ -1133,35 +1133,17 @@ func TestExcludeUncommitted(t *testing.T) {
 	// Given
 	broker0 := NewMockBroker(t, 0)
 
-	// define controlRecord key and value
-	controlRecordAbort := ControlRecord{
-		Version: 0,
-		Type:    ControlRecordAbort,
-	}
-	crKey := &realEncoder{
-		raw: make([]byte, 4),
-	}
-	crValue := &realEncoder{
-		raw: make([]byte, 6),
-	}
-	controlRecordAbort.encode(crKey, crValue)
-
 	fetchResponse := &FetchResponse{
 		Version: 4,
 		Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
-			AbortedTransactions: []*AbortedTransaction{
-				{
-					ProducerID:  7,
-					FirstOffset: 1235,
-				},
-			},
+			AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
 		}}},
 	}
-	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 0, 7, true) // committed msg
-	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1, 7, true) // uncommitted msg
-	//TODO, need its own specific method
-	fetchResponse.AddRecordBatch("my_topic", 0, ByteEncoder(crValue.raw), ByteEncoder(crKey.raw), 2, 7, true) // abort control record
-	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 3, 7, true)                                     // committed msg
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true)   // committed msg
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true)   // uncommitted msg
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true)   // uncommitted msg
+	fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true)   // committed msg
 
 	broker0.SetHandlerByMap(map[string]MockResponse{
 		"MetadataRequest": NewMockMetadataResponse(t).
@@ -1171,7 +1153,7 @@ func TestExcludeUncommitted(t *testing.T) {
 			SetVersion(1).
 			SetOffset("my_topic", 0, OffsetOldest, 0).
 			SetOffset("my_topic", 0, OffsetNewest, 1237),
-		"FetchRequest": NewMockSequence(fetchResponse),
+		"FetchRequest": NewMockWrapper(fetchResponse),
 	})
 
 	cfg := NewConfig()
@@ -1199,7 +1181,7 @@ func TestExcludeUncommitted(t *testing.T) {
 	}
 	select {
 	case message := <-consumer.Messages():
-		assertMessageOffset(t, message, int64(1237))
+		assertMessageOffset(t, message, int64(1238))
 	case err := <-consumer.Errors():
 		t.Error(err)
 	}

+ 39 - 0
fetch_response.go

@@ -410,6 +410,40 @@ func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int3
 	frb.RecordsSet = append(frb.RecordsSet, &records)
 }
 
+func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
+	frb := r.getOrCreateBlock(topic, partition)
+
+	// batch
+	batch := &RecordBatch{
+		Version:         2,
+		LogAppendTime:   r.LogAppendTime,
+		FirstTimestamp:  timestamp,
+		MaxTimestamp:    r.Timestamp,
+		FirstOffset:     offset,
+		LastOffsetDelta: 0,
+		ProducerID:      producerID,
+		IsTransactional: true,
+		Control:         true,
+	}
+
+	// records
+	records := newDefaultRecords(nil)
+	records.RecordBatch = batch
+
+	// record
+	crAbort := ControlRecord{
+		Version: 0,
+		Type:    recordType,
+	}
+	crKey := &realEncoder{raw: make([]byte, 4)}
+	crValue := &realEncoder{raw: make([]byte, 6)}
+	crAbort.encode(crKey, crValue)
+	rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
+	batch.addRecord(rec)
+
+	frb.RecordsSet = append(frb.RecordsSet, &records)
+}
+
 func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
 	r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
 }
@@ -422,6 +456,11 @@ func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value
 	r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
 }
 
+func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
+	// define controlRecord key and value
+	r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
+}
+
 func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
 	frb := r.getOrCreateBlock(topic, partition)
 	if len(frb.RecordsSet) == 0 {

+ 0 - 6
records.go

@@ -26,11 +26,6 @@ func newDefaultRecords(batch *RecordBatch) Records {
 	return Records{recordsType: defaultRecords, RecordBatch: batch}
 }
 
-func newControlRecords(cr ControlRecord) Records {
-	//TODO
-	return Records{}
-}
-
 // setTypeFromFields sets type of Records depending on which of MsgSet or RecordBatch is not nil.
 // The first return value indicates whether both fields are nil (and the type is not set).
 // If both fields are not nil, it returns an error.
@@ -150,7 +145,6 @@ func (r *Records) isPartial() (bool, error) {
 }
 
 func (r *Records) isControl() (bool, error) {
-	//TODO i guess there is a type that we should support properly
 	if r.recordsType == unknownRecords {
 		if empty, err := r.setTypeFromFields(); err != nil || empty {
 			return false, err