|
|
@@ -384,14 +384,20 @@ func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int3
|
|
|
|
|
|
// MockProduceResponse is a `ProduceResponse` builder.
|
|
|
type MockProduceResponse struct {
|
|
|
- errors map[string]map[int32]KError
|
|
|
- t TestReporter
|
|
|
+ version int16
|
|
|
+ errors map[string]map[int32]KError
|
|
|
+ t TestReporter
|
|
|
}
|
|
|
|
|
|
func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
|
|
|
return &MockProduceResponse{t: t}
|
|
|
}
|
|
|
|
|
|
+func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
|
|
|
+ mr.version = version
|
|
|
+ return mr
|
|
|
+}
|
|
|
+
|
|
|
func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
|
|
|
if mr.errors == nil {
|
|
|
mr.errors = make(map[string]map[int32]KError)
|
|
|
@@ -407,7 +413,9 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE
|
|
|
|
|
|
func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
|
|
|
req := reqBody.(*ProduceRequest)
|
|
|
- res := &ProduceResponse{}
|
|
|
+ res := &ProduceResponse{
|
|
|
+ Version: mr.version,
|
|
|
+ }
|
|
|
for topic, partitions := range req.records {
|
|
|
for partition := range partitions {
|
|
|
res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
|