|
|
@@ -230,3 +230,152 @@ func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) in
|
|
|
}
|
|
|
return partitions[partition]
|
|
|
}
|
|
|
+
|
|
|
+// mockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
|
|
|
+type mockConsumerMetadataResponse struct {
|
|
|
+ coordinators map[string]interface{}
|
|
|
+ t *testing.T
|
|
|
+}
|
|
|
+
|
|
|
+func newMockConsumerMetadataResponse(t *testing.T) *mockConsumerMetadataResponse {
|
|
|
+ return &mockConsumerMetadataResponse{
|
|
|
+ coordinators: make(map[string]interface{}),
|
|
|
+ t: t,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockConsumerMetadataResponse) SetCoordinator(group string, broker *mockBroker) *mockConsumerMetadataResponse {
|
|
|
+ mr.coordinators[group] = broker
|
|
|
+ return mr
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockConsumerMetadataResponse) SetError(group string, kerror KError) *mockConsumerMetadataResponse {
|
|
|
+ mr.coordinators[group] = kerror
|
|
|
+ return mr
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockConsumerMetadataResponse) For(reqBody decoder) encoder {
|
|
|
+ req := reqBody.(*ConsumerMetadataRequest)
|
|
|
+ group := req.ConsumerGroup
|
|
|
+ res := &ConsumerMetadataResponse{}
|
|
|
+ v := mr.coordinators[group]
|
|
|
+ switch v := v.(type) {
|
|
|
+ case *mockBroker:
|
|
|
+ res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
|
|
|
+ case KError:
|
|
|
+ res.Err = v
|
|
|
+ }
|
|
|
+ return res
|
|
|
+}
|
|
|
+
|
|
|
+// mockOffsetCommitResponse is a `OffsetCommitResponse` builder.
|
|
|
+type mockOffsetCommitResponse struct {
|
|
|
+ errors map[string]map[string]map[int32]KError
|
|
|
+ t *testing.T
|
|
|
+}
|
|
|
+
|
|
|
+func newMockOffsetCommitResponse(t *testing.T) *mockOffsetCommitResponse {
|
|
|
+ return &mockOffsetCommitResponse{t: t}
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *mockOffsetCommitResponse {
|
|
|
+ if mr.errors == nil {
|
|
|
+ mr.errors = make(map[string]map[string]map[int32]KError)
|
|
|
+ }
|
|
|
+ topics := mr.errors[group]
|
|
|
+ if topics == nil {
|
|
|
+ topics = make(map[string]map[int32]KError)
|
|
|
+ mr.errors[group] = topics
|
|
|
+ }
|
|
|
+ partitions := topics[topic]
|
|
|
+ if partitions == nil {
|
|
|
+ partitions = make(map[int32]KError)
|
|
|
+ topics[topic] = partitions
|
|
|
+ }
|
|
|
+ partitions[partition] = kerror
|
|
|
+ return mr
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockOffsetCommitResponse) For(reqBody decoder) encoder {
|
|
|
+ req := reqBody.(*OffsetCommitRequest)
|
|
|
+ group := req.ConsumerGroup
|
|
|
+ res := &OffsetCommitResponse{}
|
|
|
+ for topic, partitions := range req.blocks {
|
|
|
+ for partition := range partitions {
|
|
|
+ res.AddError(topic, partition, mr.getError(group, topic, partition))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return res
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
|
|
|
+ topics := mr.errors[group]
|
|
|
+ if topics == nil {
|
|
|
+ return ErrNoError
|
|
|
+ }
|
|
|
+ partitions := topics[topic]
|
|
|
+ if partitions == nil {
|
|
|
+ return ErrNoError
|
|
|
+ }
|
|
|
+ kerror, ok := partitions[partition]
|
|
|
+ if !ok {
|
|
|
+ return ErrNoError
|
|
|
+ }
|
|
|
+ return kerror
|
|
|
+}
|
|
|
+
|
|
|
+// mockOffsetFetchResponse is a `OffsetFetchResponse` builder.
|
|
|
+type mockOffsetFetchResponse struct {
|
|
|
+ offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
|
|
|
+ t *testing.T
|
|
|
+}
|
|
|
+
|
|
|
+func newMockOffsetFetchResponse(t *testing.T) *mockOffsetFetchResponse {
|
|
|
+ return &mockOffsetFetchResponse{t: t}
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *mockOffsetFetchResponse {
|
|
|
+ if mr.offsets == nil {
|
|
|
+ mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
|
|
|
+ }
|
|
|
+ topics := mr.offsets[group]
|
|
|
+ if topics == nil {
|
|
|
+ topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
|
|
|
+ mr.offsets[group] = topics
|
|
|
+ }
|
|
|
+ partitions := topics[topic]
|
|
|
+ if partitions == nil {
|
|
|
+ partitions = make(map[int32]*OffsetFetchResponseBlock)
|
|
|
+ topics[topic] = partitions
|
|
|
+ }
|
|
|
+ partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
|
|
|
+ return mr
|
|
|
+}
|
|
|
+
|
|
|
+func (mr *mockOffsetFetchResponse) For(reqBody decoder) encoder {
|
|
|
+ req := reqBody.(*OffsetFetchRequest)
|
|
|
+ group := req.ConsumerGroup
|
|
|
+ res := &OffsetFetchResponse{}
|
|
|
+ for topic, partitions := range mr.offsets[group] {
|
|
|
+ for partition, block := range partitions {
|
|
|
+ res.AddBlock(topic, partition, block)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return res
|
|
|
+}
|
|
|
+
|
|
|
+//func (mr *mockOffsetFetchResponse) getOffset(group, topic string, partition int32) *OffsetFetchResponseBlock {
|
|
|
+// topics := mr.offsets[group]
|
|
|
+// if topics == nil {
|
|
|
+// return nil
|
|
|
+// }
|
|
|
+// partitions := topics[topic]
|
|
|
+// if partitions == nil {
|
|
|
+// return nil
|
|
|
+// }
|
|
|
+// block, ok := partitions[partition]
|
|
|
+// if !ok {
|
|
|
+// return nil
|
|
|
+// }
|
|
|
+// return block
|
|
|
+//}
|