add_offsets_to_txn_request.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package sarama
  2. //AddOffsetsToTxnRequest adds offsets to a transaction request
  3. type AddOffsetsToTxnRequest struct {
  4. TransactionalID string
  5. ProducerID int64
  6. ProducerEpoch int16
  7. GroupID string
  8. }
  9. func (a *AddOffsetsToTxnRequest) encode(pe packetEncoder) error {
  10. if err := pe.putString(a.TransactionalID); err != nil {
  11. return err
  12. }
  13. pe.putInt64(a.ProducerID)
  14. pe.putInt16(a.ProducerEpoch)
  15. if err := pe.putString(a.GroupID); err != nil {
  16. return err
  17. }
  18. return nil
  19. }
  20. func (a *AddOffsetsToTxnRequest) decode(pd packetDecoder, version int16) (err error) {
  21. if a.TransactionalID, err = pd.getString(); err != nil {
  22. return err
  23. }
  24. if a.ProducerID, err = pd.getInt64(); err != nil {
  25. return err
  26. }
  27. if a.ProducerEpoch, err = pd.getInt16(); err != nil {
  28. return err
  29. }
  30. if a.GroupID, err = pd.getString(); err != nil {
  31. return err
  32. }
  33. return nil
  34. }
  35. func (a *AddOffsetsToTxnRequest) key() int16 {
  36. return 25
  37. }
  38. func (a *AddOffsetsToTxnRequest) version() int16 {
  39. return 0
  40. }
  41. func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
  42. return 1
  43. }
  44. func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
  45. return V0_11_0_0
  46. }