message_set.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package kafka
  2. type messageSetBlock struct {
  3. offset int64
  4. msg *Message
  5. }
  6. func (msb *messageSetBlock) encode(pe packetEncoder) {
  7. pe.putInt64(msb.offset)
  8. pe.pushLength32()
  9. msb.msg.encode(pe)
  10. pe.pop()
  11. }
  12. func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
  13. msb.offset, err = pd.getInt64()
  14. if err != nil {
  15. return err
  16. }
  17. err = pd.pushLength32()
  18. if err != nil {
  19. return err
  20. }
  21. msb.msg = new(Message)
  22. err = msb.msg.decode(pd)
  23. if err != nil {
  24. return err
  25. }
  26. err = pd.pop()
  27. if err != nil {
  28. return err
  29. }
  30. return nil
  31. }
  32. type messageSet struct {
  33. msgs []*messageSetBlock
  34. }
  35. func (ms *messageSet) encode(pe packetEncoder) {
  36. for i := range ms.msgs {
  37. ms.msgs[i].encode(pe)
  38. }
  39. }
  40. func (ms *messageSet) decode(pd packetDecoder) (err error) {
  41. ms.msgs = nil
  42. for pd.remaining() > 0 {
  43. msb := new(messageSetBlock)
  44. err = msb.decode(pd)
  45. if err != nil {
  46. return err
  47. }
  48. ms.msgs = append(ms.msgs, msb)
  49. }
  50. return nil
  51. }
  52. func newMessageSet() *messageSet {
  53. set := new(messageSet)
  54. set.msgs = make([]*messageSetBlock, 0)
  55. return set
  56. }
  57. func (ms *messageSet) addMessage(msg *Message) {
  58. block := new(messageSetBlock)
  59. block.msg = msg
  60. ms.msgs = append(ms.msgs, block)
  61. }