message_set.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package kafka
  2. type MessageBlock struct {
  3. Offset int64
  4. Msg *Message
  5. }
  6. func (msb *MessageBlock) encode(pe packetEncoder) {
  7. pe.putInt64(msb.Offset)
  8. pe.pushLength32()
  9. msb.Msg.encode(pe)
  10. pe.pop()
  11. }
  12. func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
  13. msb.Offset, err = pd.getInt64()
  14. if err != nil {
  15. return err
  16. }
  17. err = pd.pushLength32()
  18. if err != nil {
  19. return err
  20. }
  21. msb.Msg = new(Message)
  22. err = msb.Msg.decode(pd)
  23. if err != nil {
  24. return err
  25. }
  26. err = pd.pop()
  27. if err != nil {
  28. return err
  29. }
  30. return nil
  31. }
  32. type MessageSet struct {
  33. Messages []*MessageBlock
  34. }
  35. func (ms *MessageSet) encode(pe packetEncoder) {
  36. for i := range ms.Messages {
  37. ms.Messages[i].encode(pe)
  38. }
  39. }
  40. func (ms *MessageSet) decode(pd packetDecoder) (err error) {
  41. ms.Messages = nil
  42. for pd.remaining() > 0 {
  43. msb := new(MessageBlock)
  44. err = msb.decode(pd)
  45. if err != nil {
  46. return err
  47. }
  48. ms.Messages = append(ms.Messages, msb)
  49. }
  50. return nil
  51. }
  52. func (ms *MessageSet) addMessage(msg *Message) {
  53. block := new(MessageBlock)
  54. block.Msg = msg
  55. ms.Messages = append(ms.Messages, block)
  56. }
  57. func newMessageSet() *MessageSet {
  58. set := new(MessageSet)
  59. set.Messages = make([]*MessageBlock, 0)
  60. return set
  61. }