delete_topics_response.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package sarama
  2. import "time"
  3. type DeleteTopicsResponse struct {
  4. Version int16
  5. ThrottleTime time.Duration
  6. TopicErrorCodes map[string]KError
  7. }
  8. func (d *DeleteTopicsResponse) encode(pe packetEncoder) error {
  9. if d.Version >= 1 {
  10. pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
  11. }
  12. if err := pe.putArrayLength(len(d.TopicErrorCodes)); err != nil {
  13. return err
  14. }
  15. for topic, errorCode := range d.TopicErrorCodes {
  16. if err := pe.putString(topic); err != nil {
  17. return err
  18. }
  19. pe.putInt16(int16(errorCode))
  20. }
  21. return nil
  22. }
  23. func (d *DeleteTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
  24. if version >= 1 {
  25. throttleTime, err := pd.getInt32()
  26. if err != nil {
  27. return err
  28. }
  29. d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  30. d.Version = version
  31. }
  32. n, err := pd.getArrayLength()
  33. if err != nil {
  34. return err
  35. }
  36. d.TopicErrorCodes = make(map[string]KError, n)
  37. for i := 0; i < n; i++ {
  38. topic, err := pd.getString()
  39. if err != nil {
  40. return err
  41. }
  42. errorCode, err := pd.getInt16()
  43. if err != nil {
  44. return err
  45. }
  46. d.TopicErrorCodes[topic] = KError(errorCode)
  47. }
  48. return nil
  49. }
  50. func (d *DeleteTopicsResponse) key() int16 {
  51. return 20
  52. }
  53. func (d *DeleteTopicsResponse) version() int16 {
  54. return d.Version
  55. }
  56. func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion {
  57. switch d.Version {
  58. case 1:
  59. return V0_11_0_0
  60. default:
  61. return V0_10_1_0
  62. }
  63. }