message_set.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package sarama
  2. type MessageBlock struct {
  3. Offset int64
  4. Msg *Message
  5. }
  6. // Messages convenience helper which returns either all the
  7. // messages that are wrapped in this block
  8. func (msb *MessageBlock) Messages() []*MessageBlock {
  9. if msb.Msg.Set != nil {
  10. return msb.Msg.Set.Messages
  11. }
  12. return []*MessageBlock{msb}
  13. }
  14. func (msb *MessageBlock) encode(pe packetEncoder) error {
  15. pe.putInt64(msb.Offset)
  16. pe.push(&lengthField{})
  17. err := msb.Msg.encode(pe)
  18. if err != nil {
  19. return err
  20. }
  21. return pe.pop()
  22. }
  23. func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
  24. msb.Offset, err = pd.getInt64()
  25. if err != nil {
  26. return err
  27. }
  28. pd.push(&lengthField{})
  29. if err != nil {
  30. return err
  31. }
  32. msb.Msg = new(Message)
  33. err = msb.Msg.decode(pd)
  34. if err != nil {
  35. return err
  36. }
  37. err = pd.pop()
  38. if err != nil {
  39. return err
  40. }
  41. return nil
  42. }
  43. type MessageSet struct {
  44. PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
  45. Messages []*MessageBlock
  46. }
  47. func (ms *MessageSet) encode(pe packetEncoder) error {
  48. for i := range ms.Messages {
  49. err := ms.Messages[i].encode(pe)
  50. if err != nil {
  51. return err
  52. }
  53. }
  54. return nil
  55. }
  56. func (ms *MessageSet) decode(pd packetDecoder) (err error) {
  57. ms.Messages = nil
  58. for pd.remaining() > 0 {
  59. msb := new(MessageBlock)
  60. err = msb.decode(pd)
  61. switch err {
  62. case nil:
  63. ms.Messages = append(ms.Messages, msb)
  64. case InsufficientData:
  65. // As an optimization the server is allowed to return a partial message at the
  66. // end of the message set. Clients should handle this case. So we just ignore such things.
  67. ms.PartialTrailingMessage = true
  68. return nil
  69. default:
  70. return err
  71. }
  72. }
  73. return nil
  74. }
  75. func (ms *MessageSet) addMessage(msg *Message) {
  76. block := new(MessageBlock)
  77. block.Msg = msg
  78. ms.Messages = append(ms.Messages, block)
  79. }