produce_request.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package protocol
  2. import enc "sarama/encoding"
  3. import "sarama/types"
  4. type ProduceRequest struct {
  5. RequiredAcks types.RequiredAcks
  6. Timeout int32
  7. msgSets map[string]map[int32]*MessageSet
  8. }
  9. func (p *ProduceRequest) Encode(pe enc.PacketEncoder) error {
  10. pe.PutInt16(int16(p.RequiredAcks))
  11. pe.PutInt32(p.Timeout)
  12. err := pe.PutArrayLength(len(p.msgSets))
  13. if err != nil {
  14. return err
  15. }
  16. for topic, partitions := range p.msgSets {
  17. err = pe.PutString(topic)
  18. if err != nil {
  19. return err
  20. }
  21. err = pe.PutArrayLength(len(partitions))
  22. if err != nil {
  23. return err
  24. }
  25. for id, msgSet := range partitions {
  26. pe.PutInt32(id)
  27. pe.Push(&enc.LengthField{})
  28. err = msgSet.Encode(pe)
  29. if err != nil {
  30. return err
  31. }
  32. err = pe.Pop()
  33. if err != nil {
  34. return err
  35. }
  36. }
  37. }
  38. return nil
  39. }
  40. func (p *ProduceRequest) key() int16 {
  41. return 0
  42. }
  43. func (p *ProduceRequest) version() int16 {
  44. return 0
  45. }
  46. func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
  47. if p.msgSets == nil {
  48. p.msgSets = make(map[string]map[int32]*MessageSet)
  49. }
  50. if p.msgSets[topic] == nil {
  51. p.msgSets[topic] = make(map[int32]*MessageSet)
  52. }
  53. set := p.msgSets[topic][partition]
  54. if set == nil {
  55. set = new(MessageSet)
  56. p.msgSets[topic][partition] = set
  57. }
  58. set.addMessage(msg)
  59. }