| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- package protocol
- import enc "sarama/encoding"
- type MessageBlock struct {
- Offset int64
- Msg *Message
- }
- func (msb *MessageBlock) Encode(pe enc.PacketEncoder) error {
- pe.PutInt64(msb.Offset)
- pe.Push(&enc.LengthField{})
- err := msb.Msg.Encode(pe)
- if err != nil {
- return err
- }
- pe.Pop()
- }
- func (msb *MessageBlock) Decode(pd enc.PacketDecoder) (err error) {
- msb.Offset, err = pd.GetInt64()
- if err != nil {
- return err
- }
- err = pd.Push(&enc.LengthField{})
- if err != nil {
- return err
- }
- msb.Msg = new(Message)
- err = msb.Msg.Decode(pd)
- if err != nil {
- return err
- }
- err = pd.Pop()
- if err != nil {
- return err
- }
- return nil
- }
- type MessageSet struct {
- PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
- Messages []*MessageBlock
- }
- func (ms *MessageSet) Encode(pe enc.PacketEncoder) error {
- for i := range ms.Messages {
- err := ms.Messages[i].Encode(pe)
- if err != nil {
- return err
- }
- }
- }
- func (ms *MessageSet) Decode(pd enc.PacketDecoder) (err error) {
- ms.Messages = nil
- for pd.Remaining() > 0 {
- msb := new(MessageBlock)
- err = msb.Decode(pd)
- switch err.(type) {
- case nil:
- ms.Messages = append(ms.Messages, msb)
- case enc.InsufficientData:
- // As an optimization the server is allowed to return a partial message at the
- // end of the message set. Clients should handle this case. So we just ignore such things.
- ms.PartialTrailingMessage = true
- return nil
- default:
- return err
- }
- }
- return nil
- }
- func (ms *MessageSet) addMessage(msg *Message) {
- block := new(MessageBlock)
- block.Msg = msg
- ms.Messages = append(ms.Messages, block)
- }
|