txn_offset_commit_response.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package sarama
  2. import (
  3. "time"
  4. )
  5. type TxnOffsetCommitResponse struct {
  6. ThrottleTime time.Duration
  7. Topics map[string][]*PartitionError
  8. }
  9. func (t *TxnOffsetCommitResponse) encode(pe packetEncoder) error {
  10. pe.putInt32(int32(t.ThrottleTime / time.Millisecond))
  11. if err := pe.putArrayLength(len(t.Topics)); err != nil {
  12. return err
  13. }
  14. for topic, e := range t.Topics {
  15. if err := pe.putString(topic); err != nil {
  16. return err
  17. }
  18. if err := pe.putArrayLength(len(e)); err != nil {
  19. return err
  20. }
  21. for _, partitionError := range e {
  22. if err := partitionError.encode(pe); err != nil {
  23. return err
  24. }
  25. }
  26. }
  27. return nil
  28. }
  29. func (t *TxnOffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
  30. throttleTime, err := pd.getInt32()
  31. if err != nil {
  32. return err
  33. }
  34. t.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  35. n, err := pd.getArrayLength()
  36. if err != nil {
  37. return err
  38. }
  39. t.Topics = make(map[string][]*PartitionError)
  40. for i := 0; i < n; i++ {
  41. topic, err := pd.getString()
  42. if err != nil {
  43. return err
  44. }
  45. m, err := pd.getArrayLength()
  46. if err != nil {
  47. return err
  48. }
  49. t.Topics[topic] = make([]*PartitionError, m)
  50. for j := 0; j < m; j++ {
  51. t.Topics[topic][j] = new(PartitionError)
  52. if err := t.Topics[topic][j].decode(pd, version); err != nil {
  53. return err
  54. }
  55. }
  56. }
  57. return nil
  58. }
  59. func (a *TxnOffsetCommitResponse) key() int16 {
  60. return 28
  61. }
  62. func (a *TxnOffsetCommitResponse) version() int16 {
  63. return 0
  64. }
  65. func (a *TxnOffsetCommitResponse) headerVersion() int16 {
  66. return 0
  67. }
  68. func (a *TxnOffsetCommitResponse) requiredVersion() KafkaVersion {
  69. return V0_11_0_0
  70. }