create_partitions_response.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package sarama
  2. import "time"
  3. type CreatePartitionsResponse struct {
  4. ThrottleTime time.Duration
  5. TopicPartitionErrors map[string]*TopicPartitionError
  6. }
  7. func (c *CreatePartitionsResponse) encode(pe packetEncoder) error {
  8. pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
  9. if err := pe.putArrayLength(len(c.TopicPartitionErrors)); err != nil {
  10. return err
  11. }
  12. for topic, partitionError := range c.TopicPartitionErrors {
  13. if err := pe.putString(topic); err != nil {
  14. return err
  15. }
  16. if err := partitionError.encode(pe); err != nil {
  17. return err
  18. }
  19. }
  20. return nil
  21. }
  22. func (c *CreatePartitionsResponse) decode(pd packetDecoder, version int16) (err error) {
  23. throttleTime, err := pd.getInt32()
  24. if err != nil {
  25. return err
  26. }
  27. c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  28. n, err := pd.getArrayLength()
  29. if err != nil {
  30. return err
  31. }
  32. c.TopicPartitionErrors = make(map[string]*TopicPartitionError, n)
  33. for i := 0; i < n; i++ {
  34. topic, err := pd.getString()
  35. if err != nil {
  36. return err
  37. }
  38. c.TopicPartitionErrors[topic] = new(TopicPartitionError)
  39. if err := c.TopicPartitionErrors[topic].decode(pd, version); err != nil {
  40. return err
  41. }
  42. }
  43. return nil
  44. }
  45. func (r *CreatePartitionsResponse) key() int16 {
  46. return 37
  47. }
  48. func (r *CreatePartitionsResponse) version() int16 {
  49. return 0
  50. }
  51. func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
  52. return V1_0_0_0
  53. }
  54. type TopicPartitionError struct {
  55. Err KError
  56. ErrMsg *string
  57. }
  58. func (t *TopicPartitionError) encode(pe packetEncoder) error {
  59. pe.putInt16(int16(t.Err))
  60. if err := pe.putNullableString(t.ErrMsg); err != nil {
  61. return err
  62. }
  63. return nil
  64. }
  65. func (t *TopicPartitionError) decode(pd packetDecoder, version int16) (err error) {
  66. kerr, err := pd.getInt16()
  67. if err != nil {
  68. return err
  69. }
  70. t.Err = KError(kerr)
  71. if t.ErrMsg, err = pd.getNullableString(); err != nil {
  72. return err
  73. }
  74. return nil
  75. }