produce_message.go

package sarama

import "log"
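
// produceMessage is a single key/value pair destined for a particular
// topic-partition, along with flags recording whether it has already been
// retried and whether it must be delivered synchronously.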
type produceMessage struct {
	tp         topicPartition
	key, value []byte
	retried    bool
	sync       bool
}
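
// produceRequestBuilder is an ordered batch of messages waiting to be turned
// into a single ProduceRequest.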
type produceRequestBuilder []*produceMessage

// If the message is synchronous, we manually send it and wait for a return.
// Otherwise, we just hand it back to the producer to enqueue using the normal
// method.
func (msg *produceMessage) enqueue(p *Producer) error {
	if !msg.sync {
		return p.addMessage(msg)
	}

	var prb produceRequestBuilder = []*produceMessage{msg}
	bp, err := p.brokerProducerFor(msg.tp)
	if err != nil {
		return err
	}

	errs := make(chan error, 1)
	bp.flushRequest(p, prb, func(err error) {
		errs <- err
	})
	return <-errs
}
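
// reenqueue re-submits a message that has not yet been retried; a message
// that has already been retried once is dropped.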
func (msg *produceMessage) reenqueue(p *Producer) error {
	if !msg.retried {
		msg.retried = true
		return msg.enqueue(p)
	}
	return nil
}
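
// hasTopicPartition reports whether the message is destined for the given
// topic and partition.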
func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {
	return msg.tp.partition == partition && msg.tp.topic == topic
}
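
// toRequest converts the batch into a single ProduceRequest. When compression
// is enabled, each topic-partition's messages are wrapped in a compressed
// MessageSet; otherwise the messages are added to the request as-is.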
func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: config.Timeout}

	// If compression is enabled, we need to group messages by topic-partition and
	// wrap them in MessageSets. We already discarded that grouping, so we
	// inefficiently re-sort them. This could be optimized (e.g. by passing a map
	// around rather than an array), though it's not clear what the best approach is.
	if config.Compression != CompressionNone {
		msgSets := make(map[topicPartition]*MessageSet)
		for _, pmsg := range b {
			msgSet, ok := msgSets[pmsg.tp]
			if !ok {
				msgSet = new(MessageSet)
				msgSets[pmsg.tp] = msgSet
			}
			msgSet.addMessage(&Message{Codec: CompressionNone, Key: pmsg.key, Value: pmsg.value})
		}
		for tp, msgSet := range msgSets {
			valBytes, err := encode(msgSet)
			if err != nil {
				log.Fatal(err) // If this happens, it's basically our fault.
			}
			msg := Message{Codec: config.Compression, Key: nil, Value: valBytes}
			req.AddMessage(tp.topic, tp.partition, &msg)
		}
		return req
	}

	// Compression is not enabled, so simply append each message directly to the
	// request with no MessageSet wrapper.
	for _, pmsg := range b {
		msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value}
		req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg)
	}
	return req
}
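
// byteSize returns the combined length of the message's key and value in bytes.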
func (msg *produceMessage) byteSize() uint32 {
	return uint32(len(msg.key) + len(msg.value))
}
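
// byteSize returns the total key and value size of all messages in the batch.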
func (b produceRequestBuilder) byteSize() uint32 {
	var size uint32
	for _, m := range b {
		size += m.byteSize()
	}
	return size
}
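
// reverseEach calls fn on each message in the batch, from last to first.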
func (b produceRequestBuilder) reverseEach(fn func(m *produceMessage)) {
	for i := len(b) - 1; i >= 0; i-- {
		fn(b[i])
	}
}