// Source file: consumer_metadata_response.go (1.2 KB)

  1. package sarama
  2. import (
  3. "net"
  4. "strconv"
  5. )
  6. type ConsumerMetadataResponse struct {
  7. Err KError
  8. Coordinator *Broker
  9. CoordinatorID int32 // deprecated: use Coordinator.ID()
  10. CoordinatorHost string // deprecated: use Coordinator.Addr()
  11. CoordinatorPort int32 // deprecated: use Coordinator.Addr()
  12. }
  13. func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
  14. tmp, err := pd.getInt16()
  15. if err != nil {
  16. return err
  17. }
  18. r.Err = KError(tmp)
  19. r.Coordinator = new(Broker)
  20. if err := r.Coordinator.decode(pd); err != nil {
  21. return err
  22. }
  23. // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
  24. // backwards compatibility
  25. host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
  26. if err != nil {
  27. return err
  28. }
  29. port, err := strconv.ParseInt(portstr, 10, 32)
  30. if err != nil {
  31. return err
  32. }
  33. r.CoordinatorID = r.Coordinator.ID()
  34. r.CoordinatorHost = host
  35. r.CoordinatorPort = int32(port)
  36. return nil
  37. }
  38. func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
  39. pe.putInt16(int16(r.Err))
  40. pe.putInt32(r.CoordinatorID)
  41. if err := pe.putString(r.CoordinatorHost); err != nil {
  42. return err
  43. }
  44. pe.putInt32(r.CoordinatorPort)
  45. return nil
  46. }