message_set.go 851 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package kafka
  2. type messageSetBlock struct {
  3. offset int64
  4. size int32
  5. msg message
  6. }
  7. func (msb *messageSetBlock) encode(pe packetEncoder) {
  8. pe.putInt64(msb.offset)
  9. pe.putInt32(msb.size)
  10. (&msb.msg).encode(pe)
  11. }
  12. func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
  13. msb.offset, err = pd.getInt64()
  14. if err != nil {
  15. return err
  16. }
  17. msb.size, err = pd.getInt32()
  18. if err != nil {
  19. return err
  20. }
  21. err = (&msb.message).decode(pd)
  22. if err != nil {
  23. return err
  24. }
  25. return nil
  26. }
  27. type messageSet struct {
  28. msgs []*messageSetBlock
  29. }
  30. func (ms *messageSet) encode(pe packetEncoder) {
  31. for i := range ms.msgs {
  32. ms.msgs[i].encode(pe)
  33. }
  34. }
  35. func (ms *messageSet) decode(pd packetDecoder) (err error) {
  36. ms.msgs = make([]*messageSetBlock)
  37. msb = new(messageSetBlock)
  38. err = msb.decode(pd)
  39. if err != nil {
  40. return err
  41. }
  42. return nil
  43. }