find_coordinator_response.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package sarama
  2. import (
  3. "time"
  4. )
  5. var NoNode = &Broker{id: -1, addr: ":-1"}
  6. type FindCoordinatorResponse struct {
  7. Version int16
  8. ThrottleTime time.Duration
  9. Err KError
  10. ErrMsg *string
  11. Coordinator *Broker
  12. }
  13. func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) {
  14. if version >= 1 {
  15. f.Version = version
  16. throttleTime, err := pd.getInt32()
  17. if err != nil {
  18. return err
  19. }
  20. f.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  21. }
  22. tmp, err := pd.getInt16()
  23. if err != nil {
  24. return err
  25. }
  26. f.Err = KError(tmp)
  27. if version >= 1 {
  28. if f.ErrMsg, err = pd.getNullableString(); err != nil {
  29. return err
  30. }
  31. }
  32. coordinator := new(Broker)
  33. // The version is hardcoded to 0, as version 1 of the Broker-decode
  34. // contains the rack-field which is not present in the FindCoordinatorResponse.
  35. if err := coordinator.decode(pd, 0); err != nil {
  36. return err
  37. }
  38. if coordinator.addr == ":0" {
  39. return nil
  40. }
  41. f.Coordinator = coordinator
  42. return nil
  43. }
  44. func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
  45. if f.Version >= 1 {
  46. pe.putInt32(int32(f.ThrottleTime / time.Millisecond))
  47. }
  48. pe.putInt16(int16(f.Err))
  49. if f.Version >= 1 {
  50. if err := pe.putNullableString(f.ErrMsg); err != nil {
  51. return err
  52. }
  53. }
  54. coordinator := f.Coordinator
  55. if coordinator == nil {
  56. coordinator = NoNode
  57. }
  58. if err := coordinator.encode(pe, 0); err != nil {
  59. return err
  60. }
  61. return nil
  62. }
  63. func (f *FindCoordinatorResponse) key() int16 {
  64. return 10
  65. }
  66. func (f *FindCoordinatorResponse) version() int16 {
  67. return f.Version
  68. }
  69. func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
  70. switch f.Version {
  71. case 1:
  72. return V0_11_0_0
  73. default:
  74. return V0_8_2_0
  75. }
  76. }