message_set.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package kafka
  2. type messageSetBlock struct {
  3. offset int64
  4. msg message
  5. }
  6. func (msb *messageSetBlock) encode(pe packetEncoder) {
  7. pe.putInt64(msb.offset)
  8. pe.pushLength32()
  9. (&msb.msg).encode(pe)
  10. pe.pop()
  11. }
  12. func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
  13. msb.offset, err = pd.getInt64()
  14. if err != nil {
  15. return err
  16. }
  17. err = pd.pushLength32()
  18. if err != nil {
  19. return err
  20. }
  21. err = (&msb.msg).decode(pd)
  22. if err != nil {
  23. return err
  24. }
  25. err = pd.pop()
  26. if err != nil {
  27. return err
  28. }
  29. return nil
  30. }
  31. type messageSet struct {
  32. msgs []*messageSetBlock
  33. }
  34. func (ms *messageSet) encode(pe packetEncoder) {
  35. for i := range ms.msgs {
  36. ms.msgs[i].encode(pe)
  37. }
  38. }
  39. func (ms *messageSet) decode(pd packetDecoder) (err error) {
  40. ms.msgs = nil
  41. for pd.remaining() > 0 {
  42. msb := new(messageSetBlock)
  43. err = msb.decode(pd)
  44. if err != nil {
  45. return err
  46. }
  47. ms.msgs = append(ms.msgs, msb)
  48. }
  49. return nil
  50. }
  51. func newSingletonMessageSet(msg *message) *messageSet {
  52. tmp := make([]*messageSetBlock, 1)
  53. tmp[0] = &messageSetBlock{msg: *msg}
  54. return &messageSet{tmp}
  55. }