123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- package sarama
- import (
- "testing"
- )
- // MockResponse is a response builder interface it defines one method that
- // allows generating a response based on a request body.
- type MockResponse interface {
- For(reqBody decoder) (res encoder)
- }
- type mockWrapper struct {
- res encoder
- }
- func (mw *mockWrapper) For(reqBody decoder) (res encoder) {
- return mw.res
- }
- func newMockWrapper(res encoder) *mockWrapper {
- return &mockWrapper{res: res}
- }
- // mockMetadataResponse is a `MetadataResponse` builder.
- type mockMetadataResponse struct {
- leaders map[string]map[int32]int32
- brokers map[string]int32
- t *testing.T
- }
- func newMockMetadataResponse(t *testing.T) *mockMetadataResponse {
- return &mockMetadataResponse{
- leaders: make(map[string]map[int32]int32),
- brokers: make(map[string]int32),
- t: t,
- }
- }
- func (mmr *mockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *mockMetadataResponse {
- partitions := mmr.leaders[topic]
- if partitions == nil {
- partitions = make(map[int32]int32)
- mmr.leaders[topic] = partitions
- }
- partitions[partition] = brokerID
- return mmr
- }
- func (mmr *mockMetadataResponse) SetBroker(addr string, brokerID int32) *mockMetadataResponse {
- mmr.brokers[addr] = brokerID
- return mmr
- }
- func (mor *mockMetadataResponse) For(reqBody decoder) encoder {
- metadataRequest := reqBody.(*MetadataRequest)
- metadataResponse := &MetadataResponse{}
- for addr, brokerID := range mor.brokers {
- metadataResponse.AddBroker(addr, brokerID)
- }
- if len(metadataRequest.Topics) == 0 {
- for topic, partitions := range mor.leaders {
- for partition, brokerID := range partitions {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
- }
- }
- return metadataResponse
- }
- for _, topic := range metadataRequest.Topics {
- for partition, brokerID := range mor.leaders[topic] {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
- }
- }
- return metadataResponse
- }
- // mockOffsetResponse is an `OffsetResponse` builder.
- type mockOffsetResponse struct {
- offsets map[string]map[int32]map[int64]int64
- t *testing.T
- }
- func newMockOffsetResponse(t *testing.T) *mockOffsetResponse {
- return &mockOffsetResponse{
- offsets: make(map[string]map[int32]map[int64]int64),
- t: t,
- }
- }
- func (mor *mockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *mockOffsetResponse {
- partitions := mor.offsets[topic]
- if partitions == nil {
- partitions = make(map[int32]map[int64]int64)
- mor.offsets[topic] = partitions
- }
- times := partitions[partition]
- if times == nil {
- times = make(map[int64]int64)
- partitions[partition] = times
- }
- times[time] = offset
- return mor
- }
- func (mor *mockOffsetResponse) For(reqBody decoder) encoder {
- offsetRequest := reqBody.(*OffsetRequest)
- offsetResponse := &OffsetResponse{}
- for topic, partitions := range offsetRequest.Blocks {
- for partition, block := range partitions {
- offset := mor.getOffset(topic, partition, block.Time)
- offsetResponse.AddTopicPartition(topic, partition, offset)
- }
- }
- return offsetResponse
- }
- func (mor *mockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
- partitions := mor.offsets[topic]
- if partitions == nil {
- mor.t.Errorf("missing topic: %s", topic)
- }
- times := partitions[partition]
- if times == nil {
- mor.t.Errorf("missing partition: %d", partition)
- }
- offset, ok := times[time]
- if !ok {
- mor.t.Errorf("missing time: %d", time)
- }
- return offset
- }
- // mockFetchResponse is a `FetchResponse` builder.
- type mockFetchResponse struct {
- messages map[string]map[int32]map[int64]Encoder
- highWaterMarks map[string]map[int32]int64
- t *testing.T
- batchSize int
- }
- func newMockFetchResponse(t *testing.T, batchSize int) *mockFetchResponse {
- return &mockFetchResponse{
- messages: make(map[string]map[int32]map[int64]Encoder),
- highWaterMarks: make(map[string]map[int32]int64),
- t: t,
- batchSize: batchSize,
- }
- }
- func (mfr *mockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *mockFetchResponse {
- partitions := mfr.messages[topic]
- if partitions == nil {
- partitions = make(map[int32]map[int64]Encoder)
- mfr.messages[topic] = partitions
- }
- messages := partitions[partition]
- if messages == nil {
- messages = make(map[int64]Encoder)
- partitions[partition] = messages
- }
- messages[offset] = msg
- return mfr
- }
- func (mfr *mockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *mockFetchResponse {
- partitions := mfr.highWaterMarks[topic]
- if partitions == nil {
- partitions = make(map[int32]int64)
- mfr.highWaterMarks[topic] = partitions
- }
- partitions[partition] = offset
- return mfr
- }
- func (mfr *mockFetchResponse) For(reqBody decoder) encoder {
- fetchRequest := reqBody.(*FetchRequest)
- res := &FetchResponse{}
- for topic, partitions := range fetchRequest.Blocks {
- for partition, block := range partitions {
- initialOffset := block.FetchOffset
- offset := initialOffset
- maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
- for i := 0; i < mfr.batchSize && offset < maxOffset; {
- msg := mfr.getMessage(topic, partition, offset)
- if msg != nil {
- res.AddMessage(topic, partition, nil, msg, offset)
- i++
- }
- offset++
- }
- fb := res.GetBlock(topic, partition)
- if fb == nil {
- res.AddError(topic, partition, ErrNoError)
- fb = res.GetBlock(topic, partition)
- }
- fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
- }
- }
- return res
- }
- func (mfr *mockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
- partitions := mfr.messages[topic]
- if partitions == nil {
- return nil
- }
- messages := partitions[partition]
- if messages == nil {
- return nil
- }
- return messages[offset]
- }
- func (mfr *mockFetchResponse) getMessageCount(topic string, partition int32) int {
- partitions := mfr.messages[topic]
- if partitions == nil {
- return 0
- }
- messages := partitions[partition]
- if messages == nil {
- return 0
- }
- return len(messages)
- }
- func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
- partitions := mfr.highWaterMarks[topic]
- if partitions == nil {
- return 0
- }
- return partitions[partition]
- }
- // mockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
- type mockConsumerMetadataResponse struct {
- coordinators map[string]interface{}
- t *testing.T
- }
- func newMockConsumerMetadataResponse(t *testing.T) *mockConsumerMetadataResponse {
- return &mockConsumerMetadataResponse{
- coordinators: make(map[string]interface{}),
- t: t,
- }
- }
- func (mr *mockConsumerMetadataResponse) SetCoordinator(group string, broker *mockBroker) *mockConsumerMetadataResponse {
- mr.coordinators[group] = broker
- return mr
- }
- func (mr *mockConsumerMetadataResponse) SetError(group string, kerror KError) *mockConsumerMetadataResponse {
- mr.coordinators[group] = kerror
- return mr
- }
- func (mr *mockConsumerMetadataResponse) For(reqBody decoder) encoder {
- req := reqBody.(*ConsumerMetadataRequest)
- group := req.ConsumerGroup
- res := &ConsumerMetadataResponse{}
- v := mr.coordinators[group]
- switch v := v.(type) {
- case *mockBroker:
- res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
- case KError:
- res.Err = v
- }
- return res
- }
- // mockOffsetCommitResponse is a `OffsetCommitResponse` builder.
- type mockOffsetCommitResponse struct {
- errors map[string]map[string]map[int32]KError
- t *testing.T
- }
- func newMockOffsetCommitResponse(t *testing.T) *mockOffsetCommitResponse {
- return &mockOffsetCommitResponse{t: t}
- }
- func (mr *mockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *mockOffsetCommitResponse {
- if mr.errors == nil {
- mr.errors = make(map[string]map[string]map[int32]KError)
- }
- topics := mr.errors[group]
- if topics == nil {
- topics = make(map[string]map[int32]KError)
- mr.errors[group] = topics
- }
- partitions := topics[topic]
- if partitions == nil {
- partitions = make(map[int32]KError)
- topics[topic] = partitions
- }
- partitions[partition] = kerror
- return mr
- }
- func (mr *mockOffsetCommitResponse) For(reqBody decoder) encoder {
- req := reqBody.(*OffsetCommitRequest)
- group := req.ConsumerGroup
- res := &OffsetCommitResponse{}
- for topic, partitions := range req.blocks {
- for partition := range partitions {
- res.AddError(topic, partition, mr.getError(group, topic, partition))
- }
- }
- return res
- }
- func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
- topics := mr.errors[group]
- if topics == nil {
- return ErrNoError
- }
- partitions := topics[topic]
- if partitions == nil {
- return ErrNoError
- }
- kerror, ok := partitions[partition]
- if !ok {
- return ErrNoError
- }
- return kerror
- }
- // mockProduceResponse is a `ProduceResponse` builder.
- type mockProduceResponse struct {
- errors map[string]map[int32]KError
- t *testing.T
- }
- func newMockProduceResponse(t *testing.T) *mockProduceResponse {
- return &mockProduceResponse{t: t}
- }
- func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KError) *mockProduceResponse {
- if mr.errors == nil {
- mr.errors = make(map[string]map[int32]KError)
- }
- partitions := mr.errors[topic]
- if partitions == nil {
- partitions = make(map[int32]KError)
- mr.errors[topic] = partitions
- }
- partitions[partition] = kerror
- return mr
- }
- func (mr *mockProduceResponse) For(reqBody decoder) encoder {
- req := reqBody.(*ProduceRequest)
- res := &ProduceResponse{}
- for topic, partitions := range req.MsgSets {
- for partition := range partitions {
- res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
- }
- }
- return res
- }
- func (mr *mockProduceResponse) getError(topic string, partition int32) KError {
- partitions := mr.errors[topic]
- if partitions == nil {
- return ErrNoError
- }
- kerror, ok := partitions[partition]
- if !ok {
- return ErrNoError
- }
- return kerror
- }
- // mockOffsetFetchResponse is a `OffsetFetchResponse` builder.
- type mockOffsetFetchResponse struct {
- offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
- t *testing.T
- }
- func newMockOffsetFetchResponse(t *testing.T) *mockOffsetFetchResponse {
- return &mockOffsetFetchResponse{t: t}
- }
- func (mr *mockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *mockOffsetFetchResponse {
- if mr.offsets == nil {
- mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
- }
- topics := mr.offsets[group]
- if topics == nil {
- topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
- mr.offsets[group] = topics
- }
- partitions := topics[topic]
- if partitions == nil {
- partitions = make(map[int32]*OffsetFetchResponseBlock)
- topics[topic] = partitions
- }
- partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
- return mr
- }
- func (mr *mockOffsetFetchResponse) For(reqBody decoder) encoder {
- req := reqBody.(*OffsetFetchRequest)
- group := req.ConsumerGroup
- res := &OffsetFetchResponse{}
- for topic, partitions := range mr.offsets[group] {
- for partition, block := range partitions {
- res.AddBlock(topic, partition, block)
- }
- }
- return res
- }
|