|
|
@@ -180,6 +180,7 @@ type MockFetchResponse struct {
|
|
|
highWaterMarks map[string]map[int32]int64
|
|
|
t TestReporter
|
|
|
batchSize int
|
|
|
+ version int16
|
|
|
}
|
|
|
|
|
|
func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
|
|
|
@@ -191,6 +192,11 @@ func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
|
|
|
+ mfr.version = version
|
|
|
+ return mfr
|
|
|
+}
|
|
|
+
|
|
|
func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
|
|
|
partitions := mfr.messages[topic]
|
|
|
if partitions == nil {
|
|
|
@@ -218,7 +224,9 @@ func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, of
|
|
|
|
|
|
func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
|
|
|
fetchRequest := reqBody.(*FetchRequest)
|
|
|
- res := &FetchResponse{}
|
|
|
+ res := &FetchResponse{
|
|
|
+ Version: mfr.version,
|
|
|
+ }
|
|
|
for topic, partitions := range fetchRequest.blocks {
|
|
|
for partition, block := range partitions {
|
|
|
initialOffset := block.fetchOffset
|