init_producer_id_response.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package sarama
  2. import "time"
  3. type InitProducerIDResponse struct {
  4. ThrottleTime time.Duration
  5. Err KError
  6. ProducerID int64
  7. ProducerEpoch int16
  8. }
  9. func (i *InitProducerIDResponse) encode(pe packetEncoder) error {
  10. pe.putInt32(int32(i.ThrottleTime / time.Millisecond))
  11. pe.putInt16(int16(i.Err))
  12. pe.putInt64(i.ProducerID)
  13. pe.putInt16(i.ProducerEpoch)
  14. return nil
  15. }
  16. func (i *InitProducerIDResponse) decode(pd packetDecoder, version int16) (err error) {
  17. throttleTime, err := pd.getInt32()
  18. if err != nil {
  19. return err
  20. }
  21. i.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  22. kerr, err := pd.getInt16()
  23. if err != nil {
  24. return err
  25. }
  26. i.Err = KError(kerr)
  27. if i.ProducerID, err = pd.getInt64(); err != nil {
  28. return err
  29. }
  30. if i.ProducerEpoch, err = pd.getInt16(); err != nil {
  31. return err
  32. }
  33. return nil
  34. }
  35. func (i *InitProducerIDResponse) key() int16 {
  36. return 22
  37. }
  38. func (i *InitProducerIDResponse) version() int16 {
  39. return 0
  40. }
  41. func (i *InitProducerIDResponse) headerVersion() int16 {
  42. return 0
  43. }
  44. func (i *InitProducerIDResponse) requiredVersion() KafkaVersion {
  45. return V0_11_0_0
  46. }