Browse Source

Add two other missing broker calls for kafka 0.9

Evan Huus 10 years ago
parent
commit
bf94d2ef46
2 changed files with 46 additions and 0 deletions
  1. 22 0
      broker.go
  2. 24 0
      broker_test.go

+ 22 - 0
broker.go

@@ -284,6 +284,28 @@ func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error
 	return response, nil
 }
 
+func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
+	response := new(ListGroupsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
+	response := new(DescribeGroupsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 24 - 0
broker_test.go

@@ -224,4 +224,28 @@ var brokerTestTable = []struct {
 				t.Error("Heartbeat request got no response!")
 			}
 		}},
+
+	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := ListGroupsRequest{}
+			response, err := broker.ListGroups(&request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("ListGroups request got no response!")
+			}
+		}},
+
+	{[]byte{0x00, 0x00, 0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := DescribeGroupsRequest{}
+			response, err := broker.DescribeGroups(&request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("DescribeGroups request got no response!")
+			}
+		}},
 }