records.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package sarama
  2. import "fmt"
  // Kinds of payload a Records value can hold. unknownRecords is the zero
  // value and means the kind has not been determined yet.
  const (
  	unknownRecords = iota
  	legacyRecords
  	defaultRecords
  )

  // Offset and length handed to packetDecoder.peek by magicValue to read the
  // magic value without consuming input.
  const (
  	magicOffset = 16
  	magicLength = 1
  )
  10. // Records implements a union type containing either a RecordBatch or a legacy MessageSet.
  11. type Records struct {
  12. recordsType int
  13. MsgSet *MessageSet
  14. RecordBatch *RecordBatch
  15. }
  16. func newLegacyRecords(msgSet *MessageSet) Records {
  17. return Records{recordsType: legacyRecords, MsgSet: msgSet}
  18. }
  19. func newDefaultRecords(batch *RecordBatch) Records {
  20. return Records{recordsType: defaultRecords, RecordBatch: batch}
  21. }
  22. func newControlRecords(cr ControlRecord) Records {
  23. //TODO
  24. return Records{}
  25. }
  26. // setTypeFromFields sets type of Records depending on which of MsgSet or RecordBatch is not nil.
  27. // The first return value indicates whether both fields are nil (and the type is not set).
  28. // If both fields are not nil, it returns an error.
  29. func (r *Records) setTypeFromFields() (bool, error) {
  30. if r.MsgSet == nil && r.RecordBatch == nil {
  31. return true, nil
  32. }
  33. if r.MsgSet != nil && r.RecordBatch != nil {
  34. return false, fmt.Errorf("both MsgSet and RecordBatch are set, but record type is unknown")
  35. }
  36. r.recordsType = defaultRecords
  37. if r.MsgSet != nil {
  38. r.recordsType = legacyRecords
  39. }
  40. return false, nil
  41. }
  42. func (r *Records) encode(pe packetEncoder) error {
  43. if r.recordsType == unknownRecords {
  44. if empty, err := r.setTypeFromFields(); err != nil || empty {
  45. return err
  46. }
  47. }
  48. switch r.recordsType {
  49. case legacyRecords:
  50. if r.MsgSet == nil {
  51. return nil
  52. }
  53. return r.MsgSet.encode(pe)
  54. case defaultRecords:
  55. if r.RecordBatch == nil {
  56. return nil
  57. }
  58. return r.RecordBatch.encode(pe)
  59. }
  60. return fmt.Errorf("unknown records type: %v", r.recordsType)
  61. }
  62. func (r *Records) setTypeFromMagic(pd packetDecoder) error {
  63. magic, err := magicValue(pd)
  64. if err != nil {
  65. return err
  66. }
  67. r.recordsType = defaultRecords
  68. if magic < 2 {
  69. r.recordsType = legacyRecords
  70. }
  71. return nil
  72. }
  73. func (r *Records) decode(pd packetDecoder) error {
  74. if r.recordsType == unknownRecords {
  75. if err := r.setTypeFromMagic(pd); err != nil {
  76. return err
  77. }
  78. }
  79. switch r.recordsType {
  80. case legacyRecords:
  81. r.MsgSet = &MessageSet{}
  82. return r.MsgSet.decode(pd)
  83. case defaultRecords:
  84. r.RecordBatch = &RecordBatch{}
  85. return r.RecordBatch.decode(pd)
  86. }
  87. return fmt.Errorf("unknown records type: %v", r.recordsType)
  88. }
  89. func (r *Records) numRecords() (int, error) {
  90. if r.recordsType == unknownRecords {
  91. if empty, err := r.setTypeFromFields(); err != nil || empty {
  92. return 0, err
  93. }
  94. }
  95. switch r.recordsType {
  96. case legacyRecords:
  97. if r.MsgSet == nil {
  98. return 0, nil
  99. }
  100. return len(r.MsgSet.Messages), nil
  101. case defaultRecords:
  102. if r.RecordBatch == nil {
  103. return 0, nil
  104. }
  105. return len(r.RecordBatch.Records), nil
  106. }
  107. return 0, fmt.Errorf("unknown records type: %v", r.recordsType)
  108. }
  109. func (r *Records) isPartial() (bool, error) {
  110. if r.recordsType == unknownRecords {
  111. if empty, err := r.setTypeFromFields(); err != nil || empty {
  112. return false, err
  113. }
  114. }
  115. switch r.recordsType {
  116. case unknownRecords:
  117. return false, nil
  118. case legacyRecords:
  119. if r.MsgSet == nil {
  120. return false, nil
  121. }
  122. return r.MsgSet.PartialTrailingMessage, nil
  123. case defaultRecords:
  124. if r.RecordBatch == nil {
  125. return false, nil
  126. }
  127. return r.RecordBatch.PartialTrailingRecord, nil
  128. }
  129. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  130. }
  131. func (r *Records) isControl() (bool, error) {
  132. //TODO i guess there is a type that we should support properly
  133. if r.recordsType == unknownRecords {
  134. if empty, err := r.setTypeFromFields(); err != nil || empty {
  135. return false, err
  136. }
  137. }
  138. switch r.recordsType {
  139. case legacyRecords:
  140. return false, nil
  141. case defaultRecords:
  142. if r.RecordBatch == nil {
  143. return false, nil
  144. }
  145. return r.RecordBatch.Control, nil
  146. }
  147. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  148. }
  149. func (r *Records) isOverflow() (bool, error) {
  150. if r.recordsType == unknownRecords {
  151. if empty, err := r.setTypeFromFields(); err != nil || empty {
  152. return false, err
  153. }
  154. }
  155. switch r.recordsType {
  156. case unknownRecords:
  157. return false, nil
  158. case legacyRecords:
  159. if r.MsgSet == nil {
  160. return false, nil
  161. }
  162. return r.MsgSet.OverflowMessage, nil
  163. case defaultRecords:
  164. return false, nil
  165. }
  166. return false, fmt.Errorf("unknown records type: %v", r.recordsType)
  167. }
  168. func magicValue(pd packetDecoder) (int8, error) {
  169. dec, err := pd.peek(magicOffset, magicLength)
  170. if err != nil {
  171. return 0, err
  172. }
  173. return dec.getInt8()
  174. }
  175. func (r *Records) getControlRecord() (ControlRecord, error) {
  176. if r.RecordBatch == nil || len(r.RecordBatch.Records) <= 0 {
  177. return ControlRecord{}, fmt.Errorf("cannot get control record, record batch is empty")
  178. }
  179. firstRecord := r.RecordBatch.Records[0]
  180. controlRecord := ControlRecord{}
  181. err := controlRecord.decode(&realDecoder{raw: firstRecord.Key}, &realDecoder{raw: firstRecord.Value})
  182. if err != nil {
  183. return ControlRecord{}, err
  184. }
  185. return controlRecord, nil
  186. }