123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- package sarama
- type PartitionMetadata struct {
- Err KError
- ID int32
- Leader int32
- Replicas []int32
- Isr []int32
- OfflineReplicas []int32
- }
- func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
- tmp, err := pd.getInt16()
- if err != nil {
- return err
- }
- pm.Err = KError(tmp)
- pm.ID, err = pd.getInt32()
- if err != nil {
- return err
- }
- pm.Leader, err = pd.getInt32()
- if err != nil {
- return err
- }
- pm.Replicas, err = pd.getInt32Array()
- if err != nil {
- return err
- }
- pm.Isr, err = pd.getInt32Array()
- if err != nil {
- return err
- }
- if version >= 5 {
- pm.OfflineReplicas, err = pd.getInt32Array()
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
- pe.putInt16(int16(pm.Err))
- pe.putInt32(pm.ID)
- pe.putInt32(pm.Leader)
- err = pe.putInt32Array(pm.Replicas)
- if err != nil {
- return err
- }
- err = pe.putInt32Array(pm.Isr)
- if err != nil {
- return err
- }
- if version >= 5 {
- err = pe.putInt32Array(pm.OfflineReplicas)
- if err != nil {
- return err
- }
- }
- return nil
- }
- type TopicMetadata struct {
- Err KError
- Name string
- IsInternal bool // Only valid for Version >= 1
- Partitions []*PartitionMetadata
- }
- func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
- tmp, err := pd.getInt16()
- if err != nil {
- return err
- }
- tm.Err = KError(tmp)
- tm.Name, err = pd.getString()
- if err != nil {
- return err
- }
- if version >= 1 {
- tm.IsInternal, err = pd.getBool()
- if err != nil {
- return err
- }
- }
- n, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- tm.Partitions = make([]*PartitionMetadata, n)
- for i := 0; i < n; i++ {
- tm.Partitions[i] = new(PartitionMetadata)
- err = tm.Partitions[i].decode(pd, version)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
- pe.putInt16(int16(tm.Err))
- err = pe.putString(tm.Name)
- if err != nil {
- return err
- }
- if version >= 1 {
- pe.putBool(tm.IsInternal)
- }
- err = pe.putArrayLength(len(tm.Partitions))
- if err != nil {
- return err
- }
- for _, pm := range tm.Partitions {
- err = pm.encode(pe, version)
- if err != nil {
- return err
- }
- }
- return nil
- }
- type MetadataResponse struct {
- Version int16
- ThrottleTimeMs int32
- Brokers []*Broker
- ClusterID *string
- ControllerID int32
- Topics []*TopicMetadata
- }
- func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
- r.Version = version
- if version >= 3 {
- r.ThrottleTimeMs, err = pd.getInt32()
- if err != nil {
- return err
- }
- }
- n, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- r.Brokers = make([]*Broker, n)
- for i := 0; i < n; i++ {
- r.Brokers[i] = new(Broker)
- err = r.Brokers[i].decode(pd, version)
- if err != nil {
- return err
- }
- }
- if version >= 2 {
- r.ClusterID, err = pd.getNullableString()
- if err != nil {
- return err
- }
- }
- if version >= 1 {
- r.ControllerID, err = pd.getInt32()
- if err != nil {
- return err
- }
- } else {
- r.ControllerID = -1
- }
- n, err = pd.getArrayLength()
- if err != nil {
- return err
- }
- r.Topics = make([]*TopicMetadata, n)
- for i := 0; i < n; i++ {
- r.Topics[i] = new(TopicMetadata)
- err = r.Topics[i].decode(pd, version)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (r *MetadataResponse) encode(pe packetEncoder) error {
- if r.Version >= 3 {
- pe.putInt32(r.ThrottleTimeMs)
- }
- err := pe.putArrayLength(len(r.Brokers))
- if err != nil {
- return err
- }
- for _, broker := range r.Brokers {
- err = broker.encode(pe, r.Version)
- if err != nil {
- return err
- }
- }
- if r.Version >= 2 {
- err := pe.putNullableString(r.ClusterID)
- if err != nil {
- return err
- }
- }
- if r.Version >= 1 {
- pe.putInt32(r.ControllerID)
- }
- err = pe.putArrayLength(len(r.Topics))
- if err != nil {
- return err
- }
- for _, tm := range r.Topics {
- err = tm.encode(pe, r.Version)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (r *MetadataResponse) key() int16 {
- return 3
- }
- func (r *MetadataResponse) version() int16 {
- return r.Version
- }
- func (r *MetadataResponse) headerVersion() int16 {
- return 0
- }
- func (r *MetadataResponse) requiredVersion() KafkaVersion {
- switch r.Version {
- case 1:
- return V0_10_0_0
- case 2:
- return V0_10_1_0
- case 3, 4:
- return V0_11_0_0
- case 5:
- return V1_0_0_0
- default:
- return MinVersion
- }
- }
- // testing API
- func (r *MetadataResponse) AddBroker(addr string, id int32) {
- r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr})
- }
- func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
- var tmatch *TopicMetadata
- for _, tm := range r.Topics {
- if tm.Name == topic {
- tmatch = tm
- goto foundTopic
- }
- }
- tmatch = new(TopicMetadata)
- tmatch.Name = topic
- r.Topics = append(r.Topics, tmatch)
- foundTopic:
- tmatch.Err = err
- return tmatch
- }
- func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
- tmatch := r.AddTopic(topic, ErrNoError)
- var pmatch *PartitionMetadata
- for _, pm := range tmatch.Partitions {
- if pm.ID == partition {
- pmatch = pm
- goto foundPartition
- }
- }
- pmatch = new(PartitionMetadata)
- pmatch.ID = partition
- tmatch.Partitions = append(tmatch.Partitions, pmatch)
- foundPartition:
- pmatch.Leader = brokerID
- pmatch.Replicas = replicas
- pmatch.Isr = isr
- pmatch.OfflineReplicas = offline
- pmatch.Err = err
- }
|