consumer_metadata_response.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package sarama
  2. import (
  3. "net"
  4. "strconv"
  5. )
  6. type ConsumerMetadataResponse struct {
  7. Err KError
  8. Coordinator *Broker
  9. CoordinatorID int32 // deprecated: use Coordinator.ID()
  10. CoordinatorHost string // deprecated: use Coordinator.Addr()
  11. CoordinatorPort int32 // deprecated: use Coordinator.Addr()
  12. }
  13. func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
  14. tmp := new(FindCoordinatorResponse)
  15. if err := tmp.decode(pd, version); err != nil {
  16. return err
  17. }
  18. r.Err = tmp.Err
  19. r.Coordinator = tmp.Coordinator
  20. if tmp.Coordinator == nil {
  21. return nil
  22. }
  23. // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
  24. // backwards compatibility
  25. host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
  26. if err != nil {
  27. return err
  28. }
  29. port, err := strconv.ParseInt(portstr, 10, 32)
  30. if err != nil {
  31. return err
  32. }
  33. r.CoordinatorID = r.Coordinator.ID()
  34. r.CoordinatorHost = host
  35. r.CoordinatorPort = int32(port)
  36. return nil
  37. }
  38. func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
  39. if r.Coordinator == nil {
  40. r.Coordinator = new(Broker)
  41. r.Coordinator.id = r.CoordinatorID
  42. r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
  43. }
  44. tmp := &FindCoordinatorResponse{
  45. Version: 0,
  46. Err: r.Err,
  47. Coordinator: r.Coordinator,
  48. }
  49. if err := tmp.encode(pe); err != nil {
  50. return err
  51. }
  52. return nil
  53. }
  54. func (r *ConsumerMetadataResponse) key() int16 {
  55. return 10
  56. }
  57. func (r *ConsumerMetadataResponse) version() int16 {
  58. return 0
  59. }
  60. func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
  61. return V0_8_2_0
  62. }