package sarama type PartitionMetadata struct { Err KError ID int32 Leader int32 Replicas []int32 Isr []int32 OfflineReplicas []int32 } func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err } pm.Err = KError(tmp) pm.ID, err = pd.getInt32() if err != nil { return err } pm.Leader, err = pd.getInt32() if err != nil { return err } pm.Replicas, err = pd.getInt32Array() if err != nil { return err } pm.Isr, err = pd.getInt32Array() if err != nil { return err } if version >= 5 { pm.OfflineReplicas, err = pd.getInt32Array() if err != nil { return err } } return nil } func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) { pe.putInt16(int16(pm.Err)) pe.putInt32(pm.ID) pe.putInt32(pm.Leader) err = pe.putInt32Array(pm.Replicas) if err != nil { return err } err = pe.putInt32Array(pm.Isr) if err != nil { return err } if version >= 5 { err = pe.putInt32Array(pm.OfflineReplicas) if err != nil { return err } } return nil } type TopicMetadata struct { Err KError Name string IsInternal bool // Only valid for Version >= 1 Partitions []*PartitionMetadata } func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err } tm.Err = KError(tmp) tm.Name, err = pd.getString() if err != nil { return err } if version >= 1 { tm.IsInternal, err = pd.getBool() if err != nil { return err } } n, err := pd.getArrayLength() if err != nil { return err } tm.Partitions = make([]*PartitionMetadata, n) for i := 0; i < n; i++ { tm.Partitions[i] = new(PartitionMetadata) err = tm.Partitions[i].decode(pd, version) if err != nil { return err } } return nil } func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) { pe.putInt16(int16(tm.Err)) err = pe.putString(tm.Name) if err != nil { return err } if version >= 1 { pe.putBool(tm.IsInternal) } err = pe.putArrayLength(len(tm.Partitions)) if err != nil { return err } for _, pm := range tm.Partitions { err = pm.encode(pe, version) if err != nil { return err } } return nil } type MetadataResponse struct { Version int16 ThrottleTimeMs int32 Brokers []*Broker ClusterID *string ControllerID int32 Topics []*TopicMetadata } func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { r.Version = version if version >= 3 { r.ThrottleTimeMs, err = pd.getInt32() if err != nil { return err } } n, err := pd.getArrayLength() if err != nil { return err } r.Brokers = make([]*Broker, n) for i := 0; i < n; i++ { r.Brokers[i] = new(Broker) err = r.Brokers[i].decode(pd, version) if err != nil { return err } } if version >= 2 { r.ClusterID, err = pd.getNullableString() if err != nil { return err } } if version >= 1 { r.ControllerID, err = pd.getInt32() if err != nil { return err } } else { r.ControllerID = -1 } n, err = pd.getArrayLength() if err != nil { return err } r.Topics = make([]*TopicMetadata, n) for i := 0; i < n; i++ { r.Topics[i] = new(TopicMetadata) err = r.Topics[i].decode(pd, version) if err != nil { return err } } return nil } func (r *MetadataResponse) encode(pe packetEncoder) error { if r.Version >= 3 { pe.putInt32(r.ThrottleTimeMs) } err := pe.putArrayLength(len(r.Brokers)) if err != nil { return err } for _, broker := range r.Brokers { err = broker.encode(pe, r.Version) if err != nil { return err } } if r.Version >= 2 { err := pe.putNullableString(r.ClusterID) if err != nil { return err } } if r.Version >= 1 { pe.putInt32(r.ControllerID) } err = pe.putArrayLength(len(r.Topics)) if err != nil { return err } for _, tm := range r.Topics { err = tm.encode(pe, r.Version) if err != nil { return err } } return nil } func (r *MetadataResponse) key() int16 { return 3 } func (r *MetadataResponse) version() int16 { return r.Version } func (r *MetadataResponse) headerVersion() int16 { return 0 } func (r *MetadataResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: return V0_10_0_0 case 2: return V0_10_1_0 case 3, 4: return V0_11_0_0 case 5: return V1_0_0_0 default: return MinVersion } } // testing API func (r *MetadataResponse) AddBroker(addr string, id int32) { r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr}) } func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata { var tmatch *TopicMetadata for _, tm := range r.Topics { if tm.Name == topic { tmatch = tm goto foundTopic } } tmatch = new(TopicMetadata) tmatch.Name = topic r.Topics = append(r.Topics, tmatch) foundTopic: tmatch.Err = err return tmatch } func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) { tmatch := r.AddTopic(topic, ErrNoError) var pmatch *PartitionMetadata for _, pm := range tmatch.Partitions { if pm.ID == partition { pmatch = pm goto foundPartition } } pmatch = new(PartitionMetadata) pmatch.ID = partition tmatch.Partitions = append(tmatch.Partitions, pmatch) foundPartition: pmatch.Leader = brokerID pmatch.Replicas = replicas pmatch.Isr = isr pmatch.OfflineReplicas = offline pmatch.Err = err }