|
@@ -71,7 +71,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
|
|
|
|
|
|
broker.id = 0
|
|
|
conf := NewConfig()
|
|
|
- conf.Version = V0_10_0_0
|
|
|
+ conf.Version = tt.version
|
|
|
err := broker.Open(conf)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
@@ -97,11 +97,13 @@ func TestSimpleBrokerCommunication(t *testing.T) {
|
|
|
|
|
|
|
|
|
var brokerTestTable = []struct {
|
|
|
+ version KafkaVersion
|
|
|
name string
|
|
|
response []byte
|
|
|
runner func(*testing.T, *Broker)
|
|
|
}{
|
|
|
- {"MetadataRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "MetadataRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := MetadataRequest{}
|
|
@@ -114,7 +116,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"ConsumerMetadataRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "ConsumerMetadataRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := ConsumerMetadataRequest{}
|
|
@@ -127,7 +130,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"ProduceRequest (NoResponse)",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "ProduceRequest (NoResponse)",
|
|
|
[]byte{},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := ProduceRequest{}
|
|
@@ -141,7 +145,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"ProduceRequest (WaitForLocal)",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "ProduceRequest (WaitForLocal)",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := ProduceRequest{}
|
|
@@ -155,7 +160,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"FetchRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "FetchRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := FetchRequest{}
|
|
@@ -168,7 +174,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"OffsetFetchRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "OffsetFetchRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := OffsetFetchRequest{}
|
|
@@ -181,7 +188,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"OffsetCommitRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "OffsetCommitRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := OffsetCommitRequest{}
|
|
@@ -194,7 +202,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"OffsetRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "OffsetRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := OffsetRequest{}
|
|
@@ -207,7 +216,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"JoinGroupRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "JoinGroupRequest",
|
|
|
[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := JoinGroupRequest{}
|
|
@@ -220,7 +230,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"SyncGroupRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "SyncGroupRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := SyncGroupRequest{}
|
|
@@ -233,7 +244,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"LeaveGroupRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "LeaveGroupRequest",
|
|
|
[]byte{0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := LeaveGroupRequest{}
|
|
@@ -246,7 +258,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"HeartbeatRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "HeartbeatRequest",
|
|
|
[]byte{0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := HeartbeatRequest{}
|
|
@@ -259,7 +272,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"ListGroupsRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "ListGroupsRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := ListGroupsRequest{}
|
|
@@ -272,7 +286,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"DescribeGroupsRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "DescribeGroupsRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := DescribeGroupsRequest{}
|
|
@@ -285,7 +300,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"ApiVersionsRequest",
|
|
|
+ {V0_10_0_0,
|
|
|
+ "ApiVersionsRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := ApiVersionsRequest{}
|
|
@@ -298,7 +314,8 @@ var brokerTestTable = []struct {
|
|
|
}
|
|
|
}},
|
|
|
|
|
|
- {"DeleteGroupsRequest",
|
|
|
+ {V1_1_0_0,
|
|
|
+ "DeleteGroupsRequest",
|
|
|
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
|
|
|
func(t *testing.T, broker *Broker) {
|
|
|
request := DeleteGroupsRequest{}
|