Преглед изворни кода

Partial decoding of fetch responses

Evan Huus пре 12 година
родитељ
комит
aea38cd370
1 измењених фајлова са 94 додато и 0 уклоњено
  1. 94 0
      fetch_response.go

+ 94 - 0
fetch_response.go

@@ -1 +1,95 @@
 package kafka
+
+type fetchResponsePartitionBlock struct {
+	id                  int32
+	err                 KError
+	highWaterMarkOffset int64
+	msgSet              messageSet
+}
+
+func (pr *fetchResponsePartitionBlock) decode(pd packetDecoder) (err error) {
+	pr.id, err = pd.getInt32()
+	if err != nil {
+		return err
+	}
+
+	pr.err, err = pd.getError()
+	if err != nil {
+		return err
+	}
+
+	pr.highWaterMarkOffset, err = pd.getInt64()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+type fetchResponseTopicBlock struct {
+	name       *string
+	partitions []fetchResponsePartitionBlock
+}
+
+func (pr *fetchResponseTopicBlock) decode(pd packetDecoder) (err error) {
+	pr.name, err = pd.getString()
+	if err != nil {
+		return err
+	}
+
+	n, err := pd.getArrayCount()
+	if err != nil {
+		return err
+	}
+
+	pr.partitions = make([]fetchResponsePartitionBlock, n)
+	for i := range pr.partitions {
+		err = (&pr.partitions[i]).decode(pd)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+type fetchResponse struct {
+	topics []fetchResponseTopicBlock
+}
+
+func (pr *fetchResponse) decode(pd packetDecoder) (err error) {
+	n, err := pd.getArrayCount()
+	if err != nil {
+		return err
+	}
+
+	pr.topics = make([]fetchResponseTopicBlock, n)
+	for i := range pr.topics {
+		err = (&pr.topics[i]).decode(pd)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (pr *fetchResponse) staleTopics() []*string {
+	ret := make([]*string, 0)
+
+	for i := range pr.topics {
+		topic := &pr.topics[i]
+
+	currentTopic:
+		for j := range topic.partitions {
+			partition := &topic.partitions[j]
+			switch partition.err {
+			case UNKNOWN, UNKNOWN_TOPIC_OR_PARTITION, LEADER_NOT_AVAILABLE, NOT_LEADER_FOR_PARTITION:
+				ret = append(ret, topic.name)
+				break currentTopic
+			}
+		}
+	}
+
+	return ret
+}