produce_message.go

package sarama

import (
	"log"
	"time"
)
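
// produceMessage is a single key/value message bound for one topic-partition,
// along with flags recording whether it has already been retried once and
// whether the caller is waiting synchronously for the result.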
type produceMessage struct {
	tp         topicPartition
	key, value []byte
	retried    bool
	sync       bool
}
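
// produceRequestBuilder is an ordered batch of messages waiting to be flushed
// to a broker together as a single ProduceRequest.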
type produceRequestBuilder []*produceMessage

// If the message is synchronous, we build a one-message request, flush it to
// the appropriate broker ourselves, and wait for the result. Otherwise, we
// just hand it back to the producer to enqueue using the normal method.
func (msg *produceMessage) enqueue(p *Producer) error {
	if !msg.sync {
		return p.addMessage(msg)
	}

	var prb produceRequestBuilder = []*produceMessage{msg}
	bp, err := p.brokerProducerFor(msg.tp)
	if err != nil {
		return err
	}

	errs := make(chan error, 1)
	bp.flushRequest(p, prb, func(err error) {
		errs <- err
		close(errs)
	})
	return <-errs
}
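
// reenqueue requeues a failed message for a single retry. A message that has
// already been retried once is dropped, returning a DroppedMessagesError.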
func (msg *produceMessage) reenqueue(p *Producer) error {
	if msg.retried {
		return DroppedMessagesError{}
	}
	msg.retried = true
	return msg.enqueue(p)
}
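
// hasTopicPartition reports whether the message is destined for the given
// topic and partition.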
func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {
	return msg.tp.partition == partition && msg.tp.topic == topic
}
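
// toRequest assembles the batched messages into a single ProduceRequest,
// using the RequiredAcks, Timeout, and Compression settings from config.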
func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: int32(config.Timeout / time.Millisecond)}

	// If compression is enabled, we need to group messages by topic-partition
	// and wrap each group in a MessageSet. We already discarded that grouping,
	// so we inefficiently re-sort them here. This could be optimized (e.g. by
	// passing a map around rather than a slice), but it's not clear what the
	// best way is.
	if config.Compression != CompressionNone {
		msgSets := make(map[topicPartition]*MessageSet)
		for _, pmsg := range b {
			msgSet, ok := msgSets[pmsg.tp]
			if !ok {
				msgSet = new(MessageSet)
				msgSets[pmsg.tp] = msgSet
			}
			msgSet.addMessage(&Message{Codec: CompressionNone, Key: pmsg.key, Value: pmsg.value})
		}
		for tp, msgSet := range msgSets {
			valBytes, err := encode(msgSet)
			if err != nil {
				log.Fatal(err) // if this happens, it's basically our fault.
			}
			msg := Message{Codec: config.Compression, Key: nil, Value: valBytes}
			req.AddMessage(tp.topic, tp.partition, &msg)
		}
		return req
	}

	// Compression is not enabled, so append each message directly to the
	// request with no MessageSet wrapper.
	for _, pmsg := range b {
		msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value}
		req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg)
	}
	return req
}
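
// byteSize returns the combined length in bytes of the message's key and value.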
func (msg *produceMessage) byteSize() uint32 {
	return uint32(len(msg.key) + len(msg.value))
}
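
// byteSize returns the total key and value bytes across all messages in the
// batch.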
func (b produceRequestBuilder) byteSize() uint32 {
	var size uint32
	for _, m := range b {
		size += m.byteSize()
	}
	return size
}
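
// reverseEach applies fn to every message in the batch, iterating from the
// last element to the first.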
func (b produceRequestBuilder) reverseEach(fn func(m *produceMessage)) {
	for i := len(b) - 1; i >= 0; i-- {
		fn(b[i])
	}
}