| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package kafka
- type fetchResponsePartitionBlock struct {
- id int32
- err KError
- highWaterMarkOffset int64
- msgSet messageSet
- }
- func (pr *fetchResponsePartitionBlock) decode(pd packetDecoder) (err error) {
- pr.id, err = pd.getInt32()
- if err != nil {
- return err
- }
- pr.err, err = pd.getError()
- if err != nil {
- return err
- }
- pr.highWaterMarkOffset, err = pd.getInt64()
- if err != nil {
- return err
- }
- msgSetSize, err := pd.getInt32()
- if err != nil {
- return err
- }
- msgSetDecoder, err := pd.getSubset(int(msgSetSize))
- if err != nil {
- return err
- }
- err = (&pr.msgSet).decode(msgSetDecoder)
- return err
- }
- type fetchResponseTopicBlock struct {
- name *string
- partitions []fetchResponsePartitionBlock
- }
- func (pr *fetchResponseTopicBlock) decode(pd packetDecoder) (err error) {
- pr.name, err = pd.getString()
- if err != nil {
- return err
- }
- n, err := pd.getArrayCount()
- if err != nil {
- return err
- }
- pr.partitions = make([]fetchResponsePartitionBlock, n)
- for i := range pr.partitions {
- err = (&pr.partitions[i]).decode(pd)
- if err != nil {
- return err
- }
- }
- return nil
- }
- type fetchResponse struct {
- topics []fetchResponseTopicBlock
- }
- func (pr *fetchResponse) decode(pd packetDecoder) (err error) {
- n, err := pd.getArrayCount()
- if err != nil {
- return err
- }
- pr.topics = make([]fetchResponseTopicBlock, n)
- for i := range pr.topics {
- err = (&pr.topics[i]).decode(pd)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (pr *fetchResponse) staleTopics() []*string {
- ret := make([]*string, 0)
- for i := range pr.topics {
- topic := &pr.topics[i]
- currentTopic:
- for j := range topic.partitions {
- partition := &topic.partitions[j]
- switch partition.err {
- case UNKNOWN, UNKNOWN_TOPIC_OR_PARTITION, LEADER_NOT_AVAILABLE, NOT_LEADER_FOR_PARTITION:
- ret = append(ret, topic.name)
- break currentTopic
- }
- }
- }
- return ret
- }
|