find_coordinator_response.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package sarama
  2. import (
  3. "time"
  4. )
  5. var NoNode = &Broker{id: -1, addr: ":-1"}
  6. type FindCoordinatorResponse struct {
  7. Version int16
  8. ThrottleTime time.Duration
  9. Err KError
  10. ErrMsg *string
  11. Coordinator *Broker
  12. }
  13. func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) {
  14. if version >= 1 {
  15. f.Version = version
  16. throttleTime, err := pd.getInt32()
  17. if err != nil {
  18. return err
  19. }
  20. f.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  21. }
  22. tmp, err := pd.getInt16()
  23. if err != nil {
  24. return err
  25. }
  26. f.Err = KError(tmp)
  27. if version >= 1 {
  28. if f.ErrMsg, err = pd.getNullableString(); err != nil {
  29. return err
  30. }
  31. }
  32. coordinator := new(Broker)
  33. if err := coordinator.decode(pd, version); err != nil {
  34. return err
  35. }
  36. if coordinator.addr == ":0" {
  37. return nil
  38. }
  39. f.Coordinator = coordinator
  40. return nil
  41. }
  42. func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
  43. if f.Version >= 1 {
  44. pe.putInt32(int32(f.ThrottleTime / time.Millisecond))
  45. }
  46. pe.putInt16(int16(f.Err))
  47. if f.Version >= 1 {
  48. if err := pe.putNullableString(f.ErrMsg); err != nil {
  49. return err
  50. }
  51. }
  52. coordinator := f.Coordinator
  53. if coordinator == nil {
  54. coordinator = NoNode
  55. }
  56. if err := coordinator.encode(pe, f.Version); err != nil {
  57. return err
  58. }
  59. return nil
  60. }
  61. func (f *FindCoordinatorResponse) key() int16 {
  62. return 10
  63. }
  64. func (f *FindCoordinatorResponse) version() int16 {
  65. return f.Version
  66. }
  67. func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
  68. switch f.Version {
  69. case 1:
  70. return V0_11_0_0
  71. default:
  72. return V0_8_2_0
  73. }
  74. }