|
@@ -574,6 +574,7 @@ func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
|
|
|
// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
|
|
// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
|
|
|
type MockOffsetFetchResponse struct {
|
|
type MockOffsetFetchResponse struct {
|
|
|
offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
|
|
offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
|
|
|
|
|
+ error KError
|
|
|
t TestReporter
|
|
t TestReporter
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -599,15 +600,25 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
|
|
|
return mr
|
|
return mr
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
|
|
|
|
|
+ mr.error = kerror
|
|
|
|
|
+ return mr
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
|
|
func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
|
|
|
req := reqBody.(*OffsetFetchRequest)
|
|
req := reqBody.(*OffsetFetchRequest)
|
|
|
group := req.ConsumerGroup
|
|
group := req.ConsumerGroup
|
|
|
- res := &OffsetFetchResponse{}
|
|
|
|
|
|
|
+ res := &OffsetFetchResponse{Version: req.Version}
|
|
|
|
|
+
|
|
|
for topic, partitions := range mr.offsets[group] {
|
|
for topic, partitions := range mr.offsets[group] {
|
|
|
for partition, block := range partitions {
|
|
for partition, block := range partitions {
|
|
|
res.AddBlock(topic, partition, block)
|
|
res.AddBlock(topic, partition, block)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ if res.Version >= 2 {
|
|
|
|
|
+ res.Err = mr.error
|
|
|
|
|
+ }
|
|
|
return res
|
|
return res
|
|
|
}
|
|
}
|
|
|
|
|
|