consumer_metadata_response.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package sarama
  2. import (
  3. "net"
  4. "strconv"
  5. )
// ConsumerMetadataResponse holds an error code and the coordinator broker
// read from a consumer metadata response, together with legacy copies of the
// coordinator's ID, host and port.
type ConsumerMetadataResponse struct {
	// Err is the error code carried in the response.
	Err KError
	// Coordinator is the coordinator broker. decode does not set it when the
	// decoded broker address is ":0"; encode falls back to the deprecated
	// fields below when it is nil.
	Coordinator *Broker
	// CoordinatorID is filled in by decode from Coordinator.ID().
	//
	// Deprecated: use Coordinator.ID() instead.
	CoordinatorID int32
	// CoordinatorHost is filled in by decode from the host part of
	// Coordinator.Addr().
	//
	// Deprecated: use Coordinator.Addr() instead.
	CoordinatorHost string
	// CoordinatorPort is filled in by decode from the port part of
	// Coordinator.Addr().
	//
	// Deprecated: use Coordinator.Addr() instead.
	CoordinatorPort int32
}
  13. func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
  14. tmp, err := pd.getInt16()
  15. if err != nil {
  16. return err
  17. }
  18. r.Err = KError(tmp)
  19. coordinator := new(Broker)
  20. if err := coordinator.decode(pd); err != nil {
  21. return err
  22. }
  23. if coordinator.addr == ":0" {
  24. return nil
  25. }
  26. r.Coordinator = coordinator
  27. // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
  28. // backwards compatibility
  29. host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
  30. if err != nil {
  31. return err
  32. }
  33. port, err := strconv.ParseInt(portstr, 10, 32)
  34. if err != nil {
  35. return err
  36. }
  37. r.CoordinatorID = r.Coordinator.ID()
  38. r.CoordinatorHost = host
  39. r.CoordinatorPort = int32(port)
  40. return nil
  41. }
  42. func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
  43. pe.putInt16(int16(r.Err))
  44. if r.Coordinator != nil {
  45. host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
  46. if err != nil {
  47. return err
  48. }
  49. port, err := strconv.ParseInt(portstr, 10, 32)
  50. if err != nil {
  51. return err
  52. }
  53. pe.putInt32(r.Coordinator.ID())
  54. if err := pe.putString(host); err != nil {
  55. return err
  56. }
  57. pe.putInt32(int32(port))
  58. return nil
  59. }
  60. pe.putInt32(r.CoordinatorID)
  61. if err := pe.putString(r.CoordinatorHost); err != nil {
  62. return err
  63. }
  64. pe.putInt32(r.CoordinatorPort)
  65. return nil
  66. }