describe_log_dirs_request.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package sarama
  2. // DescribeLogDirsRequest is a describe request to get partitions' log size
  3. type DescribeLogDirsRequest struct {
  4. // Version 0 and 1 are equal
  5. // The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
  6. Version int16
  7. // If this is an empty array, all topics will be queried
  8. DescribeTopics []DescribeLogDirsRequestTopic
  9. }
  10. // DescribeLogDirsRequestTopic is a describe request about the log dir of one or more partitions within a Topic
  11. type DescribeLogDirsRequestTopic struct {
  12. Topic string
  13. PartitionIDs []int32
  14. }
  15. func (r *DescribeLogDirsRequest) encode(pe packetEncoder) error {
  16. length := len(r.DescribeTopics)
  17. if length == 0 {
  18. // In order to query all topics we must send null
  19. length = -1
  20. }
  21. if err := pe.putArrayLength(length); err != nil {
  22. return err
  23. }
  24. for _, d := range r.DescribeTopics {
  25. if err := pe.putString(d.Topic); err != nil {
  26. return err
  27. }
  28. if err := pe.putInt32Array(d.PartitionIDs); err != nil {
  29. return err
  30. }
  31. }
  32. return nil
  33. }
  34. func (r *DescribeLogDirsRequest) decode(pd packetDecoder, version int16) error {
  35. n, err := pd.getArrayLength()
  36. if err != nil {
  37. return err
  38. }
  39. if n == -1 {
  40. n = 0
  41. }
  42. topics := make([]DescribeLogDirsRequestTopic, n)
  43. for i := 0; i < n; i++ {
  44. topics[i] = DescribeLogDirsRequestTopic{}
  45. topic, err := pd.getString()
  46. if err != nil {
  47. return err
  48. }
  49. topics[i].Topic = topic
  50. pIDs, err := pd.getInt32Array()
  51. if err != nil {
  52. return err
  53. }
  54. topics[i].PartitionIDs = pIDs
  55. }
  56. r.DescribeTopics = topics
  57. return nil
  58. }
  59. func (r *DescribeLogDirsRequest) key() int16 {
  60. return 35
  61. }
  62. func (r *DescribeLogDirsRequest) version() int16 {
  63. return r.Version
  64. }
  65. func (r *DescribeLogDirsRequest) headerVersion() int16 {
  66. return 1
  67. }
  68. func (r *DescribeLogDirsRequest) requiredVersion() KafkaVersion {
  69. return V1_0_0_0
  70. }