produce_request.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package sarama
  // RequiredAcks tells the broker, in a Produce Request, how many replica
  // acknowledgements it must observe before it responds. Besides the named
  // constants below, any positive int16 value is accepted.
  type RequiredAcks int16

  // Acknowledgement levels, listed in ascending numeric order.
  const (
  	// WaitForAll makes the broker wait until all replicas have committed
  	// before it responds.
  	WaitForAll RequiredAcks = -1
  	// NoResponse means no response is sent at all; the TCP ACK is the only
  	// confirmation the caller gets.
  	NoResponse RequiredAcks = 0
  	// WaitForLocal makes the broker wait only for the local commit to
  	// succeed before it responds.
  	WaitForLocal RequiredAcks = 1
  )
  13. type ProduceRequest struct {
  14. RequiredAcks RequiredAcks
  15. Timeout int32
  16. msgSets map[string]map[int32]*MessageSet
  17. }
  18. func (p *ProduceRequest) encode(pe packetEncoder) error {
  19. pe.putInt16(int16(p.RequiredAcks))
  20. pe.putInt32(p.Timeout)
  21. err := pe.putArrayLength(len(p.msgSets))
  22. if err != nil {
  23. return err
  24. }
  25. for topic, partitions := range p.msgSets {
  26. err = pe.putString(topic)
  27. if err != nil {
  28. return err
  29. }
  30. err = pe.putArrayLength(len(partitions))
  31. if err != nil {
  32. return err
  33. }
  34. for id, msgSet := range partitions {
  35. pe.putInt32(id)
  36. pe.push(&lengthField{})
  37. err = msgSet.encode(pe)
  38. if err != nil {
  39. return err
  40. }
  41. err = pe.pop()
  42. if err != nil {
  43. return err
  44. }
  45. }
  46. }
  47. return nil
  48. }
  49. func (p *ProduceRequest) key() int16 {
  50. return 0
  51. }
  52. func (p *ProduceRequest) version() int16 {
  53. return 0
  54. }
  55. func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
  56. if p.msgSets == nil {
  57. p.msgSets = make(map[string]map[int32]*MessageSet)
  58. }
  59. if p.msgSets[topic] == nil {
  60. p.msgSets[topic] = make(map[int32]*MessageSet)
  61. }
  62. set := p.msgSets[topic][partition]
  63. if set == nil {
  64. set = new(MessageSet)
  65. p.msgSets[topic][partition] = set
  66. }
  67. set.addMessage(msg)
  68. }