|
@@ -324,6 +324,52 @@ func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int3
|
|
|
return kerror
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+type mockProduceResponse struct {
|
|
|
+ errors map[string]map[int32]KError
|
|
|
+ t *testing.T
|
|
|
+}
|
|
|
+
|
|
|
+func newMockProduceResponse(t *testing.T) *mockProduceResponse {
|
|
|
+ return &mockProduceResponse{t: t}
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KError) *mockProduceResponse {
|
|
|
+ if mr.errors == nil {
|
|
|
+ mr.errors = make(map[string]map[int32]KError)
|
|
|
+ }
|
|
|
+ partitions := mr.errors[topic]
|
|
|
+ if partitions == nil {
|
|
|
+ partitions = make(map[int32]KError)
|
|
|
+ mr.errors[topic] = partitions
|
|
|
+ }
|
|
|
+ partitions[partition] = kerror
|
|
|
+ return mr
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockProduceResponse) For(reqBody decoder) encoder {
|
|
|
+ req := reqBody.(*ProduceRequest)
|
|
|
+ res := &ProduceResponse{}
|
|
|
+ for topic, partitions := range req.msgSets {
|
|
|
+ for partition := range partitions {
|
|
|
+ res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return res
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockProduceResponse) getError(topic string, partition int32) KError {
|
|
|
+ partitions := mr.errors[topic]
|
|
|
+ if partitions == nil {
|
|
|
+ return ErrNoError
|
|
|
+ }
|
|
|
+ kerror, ok := partitions[partition]
|
|
|
+ if !ok {
|
|
|
+ return ErrNoError
|
|
|
+ }
|
|
|
+ return kerror
|
|
|
+}
|
|
|
+
|
|
|
|
|
|
type mockOffsetFetchResponse struct {
|
|
|
offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
|