message_set.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package kafka
  2. type MessageBlock struct {
  3. Offset int64
  4. Msg *Message
  5. }
  6. func (msb *MessageBlock) encode(pe packetEncoder) error {
  7. pe.putInt64(msb.Offset)
  8. pe.push(&lengthField{})
  9. err := msb.Msg.encode(pe)
  10. if err != nil {
  11. return err
  12. }
  13. return pe.pop()
  14. }
  15. func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
  16. msb.Offset, err = pd.getInt64()
  17. if err != nil {
  18. return err
  19. }
  20. pd.push(&lengthField{})
  21. if err != nil {
  22. return err
  23. }
  24. msb.Msg = new(Message)
  25. err = msb.Msg.decode(pd)
  26. if err != nil {
  27. return err
  28. }
  29. err = pd.pop()
  30. if err != nil {
  31. return err
  32. }
  33. return nil
  34. }
  35. type MessageSet struct {
  36. PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
  37. Messages []*MessageBlock
  38. }
  39. func (ms *MessageSet) encode(pe packetEncoder) error {
  40. for i := range ms.Messages {
  41. err := ms.Messages[i].encode(pe)
  42. if err != nil {
  43. return err
  44. }
  45. }
  46. return nil
  47. }
  48. func (ms *MessageSet) decode(pd packetDecoder) (err error) {
  49. ms.Messages = nil
  50. for pd.remaining() > 0 {
  51. msb := new(MessageBlock)
  52. err = msb.decode(pd)
  53. switch err {
  54. case nil:
  55. ms.Messages = append(ms.Messages, msb)
  56. case InsufficientData:
  57. // As an optimization the server is allowed to return a partial message at the
  58. // end of the message set. Clients should handle this case. So we just ignore such things.
  59. ms.PartialTrailingMessage = true
  60. return nil
  61. default:
  62. return err
  63. }
  64. }
  65. return nil
  66. }
  67. func (ms *MessageSet) addMessage(msg *Message) {
  68. block := new(MessageBlock)
  69. block.Msg = msg
  70. ms.Messages = append(ms.Messages, block)
  71. }