message_set.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package protocol
  2. type MessageBlock struct {
  3. Offset int64
  4. Msg *Message
  5. }
  6. func (msb *MessageBlock) encode(pe packetEncoder) {
  7. pe.putInt64(msb.Offset)
  8. pe.pushLength32()
  9. msb.Msg.encode(pe)
  10. pe.pop()
  11. }
  12. func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
  13. msb.Offset, err = pd.getInt64()
  14. if err != nil {
  15. return err
  16. }
  17. err = pd.pushLength32()
  18. if err != nil {
  19. return err
  20. }
  21. msb.Msg = new(Message)
  22. err = msb.Msg.decode(pd)
  23. if err != nil {
  24. return err
  25. }
  26. err = pd.pop()
  27. if err != nil {
  28. return err
  29. }
  30. return nil
  31. }
  32. type MessageSet struct {
  33. PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
  34. Messages []*MessageBlock
  35. }
  36. func (ms *MessageSet) encode(pe packetEncoder) {
  37. for i := range ms.Messages {
  38. ms.Messages[i].encode(pe)
  39. }
  40. }
  41. func (ms *MessageSet) decode(pd packetDecoder) (err error) {
  42. ms.Messages = nil
  43. for pd.remaining() > 0 {
  44. msb := new(MessageBlock)
  45. err = msb.decode(pd)
  46. switch err.(type) {
  47. case nil:
  48. ms.Messages = append(ms.Messages, msb)
  49. case InsufficientData:
  50. // As an optimization the server is allowed to return a partial message at the
  51. // end of the message set. Clients should handle this case. So we just ignore such things.
  52. ms.PartialTrailingMessage = true
  53. return nil
  54. default:
  55. return err
  56. }
  57. }
  58. return nil
  59. }
  60. func (ms *MessageSet) addMessage(msg *Message) {
  61. block := new(MessageBlock)
  62. block.Msg = msg
  63. ms.Messages = append(ms.Messages, block)
  64. }