consumer_metadata_response.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package sarama
  2. import (
  3. "net"
  4. "strconv"
  5. )
  6. //ConsumerMetadataResponse holds the response for a consumer group meta data requests
  7. type ConsumerMetadataResponse struct {
  8. Err KError
  9. Coordinator *Broker
  10. CoordinatorID int32 // deprecated: use Coordinator.ID()
  11. CoordinatorHost string // deprecated: use Coordinator.Addr()
  12. CoordinatorPort int32 // deprecated: use Coordinator.Addr()
  13. }
  14. func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
  15. tmp := new(FindCoordinatorResponse)
  16. if err := tmp.decode(pd, version); err != nil {
  17. return err
  18. }
  19. r.Err = tmp.Err
  20. r.Coordinator = tmp.Coordinator
  21. if tmp.Coordinator == nil {
  22. return nil
  23. }
  24. // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
  25. // backwards compatibility
  26. host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
  27. if err != nil {
  28. return err
  29. }
  30. port, err := strconv.ParseInt(portstr, 10, 32)
  31. if err != nil {
  32. return err
  33. }
  34. r.CoordinatorID = r.Coordinator.ID()
  35. r.CoordinatorHost = host
  36. r.CoordinatorPort = int32(port)
  37. return nil
  38. }
  39. func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
  40. if r.Coordinator == nil {
  41. r.Coordinator = new(Broker)
  42. r.Coordinator.id = r.CoordinatorID
  43. r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
  44. }
  45. tmp := &FindCoordinatorResponse{
  46. Version: 0,
  47. Err: r.Err,
  48. Coordinator: r.Coordinator,
  49. }
  50. if err := tmp.encode(pe); err != nil {
  51. return err
  52. }
  53. return nil
  54. }
  55. func (r *ConsumerMetadataResponse) key() int16 {
  56. return 10
  57. }
  58. func (r *ConsumerMetadataResponse) version() int16 {
  59. return 0
  60. }
  61. func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
  62. return V0_8_2_0
  63. }