123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- package sarama
- type MessageBlock struct {
- Offset int64
- Msg *Message
- }
- func (msb *MessageBlock) Messages() []*MessageBlock {
- if msb.Msg.Set != nil {
- return msb.Msg.Set.Messages
- }
- return []*MessageBlock{msb}
- }
- func (msb *MessageBlock) encode(pe packetEncoder) error {
- pe.putInt64(msb.Offset)
- pe.push(&lengthField{})
- err := msb.Msg.encode(pe)
- if err != nil {
- return err
- }
- return pe.pop()
- }
- func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
- msb.Offset, err = pd.getInt64()
- if err != nil {
- return err
- }
- pd.push(&lengthField{})
- if err != nil {
- return err
- }
- msb.Msg = new(Message)
- err = msb.Msg.decode(pd)
- if err != nil {
- return err
- }
- err = pd.pop()
- if err != nil {
- return err
- }
- return nil
- }
- type MessageSet struct {
- PartialTrailingMessage bool
- Messages []*MessageBlock
- }
- func (ms *MessageSet) encode(pe packetEncoder) error {
- for i := range ms.Messages {
- err := ms.Messages[i].encode(pe)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (ms *MessageSet) decode(pd packetDecoder) (err error) {
- ms.Messages = nil
- for pd.remaining() > 0 {
- msb := new(MessageBlock)
- err = msb.decode(pd)
- switch err {
- case nil:
- ms.Messages = append(ms.Messages, msb)
- case InsufficientData:
-
-
- ms.PartialTrailingMessage = true
- return nil
- default:
- return err
- }
- }
- return nil
- }
- func (ms *MessageSet) addMessage(msg *Message) {
- block := new(MessageBlock)
- block.Msg = msg
- ms.Messages = append(ms.Messages, block)
- }
|