123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- package sarama
- import "time"
- type DescribeLogDirsResponse struct {
- ThrottleTime time.Duration
- // Version 0 and 1 are equal
- // The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- Version int16
- LogDirs []DescribeLogDirsResponseDirMetadata
- }
- func (r *DescribeLogDirsResponse) encode(pe packetEncoder) error {
- pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
- if err := pe.putArrayLength(len(r.LogDirs)); err != nil {
- return err
- }
- for _, dir := range r.LogDirs {
- if err := dir.encode(pe); err != nil {
- return err
- }
- }
- return nil
- }
- func (r *DescribeLogDirsResponse) decode(pd packetDecoder, version int16) error {
- throttleTime, err := pd.getInt32()
- if err != nil {
- return err
- }
- r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
- // Decode array of DescribeLogDirsResponseDirMetadata
- n, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- r.LogDirs = make([]DescribeLogDirsResponseDirMetadata, n)
- for i := 0; i < n; i++ {
- dir := DescribeLogDirsResponseDirMetadata{}
- if err := dir.decode(pd, version); err != nil {
- return err
- }
- r.LogDirs[i] = dir
- }
- return nil
- }
- func (r *DescribeLogDirsResponse) key() int16 {
- return 35
- }
- func (r *DescribeLogDirsResponse) version() int16 {
- return r.Version
- }
- func (r *DescribeLogDirsResponse) headerVersion() int16 {
- return 0
- }
- func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
- return V1_0_0_0
- }
- type DescribeLogDirsResponseDirMetadata struct {
- ErrorCode KError
- // The absolute log directory path
- Path string
- Topics []DescribeLogDirsResponseTopic
- }
- func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
- pe.putInt16(int16(r.ErrorCode))
- if err := pe.putString(r.Path); err != nil {
- return err
- }
- if err := pe.putArrayLength(len(r.Topics)); err != nil {
- return err
- }
- for _, topic := range r.Topics {
- if err := topic.encode(pe); err != nil {
- return err
- }
- }
- return nil
- }
- func (r *DescribeLogDirsResponseDirMetadata) decode(pd packetDecoder, version int16) error {
- errCode, err := pd.getInt16()
- if err != nil {
- return err
- }
- r.ErrorCode = KError(errCode)
- path, err := pd.getString()
- if err != nil {
- return err
- }
- r.Path = path
- // Decode array of DescribeLogDirsResponseTopic
- n, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- r.Topics = make([]DescribeLogDirsResponseTopic, n)
- for i := 0; i < n; i++ {
- t := DescribeLogDirsResponseTopic{}
- if err := t.decode(pd, version); err != nil {
- return err
- }
- r.Topics[i] = t
- }
- return nil
- }
- // DescribeLogDirsResponseTopic contains a topic's partitions descriptions
- type DescribeLogDirsResponseTopic struct {
- Topic string
- Partitions []DescribeLogDirsResponsePartition
- }
- func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
- if err := pe.putString(r.Topic); err != nil {
- return err
- }
- if err := pe.putArrayLength(len(r.Partitions)); err != nil {
- return err
- }
- for _, partition := range r.Partitions {
- if err := partition.encode(pe); err != nil {
- return err
- }
- }
- return nil
- }
- func (r *DescribeLogDirsResponseTopic) decode(pd packetDecoder, version int16) error {
- t, err := pd.getString()
- if err != nil {
- return err
- }
- r.Topic = t
- n, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- r.Partitions = make([]DescribeLogDirsResponsePartition, n)
- for i := 0; i < n; i++ {
- p := DescribeLogDirsResponsePartition{}
- if err := p.decode(pd, version); err != nil {
- return err
- }
- r.Partitions[i] = p
- }
- return nil
- }
- // DescribeLogDirsResponsePartition describes a partition's log directory
- type DescribeLogDirsResponsePartition struct {
- PartitionID int32
- // The size of the log segments of the partition in bytes.
- Size int64
- // The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or
- // current replica's LEO (if it is the future log for the partition)
- OffsetLag int64
- // True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of
- // the replica in the future.
- IsTemporary bool
- }
- func (r *DescribeLogDirsResponsePartition) encode(pe packetEncoder) error {
- pe.putInt32(r.PartitionID)
- pe.putInt64(r.Size)
- pe.putInt64(r.OffsetLag)
- pe.putBool(r.IsTemporary)
- return nil
- }
- func (r *DescribeLogDirsResponsePartition) decode(pd packetDecoder, version int16) error {
- pID, err := pd.getInt32()
- if err != nil {
- return err
- }
- r.PartitionID = pID
- size, err := pd.getInt64()
- if err != nil {
- return err
- }
- r.Size = size
- lag, err := pd.getInt64()
- if err != nil {
- return err
- }
- r.OffsetLag = lag
- isTemp, err := pd.getBool()
- if err != nil {
- return err
- }
- r.IsTemporary = isTemp
- return nil
- }
|