produce_request.go 995 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package protocol
  2. type ProduceRequest struct {
  3. RequiredAcks int16
  4. Timeout int32
  5. msgSets map[string]map[int32]*MessageSet
  6. }
  7. func (p *ProduceRequest) encode(pe packetEncoder) {
  8. pe.putInt16(p.RequiredAcks)
  9. pe.putInt32(p.Timeout)
  10. pe.putArrayCount(len(p.msgSets))
  11. for topic, partitions := range p.msgSets {
  12. pe.putString(topic)
  13. pe.putArrayCount(len(partitions))
  14. for id, msgSet := range partitions {
  15. pe.putInt32(id)
  16. pe.pushLength32()
  17. msgSet.encode(pe)
  18. pe.pop()
  19. }
  20. }
  21. }
  22. func (p *ProduceRequest) key() int16 {
  23. return 0
  24. }
  25. func (p *ProduceRequest) version() int16 {
  26. return 0
  27. }
  28. func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
  29. if p.msgSets == nil {
  30. p.msgSets = make(map[string]map[int32]*MessageSet)
  31. }
  32. if p.msgSets[topic] == nil {
  33. p.msgSets[topic] = make(map[int32]*MessageSet)
  34. }
  35. set := p.msgSets[topic][partition]
  36. if set == nil {
  37. set = new(MessageSet)
  38. p.msgSets[topic][partition] = set
  39. }
  40. set.addMessage(msg)
  41. }