describe_log_dirs_response.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package sarama
  2. import "time"
  3. type DescribeLogDirsResponse struct {
  4. ThrottleTime time.Duration
  5. // Version 0 and 1 are equal
  6. // The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
  7. Version int16
  8. LogDirs []DescribeLogDirsResponseDirMetadata
  9. }
  10. func (r *DescribeLogDirsResponse) encode(pe packetEncoder) error {
  11. pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
  12. if err := pe.putArrayLength(len(r.LogDirs)); err != nil {
  13. return err
  14. }
  15. for _, dir := range r.LogDirs {
  16. if err := dir.encode(pe); err != nil {
  17. return err
  18. }
  19. }
  20. return nil
  21. }
  22. func (r *DescribeLogDirsResponse) decode(pd packetDecoder, version int16) error {
  23. throttleTime, err := pd.getInt32()
  24. if err != nil {
  25. return err
  26. }
  27. r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  28. // Decode array of DescribeLogDirsResponseDirMetadata
  29. n, err := pd.getArrayLength()
  30. if err != nil {
  31. return err
  32. }
  33. r.LogDirs = make([]DescribeLogDirsResponseDirMetadata, n)
  34. for i := 0; i < n; i++ {
  35. dir := DescribeLogDirsResponseDirMetadata{}
  36. if err := dir.decode(pd, version); err != nil {
  37. return err
  38. }
  39. r.LogDirs[i] = dir
  40. }
  41. return nil
  42. }
  43. func (r *DescribeLogDirsResponse) key() int16 {
  44. return 35
  45. }
  46. func (r *DescribeLogDirsResponse) version() int16 {
  47. return r.Version
  48. }
  49. func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
  50. return V1_0_0_0
  51. }
  52. type DescribeLogDirsResponseDirMetadata struct {
  53. ErrorCode KError
  54. // The absolute log directory path
  55. Path string
  56. Topics []DescribeLogDirsResponseTopic
  57. }
  58. func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
  59. pe.putInt16(int16(r.ErrorCode))
  60. if err := pe.putString(r.Path); err != nil {
  61. return err
  62. }
  63. for _, topic := range r.Topics {
  64. if err := topic.encode(pe); err != nil {
  65. return err
  66. }
  67. }
  68. return nil
  69. }
  70. func (r *DescribeLogDirsResponseDirMetadata) decode(pd packetDecoder, version int16) error {
  71. errCode, err := pd.getInt16()
  72. if err != nil {
  73. return err
  74. }
  75. r.ErrorCode = KError(errCode)
  76. path, err := pd.getString()
  77. if err != nil {
  78. return err
  79. }
  80. r.Path = path
  81. // Decode array of DescribeLogDirsResponseTopic
  82. n, err := pd.getArrayLength()
  83. if err != nil {
  84. return err
  85. }
  86. r.Topics = make([]DescribeLogDirsResponseTopic, n)
  87. for i := 0; i < n; i++ {
  88. t := DescribeLogDirsResponseTopic{}
  89. if err := t.decode(pd, version); err != nil {
  90. return err
  91. }
  92. r.Topics[i] = t
  93. }
  94. return nil
  95. }
  96. // DescribeLogDirsResponseTopic contains a topic's partitions descriptions
  97. type DescribeLogDirsResponseTopic struct {
  98. Topic string
  99. Partitions []DescribeLogDirsResponsePartition
  100. }
  101. func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
  102. if err := pe.putString(r.Topic); err != nil {
  103. return err
  104. }
  105. for _, partition := range r.Partitions {
  106. if err := partition.encode(pe); err != nil {
  107. return err
  108. }
  109. }
  110. return nil
  111. }
  112. func (r *DescribeLogDirsResponseTopic) decode(pd packetDecoder, version int16) error {
  113. t, err := pd.getString()
  114. if err != nil {
  115. return err
  116. }
  117. r.Topic = t
  118. n, err := pd.getArrayLength()
  119. if err != nil {
  120. return err
  121. }
  122. r.Partitions = make([]DescribeLogDirsResponsePartition, n)
  123. for i := 0; i < n; i++ {
  124. p := DescribeLogDirsResponsePartition{}
  125. if err := p.decode(pd, version); err != nil {
  126. return err
  127. }
  128. r.Partitions[i] = p
  129. }
  130. return nil
  131. }
  // DescribeLogDirsResponsePartition describes a partition's log directory.
  type DescribeLogDirsResponsePartition struct {
  	// PartitionID travels on the wire as an int32.
  	PartitionID int32

  	// The size of the log segments of the partition in bytes.
  	Size int64

  	// The lag of the log's LEO w.r.t. the partition's HW (if it is the current
  	// log for the partition) or the current replica's LEO (if it is the future
  	// log for the partition).
  	OffsetLag int64

  	// True if this log is created by AlterReplicaLogDirsRequest and will
  	// replace the current log of the replica in the future.
  	IsTemporary bool
  }
  144. func (r *DescribeLogDirsResponsePartition) encode(pe packetEncoder) error {
  145. pe.putInt32(r.PartitionID)
  146. pe.putInt64(r.Size)
  147. pe.putInt64(r.OffsetLag)
  148. pe.putBool(r.IsTemporary)
  149. return nil
  150. }
  151. func (r *DescribeLogDirsResponsePartition) decode(pd packetDecoder, version int16) error {
  152. pID, err := pd.getInt32()
  153. if err != nil {
  154. return err
  155. }
  156. r.PartitionID = pID
  157. size, err := pd.getInt64()
  158. if err != nil {
  159. return err
  160. }
  161. r.Size = size
  162. lag, err := pd.getInt64()
  163. if err != nil {
  164. return err
  165. }
  166. r.OffsetLag = lag
  167. isTemp, err := pd.getBool()
  168. if err != nil {
  169. return err
  170. }
  171. r.IsTemporary = isTemp
  172. return nil
  173. }