find_coordinator_request.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package sarama
  2. type CoordinatorType int8
  3. const (
  4. CoordinatorGroup CoordinatorType = 0
  5. CoordinatorTransaction CoordinatorType = 1
  6. )
  7. type FindCoordinatorRequest struct {
  8. Version int16
  9. CoordinatorKey string
  10. CoordinatorType CoordinatorType
  11. }
  12. func (f *FindCoordinatorRequest) encode(pe packetEncoder) error {
  13. if err := pe.putString(f.CoordinatorKey); err != nil {
  14. return err
  15. }
  16. if f.Version >= 1 {
  17. pe.putInt8(int8(f.CoordinatorType))
  18. }
  19. return nil
  20. }
  21. func (f *FindCoordinatorRequest) decode(pd packetDecoder, version int16) (err error) {
  22. if f.CoordinatorKey, err = pd.getString(); err != nil {
  23. return err
  24. }
  25. if version >= 1 {
  26. f.Version = version
  27. coordinatorType, err := pd.getInt8()
  28. if err != nil {
  29. return err
  30. }
  31. f.CoordinatorType = CoordinatorType(coordinatorType)
  32. }
  33. return nil
  34. }
  35. func (f *FindCoordinatorRequest) key() int16 {
  36. return 10
  37. }
  38. func (f *FindCoordinatorRequest) version() int16 {
  39. return f.Version
  40. }
  41. func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion {
  42. switch f.Version {
  43. case 1:
  44. return V0_11_0_0
  45. default:
  46. return V0_8_2_0
  47. }
  48. }