find_coordinator_response.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package sarama
  2. import (
  3. "time"
  4. )
  5. var NoNode = &Broker{id: -1, addr: ":-1"}
  6. type FindCoordinatorResponse struct {
  7. Version int16
  8. ThrottleTime time.Duration
  9. Err KError
  10. ErrMsg *string
  11. Coordinator *Broker
  12. }
  13. func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) {
  14. if version >= 1 {
  15. f.Version = version
  16. throttleTime, err := pd.getInt32()
  17. if err != nil {
  18. return err
  19. }
  20. f.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  21. }
  22. tmp, err := pd.getInt16()
  23. if err != nil {
  24. return err
  25. }
  26. f.Err = KError(tmp)
  27. if version >= 1 {
  28. if f.ErrMsg, err = pd.getNullableString(); err != nil {
  29. return err
  30. }
  31. }
  32. coordinator := new(Broker)
  33. // The version is hardcoded to 0, as version 1 of the Broker-decode
  34. // contains the rack-field which is not present in the FindCoordinatorResponse.
  35. if err := coordinator.decode(pd, 0); err != nil {
  36. return err
  37. }
  38. if coordinator.addr == ":0" {
  39. return nil
  40. }
  41. f.Coordinator = coordinator
  42. return nil
  43. }
  44. func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
  45. if f.Version >= 1 {
  46. pe.putInt32(int32(f.ThrottleTime / time.Millisecond))
  47. }
  48. pe.putInt16(int16(f.Err))
  49. if f.Version >= 1 {
  50. if err := pe.putNullableString(f.ErrMsg); err != nil {
  51. return err
  52. }
  53. }
  54. coordinator := f.Coordinator
  55. if coordinator == nil {
  56. coordinator = NoNode
  57. }
  58. if err := coordinator.encode(pe, 0); err != nil {
  59. return err
  60. }
  61. return nil
  62. }
  63. func (f *FindCoordinatorResponse) key() int16 {
  64. return 10
  65. }
  66. func (f *FindCoordinatorResponse) version() int16 {
  67. return f.Version
  68. }
  69. func (r *FindCoordinatorResponse) headerVersion() int16 {
  70. return 0
  71. }
  72. func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
  73. switch f.Version {
  74. case 1:
  75. return V0_11_0_0
  76. default:
  77. return V0_8_2_0
  78. }
  79. }