Browse Source

Publish metadata request/response structures, remove dead code

Evan Huus 11 years ago
parent
commit
1e7b3a06da
5 changed files with 53 additions and 98 deletions
  1. 14 14
      metadata_cache.go
  2. 10 27
      metadata_request.go
  3. 8 19
      metadata_response.go
  4. 12 20
      partition_metadata.go
  5. 9 18
      topic_metadata.go

+ 14 - 14
metadata_cache.go

@@ -87,32 +87,32 @@ func (mc *metadataCache) refreshTopics(topics []*string) error {
 		return OutOfBrokers{}
 	}
 
-	decoder, err := broker.Send(mc.client.id, &metadataRequest{topics})
+	decoder, err := broker.Send(mc.client.id, &MetadataRequest{topics})
 	if err != nil {
 		return err
 	}
-	response := decoder.(*metadataResponse)
+	response := decoder.(*MetadataResponse)
 
 	mc.lock.Lock()
 	defer mc.lock.Unlock()
 
-	for i := range response.brokers {
-		broker := &response.brokers[i]
+	for i := range response.Brokers {
+		broker := &response.Brokers[i]
 		mc.brokers[broker.id] = broker
 	}
 
-	for i := range response.topics {
-		topic := &response.topics[i]
-		if topic.err != NO_ERROR {
-			return topic.err
+	for i := range response.Topics {
+		topic := &response.Topics[i]
+		if topic.Err != NO_ERROR {
+			return topic.Err
 		}
-		mc.leaders[*topic.name] = make(map[int32]int32, len(topic.partitions))
-		for j := range topic.partitions {
-			partition := &topic.partitions[j]
-			if partition.err != NO_ERROR {
-				return partition.err
+		mc.leaders[*topic.Name] = make(map[int32]int32, len(topic.Partitions))
+		for j := range topic.Partitions {
+			partition := &topic.Partitions[j]
+			if partition.Err != NO_ERROR {
+				return partition.Err
 			}
-			mc.leaders[*topic.name][partition.id] = partition.leader
+			mc.leaders[*topic.Name][partition.Id] = partition.Leader
 		}
 	}
 

+ 10 - 27
metadata_request.go

@@ -1,41 +1,24 @@
 package kafka
 
-type metadataRequest struct {
-	topics []*string
+type MetadataRequest struct {
+	Topics []*string
 }
 
-func (mr *metadataRequest) encode(pe packetEncoder) {
-	pe.putArrayCount(len(mr.topics))
-	for i := range mr.topics {
-		pe.putString(mr.topics[i])
+func (mr *MetadataRequest) encode(pe packetEncoder) {
+	pe.putArrayCount(len(mr.Topics))
+	for i := range mr.Topics {
+		pe.putString(mr.Topics[i])
 	}
 }
 
-func (mr *metadataRequest) decode(pd packetDecoder) (err error) {
-	n, err := pd.getArrayCount()
-	if err != nil {
-		return err
-	}
-
-	mr.topics = make([]*string, n)
-	for i := 0; i < n; i++ {
-		mr.topics[i], err = pd.getString()
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (mr *metadataRequest) key() int16 {
+func (mr *MetadataRequest) key() int16 {
 	return 3
 }
 
-func (mr *metadataRequest) version() int16 {
+func (mr *MetadataRequest) version() int16 {
 	return 0
 }
 
-func (mr *metadataRequest) responseDecoder() decoder {
-	return new(metadataResponse)
+func (mr *MetadataRequest) responseDecoder() decoder {
+	return new(MetadataResponse)
 }

+ 8 - 19
metadata_response.go

@@ -1,30 +1,19 @@
 package kafka
 
-type metadataResponse struct {
-	brokers []Broker
-	topics  []topicMetadata
+type MetadataResponse struct {
+	Brokers []Broker
+	Topics  []TopicMetadata
 }
 
-func (m *metadataResponse) encode(pe packetEncoder) {
-	pe.putInt32(int32(len(m.brokers)))
-	for i := range m.brokers {
-		(&m.brokers[i]).encode(pe)
-	}
-	pe.putInt32(int32(len(m.topics)))
-	for i := range m.topics {
-		(&m.topics[i]).encode(pe)
-	}
-}
-
-func (m *metadataResponse) decode(pd packetDecoder) (err error) {
+func (m *MetadataResponse) decode(pd packetDecoder) (err error) {
 	n, err := pd.getArrayCount()
 	if err != nil {
 		return err
 	}
 
-	m.brokers = make([]Broker, n)
+	m.Brokers = make([]Broker, n)
 	for i := 0; i < n; i++ {
-		err = (&m.brokers[i]).decode(pd)
+		err = (&m.Brokers[i]).decode(pd)
 		if err != nil {
 			return err
 		}
@@ -35,9 +24,9 @@ func (m *metadataResponse) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	m.topics = make([]topicMetadata, n)
+	m.Topics = make([]TopicMetadata, n)
 	for i := 0; i < n; i++ {
-		err = (&m.topics[i]).decode(pd)
+		err = (&m.Topics[i]).decode(pd)
 		if err != nil {
 			return err
 		}

+ 12 - 20
partition_metadata.go

@@ -1,43 +1,35 @@
 package kafka
 
-type partitionMetadata struct {
-	err      KError
-	id       int32
-	leader   int32
-	replicas []int32
-	isr      []int32
+type PartitionMetadata struct {
+	Err      KError
+	Id       int32
+	Leader   int32
+	Replicas []int32
+	Isr      []int32
 }
 
-func (pm *partitionMetadata) encode(pe packetEncoder) {
-	pe.putError(pm.err)
-	pe.putInt32(pm.id)
-	pe.putInt32(pm.leader)
-	pe.putInt32Array(pm.replicas)
-	pe.putInt32Array(pm.isr)
-}
-
-func (pm *partitionMetadata) decode(pd packetDecoder) (err error) {
-	pm.err, err = pd.getError()
+func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
+	pm.Err, err = pd.getError()
 	if err != nil {
 		return err
 	}
 
-	pm.id, err = pd.getInt32()
+	pm.Id, err = pd.getInt32()
 	if err != nil {
 		return err
 	}
 
-	pm.leader, err = pd.getInt32()
+	pm.Leader, err = pd.getInt32()
 	if err != nil {
 		return err
 	}
 
-	pm.replicas, err = pd.getInt32Array()
+	pm.Replicas, err = pd.getInt32Array()
 	if err != nil {
 		return err
 	}
 
-	pm.isr, err = pd.getInt32Array()
+	pm.Isr, err = pd.getInt32Array()
 	if err != nil {
 		return err
 	}

+ 9 - 18
topic_metadata.go

@@ -1,27 +1,18 @@
 package kafka
 
-type topicMetadata struct {
-	err        KError
-	name       *string
-	partitions []partitionMetadata
+type TopicMetadata struct {
+	Err        KError
+	Name       *string
+	Partitions []PartitionMetadata
 }
 
-func (tm *topicMetadata) encode(pe packetEncoder) {
-	pe.putError(tm.err)
-	pe.putString(tm.name)
-	pe.putArrayCount(len(tm.partitions))
-	for i := range tm.partitions {
-		(&tm.partitions[i]).encode(pe)
-	}
-}
-
-func (tm *topicMetadata) decode(pd packetDecoder) (err error) {
-	tm.err, err = pd.getError()
+func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
+	tm.Err, err = pd.getError()
 	if err != nil {
 		return err
 	}
 
-	tm.name, err = pd.getString()
+	tm.Name, err = pd.getString()
 	if err != nil {
 		return err
 	}
@@ -30,9 +21,9 @@ func (tm *topicMetadata) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
-	tm.partitions = make([]partitionMetadata, n)
+	tm.Partitions = make([]PartitionMetadata, n)
 	for i := 0; i < n; i++ {
-		err = (&tm.partitions[i]).decode(pd)
+		err = (&tm.Partitions[i]).decode(pd)
 		if err != nil {
 			return err
 		}