Browse Source

Merge pull request #867 from hanbing0715/master

add new version kafka and apiversions support
Evan Huus 8 years ago
parent
commit
dd00cf9b0e
3 changed files with 25 additions and 0 deletions
  1. 11 0
      broker.go
  2. 13 0
      broker_test.go
  3. 1 0
      utils.go

+ 11 - 0
broker.go

@@ -355,6 +355,17 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups
 	return response, nil
 }
 
+func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
+	response := new(ApiVersionsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 13 - 0
broker_test.go

@@ -284,6 +284,19 @@ var brokerTestTable = []struct {
 				t.Error("DescribeGroups request got no response!")
 			}
 		}},
+
+	{"ApiVersionsRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := ApiVersionsRequest{}
+			response, err := broker.ApiVersions(&request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("ApiVersions request got no response!")
+			}
+		}},
 }
 
 func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {

+ 1 - 0
utils.go

@@ -148,5 +148,6 @@ var (
 	V0_10_0_0  = newKafkaVersion(0, 10, 0, 0)
 	V0_10_0_1  = newKafkaVersion(0, 10, 0, 1)
 	V0_10_1_0  = newKafkaVersion(0, 10, 1, 0)
+	V0_10_2_0  = newKafkaVersion(0, 10, 2, 0)
 	minVersion = V0_8_2_0
 )