produce_request.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package protocol
  2. // Special values accepted by Kafka for the ResponseCondition member of produce requests.
  3. const (
  4. NO_RESPONSE int16 = 0 // Don't send any response, the TCP ACK is all you get.
  5. WAIT_FOR_LOCAL int16 = 1 // Wait for only the local commit to succeed before responding.
  6. WAIT_FOR_ALL int16 = -1 // Wait for all replicas to commit before responding.
  7. )
  8. type ProduceRequest struct {
  9. ResponseCondition int16
  10. Timeout int32
  11. msgSets map[string]map[int32]*MessageSet
  12. }
  13. func (p *ProduceRequest) encode(pe packetEncoder) {
  14. pe.putInt16(p.ResponseCondition)
  15. pe.putInt32(p.Timeout)
  16. pe.putArrayCount(len(p.msgSets))
  17. for topic, partitions := range p.msgSets {
  18. pe.putString(topic)
  19. pe.putArrayCount(len(partitions))
  20. for id, msgSet := range partitions {
  21. pe.putInt32(id)
  22. pe.pushLength32()
  23. msgSet.encode(pe)
  24. pe.pop()
  25. }
  26. }
  27. }
  28. func (p *ProduceRequest) key() int16 {
  29. return 0
  30. }
  31. func (p *ProduceRequest) version() int16 {
  32. return 0
  33. }
  34. func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
  35. if p.msgSets == nil {
  36. p.msgSets = make(map[string]map[int32]*MessageSet)
  37. }
  38. if p.msgSets[topic] == nil {
  39. p.msgSets[topic] = make(map[int32]*MessageSet)
  40. }
  41. set := p.msgSets[topic][partition]
  42. if set == nil {
  43. set = newMessageSet()
  44. p.msgSets[topic][partition] = set
  45. }
  46. set.addMessage(msg)
  47. }