metadata_request.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package sarama
  2. type MetadataRequest struct {
  3. Version int16
  4. Topics []string
  5. }
  6. func (r *MetadataRequest) encode(pe packetEncoder) error {
  7. if r.Version < 0 || r.Version > 1 {
  8. return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
  9. }
  10. if r.Version == 0 || r.Topics != nil || len(r.Topics) > 0 {
  11. err := pe.putArrayLength(len(r.Topics))
  12. if err != nil {
  13. return err
  14. }
  15. for i := range r.Topics {
  16. err = pe.putString(r.Topics[i])
  17. if err != nil {
  18. return err
  19. }
  20. }
  21. } else {
  22. pe.putInt32(-1)
  23. }
  24. return nil
  25. }
  26. func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
  27. r.Version = version
  28. size, err := pd.getInt32()
  29. if err != nil {
  30. return err
  31. }
  32. if size < 0 {
  33. return nil
  34. } else {
  35. topicCount := size
  36. if topicCount == 0 {
  37. return nil
  38. }
  39. r.Topics = make([]string, topicCount)
  40. for i := range r.Topics {
  41. topic, err := pd.getString()
  42. if err != nil {
  43. return err
  44. }
  45. r.Topics[i] = topic
  46. }
  47. return nil
  48. }
  49. }
  50. func (r *MetadataRequest) key() int16 {
  51. return 3
  52. }
  53. func (r *MetadataRequest) version() int16 {
  54. return r.Version
  55. }
  56. func (r *MetadataRequest) requiredVersion() KafkaVersion {
  57. switch r.Version {
  58. case 1:
  59. return V0_10_0_0
  60. default:
  61. return MinVersion
  62. }
  63. }