// find_coordinator_request.go (1.1 KB)

  1. package sarama
  2. type CoordinatorType int8
  3. const (
  4. CoordinatorGroup CoordinatorType = iota
  5. CoordinatorTransaction
  6. )
  7. type FindCoordinatorRequest struct {
  8. Version int16
  9. CoordinatorKey string
  10. CoordinatorType CoordinatorType
  11. }
  12. func (f *FindCoordinatorRequest) encode(pe packetEncoder) error {
  13. if err := pe.putString(f.CoordinatorKey); err != nil {
  14. return err
  15. }
  16. if f.Version >= 1 {
  17. pe.putInt8(int8(f.CoordinatorType))
  18. }
  19. return nil
  20. }
  21. func (f *FindCoordinatorRequest) decode(pd packetDecoder, version int16) (err error) {
  22. if f.CoordinatorKey, err = pd.getString(); err != nil {
  23. return err
  24. }
  25. if version >= 1 {
  26. f.Version = version
  27. coordinatorType, err := pd.getInt8()
  28. if err != nil {
  29. return err
  30. }
  31. f.CoordinatorType = CoordinatorType(coordinatorType)
  32. }
  33. return nil
  34. }
  35. func (f *FindCoordinatorRequest) key() int16 {
  36. return 10
  37. }
  38. func (f *FindCoordinatorRequest) version() int16 {
  39. return f.Version
  40. }
  41. func (r *FindCoordinatorRequest) headerVersion() int16 {
  42. return 1
  43. }
  44. func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion {
  45. switch f.Version {
  46. case 1:
  47. return V0_11_0_0
  48. default:
  49. return V0_8_2_0
  50. }
  51. }