produce_message.go

package sarama

import "log"
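
// produceMessage is a single key/value pair queued for delivery to a
// particular topic-partition. failures counts how many times delivery has
// been retried; sync marks messages that must be flushed and acknowledged
// before enqueue returns.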
type produceMessage struct {
	tp         topicPartition
	key, value []byte
	failures   uint32
	sync       bool
}
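
// retryLimit is the maximum number of times a failed message is re-enqueued
// before it is dropped (see reenqueue below).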
const retryLimit = 1
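
// produceRequestBuilder collects produceMessages so they can be batched into
// a single ProduceRequest.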
type produceRequestBuilder []*produceMessage

// If the message is synchronous, we manually send it and wait for a return.
// Otherwise, we just hand it back to the producer to enqueue using the normal
// method.
func (msg *produceMessage) enqueue(p *Producer) error {
	if !msg.sync {
		return p.addMessage(msg)
	}

	var prb produceRequestBuilder = []*produceMessage{msg}
	bp, err := p.brokerProducerFor(msg.tp)
	if err != nil {
		return err
	}

	errs := make(chan error, 1)
	bp.flushRequest(p, prb, func(err error) {
		errs <- err
	})
	return <-errs
}
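
// reenqueue puts a failed message back on the queue, up to retryLimit
// attempts. Once the limit is reached the message is dropped and nil is
// returned.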
func (msg *produceMessage) reenqueue(p *Producer) error {
	if msg.failures < retryLimit {
		msg.failures++
		return msg.enqueue(p)
	}
	return nil
}
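
// hasTopicPartition reports whether the message is destined for the given
// topic and partition.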
func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {
	return msg.tp.partition == partition && msg.tp.topic == topic
}
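
// toRequest assembles the builder's messages into a single ProduceRequest,
// applying the compression codec configured in config (if any).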
func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: config.Timeout}

	// If compression is enabled, we need to group messages by topic-partition and
	// wrap them in MessageSets. We already discarded that grouping, so we
	// inefficiently re-sort them. This could be optimized (i.e. pass a hash around
	// rather than an array); not sure what the best way is.
	if config.Compression != CompressionNone {
		msgSets := make(map[topicPartition]*MessageSet)
		for _, pmsg := range b {
			msgSet, ok := msgSets[pmsg.tp]
			if !ok {
				msgSet = new(MessageSet)
				msgSets[pmsg.tp] = msgSet
			}
			msgSet.addMessage(&Message{Codec: CompressionNone, Key: pmsg.key, Value: pmsg.value})
		}

		// Encode each per-partition MessageSet and wrap it in a single Message
		// carrying the configured compression codec.
		for tp, msgSet := range msgSets {
			valBytes, err := encode(msgSet)
			if err != nil {
				log.Fatal(err) // if this happens, it's basically our fault.
			}
			msg := Message{Codec: config.Compression, Key: nil, Value: valBytes}
			req.AddMessage(tp.topic, tp.partition, &msg)
		}
		return req
	}

	// Compression is not enabled, so simply append each message directly to the
	// request, with no MessageSet wrapper.
	for _, pmsg := range b {
		msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value}
		req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg)
	}
	return req
}
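
// byteSize returns the combined length of the message's key and value,
// not including any protocol framing overhead.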
func (msg *produceMessage) byteSize() uint32 {
	return uint32(len(msg.key) + len(msg.value))
}
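
// byteSize returns the total byteSize of all messages in the builder.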
func (b produceRequestBuilder) byteSize() uint32 {
	var size uint32
	for _, m := range b {
		size += m.byteSize()
	}
	return size
}
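
// reverseEach calls fn on each message in the builder, in reverse order
// (presumably so messages can be pushed back onto the front of a queue in
// their original order).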
func (b produceRequestBuilder) reverseEach(fn func(m *produceMessage)) {
	for i := len(b) - 1; i >= 0; i-- {
		fn(b[i])
	}
}