txn_offset_commit_request.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package sarama
  2. type TxnOffsetCommitRequest struct {
  3. TransactionalID string
  4. GroupID string
  5. ProducerID int64
  6. ProducerEpoch int16
  7. Topics map[string][]*PartitionOffsetMetadata
  8. }
  9. func (t *TxnOffsetCommitRequest) encode(pe packetEncoder) error {
  10. if err := pe.putString(t.TransactionalID); err != nil {
  11. return err
  12. }
  13. if err := pe.putString(t.GroupID); err != nil {
  14. return err
  15. }
  16. pe.putInt64(t.ProducerID)
  17. pe.putInt16(t.ProducerEpoch)
  18. if err := pe.putArrayLength(len(t.Topics)); err != nil {
  19. return err
  20. }
  21. for topic, partitions := range t.Topics {
  22. if err := pe.putString(topic); err != nil {
  23. return err
  24. }
  25. if err := pe.putArrayLength(len(partitions)); err != nil {
  26. return err
  27. }
  28. for _, partition := range partitions {
  29. if err := partition.encode(pe); err != nil {
  30. return err
  31. }
  32. }
  33. }
  34. return nil
  35. }
  36. func (t *TxnOffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) {
  37. if t.TransactionalID, err = pd.getString(); err != nil {
  38. return err
  39. }
  40. if t.GroupID, err = pd.getString(); err != nil {
  41. return err
  42. }
  43. if t.ProducerID, err = pd.getInt64(); err != nil {
  44. return err
  45. }
  46. if t.ProducerEpoch, err = pd.getInt16(); err != nil {
  47. return err
  48. }
  49. n, err := pd.getArrayLength()
  50. if err != nil {
  51. return err
  52. }
  53. t.Topics = make(map[string][]*PartitionOffsetMetadata)
  54. for i := 0; i < n; i++ {
  55. topic, err := pd.getString()
  56. if err != nil {
  57. return err
  58. }
  59. m, err := pd.getArrayLength()
  60. if err != nil {
  61. return err
  62. }
  63. t.Topics[topic] = make([]*PartitionOffsetMetadata, m)
  64. for j := 0; j < m; j++ {
  65. partitionOffsetMetadata := new(PartitionOffsetMetadata)
  66. if err := partitionOffsetMetadata.decode(pd, version); err != nil {
  67. return err
  68. }
  69. t.Topics[topic][j] = partitionOffsetMetadata
  70. }
  71. }
  72. return nil
  73. }
  74. func (a *TxnOffsetCommitRequest) key() int16 {
  75. return 28
  76. }
  77. func (a *TxnOffsetCommitRequest) version() int16 {
  78. return 0
  79. }
  80. func (a *TxnOffsetCommitRequest) headerVersion() int16 {
  81. return 1
  82. }
  83. func (a *TxnOffsetCommitRequest) requiredVersion() KafkaVersion {
  84. return V0_11_0_0
  85. }
  86. type PartitionOffsetMetadata struct {
  87. Partition int32
  88. Offset int64
  89. Metadata *string
  90. }
  91. func (p *PartitionOffsetMetadata) encode(pe packetEncoder) error {
  92. pe.putInt32(p.Partition)
  93. pe.putInt64(p.Offset)
  94. if err := pe.putNullableString(p.Metadata); err != nil {
  95. return err
  96. }
  97. return nil
  98. }
  99. func (p *PartitionOffsetMetadata) decode(pd packetDecoder, version int16) (err error) {
  100. if p.Partition, err = pd.getInt32(); err != nil {
  101. return err
  102. }
  103. if p.Offset, err = pd.getInt64(); err != nil {
  104. return err
  105. }
  106. if p.Metadata, err = pd.getNullableString(); err != nil {
  107. return err
  108. }
  109. return nil
  110. }