produce_request.go 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package kafka
  // Named int16 values declared alongside ProduceRequest.
  // NOTE(review): these share the int16 type of ProduceRequest.ResponseCondition
  // and are presumably the values meant for that field; confirm against the
  // wire-protocol documentation before relying on the names' implied meaning.
  const (
  	// NO_RESPONSE has the value 0.
  	NO_RESPONSE = int16(0)
  	// WAIT_FOR_LOCAL has the value 1.
  	WAIT_FOR_LOCAL = int16(1)
  	// WAIT_FOR_ALL has the value -1.
  	WAIT_FOR_ALL = int16(-1)
  )
  7. type ProduceRequest struct {
  8. ResponseCondition int16
  9. Timeout int32
  10. msgSets map[*string]map[int32]*messageSet
  11. }
  12. func (p *ProduceRequest) encode(pe packetEncoder) {
  13. pe.putInt16(p.ResponseCondition)
  14. pe.putInt32(p.Timeout)
  15. pe.putArrayCount(len(p.msgSets))
  16. for topic, partitions := range p.msgSets {
  17. pe.putString(topic)
  18. pe.putArrayCount(len(partitions))
  19. for id, msgSet := range partitions {
  20. pe.putInt32(id)
  21. msgSet.encode(pe)
  22. }
  23. }
  24. }
  25. func (p *ProduceRequest) key() int16 {
  26. return 0
  27. }
  28. func (p *ProduceRequest) version() int16 {
  29. return 0
  30. }
  31. func (p *ProduceRequest) AddMessage(topic *string, partition int32, msg *Message) {
  32. if p.msgSets == nil {
  33. p.msgSets = make(map[*string]map[int32]*messageSet)
  34. }
  35. if p.msgSets[topic] == nil {
  36. p.msgSets[topic] = make(map[int32]*messageSet)
  37. }
  38. set := p.msgSets[topic][partition]
  39. if set == nil {
  40. set = newMessageSet()
  41. p.msgSets[topic][partition] = set
  42. }
  43. set.addMessage(msg)
  44. }