records.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package sarama
  2. import "fmt"
  3. const (
  4. legacyRecords = iota
  5. defaultRecords
  6. )
  7. // Records implements a union type containing either a RecordBatch or a legacy MessageSet.
  8. type Records struct {
  9. recordsType int
  10. msgSet *MessageSet
  11. recordBatch *RecordBatch
  12. }
  13. func newLegacyRecords(msgSet *MessageSet) Records {
  14. return Records{recordsType: legacyRecords, msgSet: msgSet}
  15. }
  16. func newDefaultRecords(batch *RecordBatch) Records {
  17. return Records{recordsType: defaultRecords, recordBatch: batch}
  18. }
  19. func (r *Records) encode(pe packetEncoder) error {
  20. switch r.recordsType {
  21. case legacyRecords:
  22. if r.msgSet == nil {
  23. return nil
  24. }
  25. return r.msgSet.encode(pe)
  26. case defaultRecords:
  27. if r.recordBatch == nil {
  28. return nil
  29. }
  30. return r.recordBatch.encode(pe)
  31. }
  32. return fmt.Errorf("unknown records type: %v", r.recordsType)
  33. }
  34. func (r *Records) decode(pd packetDecoder) error {
  35. switch r.recordsType {
  36. case legacyRecords:
  37. r.msgSet = &MessageSet{}
  38. return r.msgSet.decode(pd)
  39. case defaultRecords:
  40. r.recordBatch = &RecordBatch{}
  41. return r.recordBatch.decode(pd)
  42. }
  43. return fmt.Errorf("unknown records type: %v", r.recordsType)
  44. }
  45. func (r *Records) numRecords() (int, error) {
  46. switch r.recordsType {
  47. case legacyRecords:
  48. if r.msgSet == nil {
  49. return 0, nil
  50. }
  51. return len(r.msgSet.Messages), nil
  52. case defaultRecords:
  53. if r.recordBatch == nil {
  54. return 0, nil
  55. }
  56. return len(r.recordBatch.Records), nil
  57. }
  58. return 0, fmt.Errorf("unknown records type: %v", r.recordsType)
  59. }
  60. func (r *Records) isPartial() (bool, error) {
  61. switch r.recordsType {
  62. case legacyRecords:
  63. if r.msgSet == nil {
  64. return false, nil
  65. }
  66. return r.msgSet.PartialTrailingMessage, nil
  67. case defaultRecords:
  68. if r.recordBatch == nil {
  69. return false, nil
  70. }
  71. return r.recordBatch.PartialTrailingRecord, nil
  72. }
  73. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  74. }
  75. func (r *Records) isControl() (bool, error) {
  76. switch r.recordsType {
  77. case legacyRecords:
  78. return false, nil
  79. case defaultRecords:
  80. if r.recordBatch == nil {
  81. return false, nil
  82. }
  83. return r.recordBatch.Control, nil
  84. }
  85. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  86. }