records.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package sarama
  2. import "fmt"
  3. const (
  4. unknownRecords = iota
  5. legacyRecords
  6. defaultRecords
  7. magicOffset = 16
  8. magicLength = 1
  9. )
  10. // Records implements a union type containing either a RecordBatch or a legacy MessageSet.
  11. type Records struct {
  12. recordsType int
  13. msgSet *MessageSet
  14. recordBatch *RecordBatch
  15. }
  16. func newLegacyRecords(msgSet *MessageSet) Records {
  17. return Records{recordsType: legacyRecords, msgSet: msgSet}
  18. }
  19. func newDefaultRecords(batch *RecordBatch) Records {
  20. return Records{recordsType: defaultRecords, recordBatch: batch}
  21. }
  22. // setTypeFromFields sets type of Records depending on which of msgSet or recordBatch is not nil.
  23. // The first return value indicates whether both fields are nil (and the type is not set).
  24. // If both fields are not nil, it returns an error.
  25. func (r *Records) setTypeFromFields() (bool, error) {
  26. if r.msgSet == nil && r.recordBatch == nil {
  27. return true, nil
  28. }
  29. if r.msgSet != nil && r.recordBatch != nil {
  30. return false, fmt.Errorf("both msgSet and recordBatch are set, but record type is unknown")
  31. }
  32. r.recordsType = defaultRecords
  33. if r.msgSet != nil {
  34. r.recordsType = legacyRecords
  35. }
  36. return false, nil
  37. }
  38. func (r *Records) encode(pe packetEncoder) error {
  39. if r.recordsType == unknownRecords {
  40. if empty, err := r.setTypeFromFields(); err != nil || empty {
  41. return err
  42. }
  43. }
  44. switch r.recordsType {
  45. case legacyRecords:
  46. if r.msgSet == nil {
  47. return nil
  48. }
  49. return r.msgSet.encode(pe)
  50. case defaultRecords:
  51. if r.recordBatch == nil {
  52. return nil
  53. }
  54. return r.recordBatch.encode(pe)
  55. }
  56. return fmt.Errorf("unknown records type: %v", r.recordsType)
  57. }
  58. func (r *Records) setTypeFromMagic(pd packetDecoder) error {
  59. dec, err := pd.peek(magicOffset, magicLength)
  60. if err != nil {
  61. return err
  62. }
  63. magic, err := dec.getInt8()
  64. if err != nil {
  65. return err
  66. }
  67. r.recordsType = defaultRecords
  68. if magic < 2 {
  69. r.recordsType = legacyRecords
  70. }
  71. return nil
  72. }
  73. func (r *Records) decode(pd packetDecoder) error {
  74. if r.recordsType == unknownRecords {
  75. if err := r.setTypeFromMagic(pd); err != nil {
  76. return nil
  77. }
  78. }
  79. switch r.recordsType {
  80. case legacyRecords:
  81. r.msgSet = &MessageSet{}
  82. return r.msgSet.decode(pd)
  83. case defaultRecords:
  84. r.recordBatch = &RecordBatch{}
  85. return r.recordBatch.decode(pd)
  86. }
  87. return fmt.Errorf("unknown records type: %v", r.recordsType)
  88. }
  89. func (r *Records) numRecords() (int, error) {
  90. if r.recordsType == unknownRecords {
  91. if empty, err := r.setTypeFromFields(); err != nil || empty {
  92. return 0, err
  93. }
  94. }
  95. switch r.recordsType {
  96. case legacyRecords:
  97. if r.msgSet == nil {
  98. return 0, nil
  99. }
  100. return len(r.msgSet.Messages), nil
  101. case defaultRecords:
  102. if r.recordBatch == nil {
  103. return 0, nil
  104. }
  105. return len(r.recordBatch.Records), nil
  106. }
  107. return 0, fmt.Errorf("unknown records type: %v", r.recordsType)
  108. }
  109. func (r *Records) isPartial() (bool, error) {
  110. if r.recordsType == unknownRecords {
  111. if empty, err := r.setTypeFromFields(); err != nil || empty {
  112. return false, err
  113. }
  114. }
  115. switch r.recordsType {
  116. case unknownRecords:
  117. return false, nil
  118. case legacyRecords:
  119. if r.msgSet == nil {
  120. return false, nil
  121. }
  122. return r.msgSet.PartialTrailingMessage, nil
  123. case defaultRecords:
  124. if r.recordBatch == nil {
  125. return false, nil
  126. }
  127. return r.recordBatch.PartialTrailingRecord, nil
  128. }
  129. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  130. }
  131. func (r *Records) isControl() (bool, error) {
  132. if r.recordsType == unknownRecords {
  133. if empty, err := r.setTypeFromFields(); err != nil || empty {
  134. return false, err
  135. }
  136. }
  137. switch r.recordsType {
  138. case legacyRecords:
  139. return false, nil
  140. case defaultRecords:
  141. if r.recordBatch == nil {
  142. return false, nil
  143. }
  144. return r.recordBatch.Control, nil
  145. }
  146. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  147. }