瀏覽代碼

Permit setting version on mock produce response

This allows setting the version of the message data struct for
MockProduceResponse. This change is very similar to what was already
done in pull request #939

In order to mock a "produce" request for Kafka version 0.10.2.0 you
would use the following:

    leader.SetHandlerByMap(map[string]MockResponse{
    	"ProduceRequest": NewMockProduceResponse(t).
    		SetVersion(2).
    		SetError("my_topic", 0, ErrNoError),
    })
Friedrich Große 8 年之前
父節點
當前提交
893be7d533
共有 2 個文件被更改,包括 10 次插入1 次删除
  1. 1 0
      async_producer_test.go
  2. 9 1
      mockresponses.go

+ 1 - 0
async_producer_test.go

@@ -639,6 +639,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
 
 	leader.SetHandlerByMap(map[string]MockResponse{
 		"ProduceRequest": NewMockProduceResponse(t).
+			SetVersion(0).
 			SetError("my_topic", 0, ErrNoError),
 	})
 

+ 9 - 1
mockresponses.go

@@ -384,6 +384,7 @@ func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int3
 
 // MockProduceResponse is a `ProduceResponse` builder.
 type MockProduceResponse struct {
+	version      int16
 	errors map[string]map[int32]KError
 	t      TestReporter
 }
@@ -392,6 +393,11 @@ func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
 	return &MockProduceResponse{t: t}
 }
 
+func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
+	mr.version = version
+	return mr
+}
+
 func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
 	if mr.errors == nil {
 		mr.errors = make(map[string]map[int32]KError)
@@ -407,7 +413,9 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE
 
 func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*ProduceRequest)
-	res := &ProduceResponse{}
+	res := &ProduceResponse{
+		Version: mr.version,
+	}
 	for topic, partitions := range req.records {
 		for partition := range partitions {
 			res.AddTopicPartition(topic, partition, mr.getError(topic, partition))