producer.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package sarama
  2. // ProducerConfig is used to pass multiple configuration options to NewProducer.
  3. type ProducerConfig struct {
  4. Partitioner Partitioner // Chooses the partition to send messages to, or randomly if this is nil.
  5. RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
  6. Timeout int32 // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
  7. }
  8. // Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
  9. // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
  10. // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
  11. type Producer struct {
  12. client *Client
  13. topic string
  14. config ProducerConfig
  15. }
  16. // NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic.
  17. func NewProducer(client *Client, topic string, config *ProducerConfig) (*Producer, error) {
  18. if config == nil {
  19. config = new(ProducerConfig)
  20. }
  21. if config.RequiredAcks < -1 {
  22. return nil, ConfigurationError("Invalid RequiredAcks")
  23. }
  24. if config.Timeout < 0 {
  25. return nil, ConfigurationError("Invalid Timeout")
  26. }
  27. if config.Partitioner == nil {
  28. config.Partitioner = RandomPartitioner{}
  29. }
  30. p := new(Producer)
  31. p.client = client
  32. p.topic = topic
  33. p.config = *config
  34. return p, nil
  35. }
  36. // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
  37. // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
  38. // on the underlying client.
  39. func (p *Producer) Close() error {
  40. // no-op for now, adding for consistency and so the API doesn't change when we add buffering
  41. // (which will require a goroutine, which will require a close method in order to flush the buffer).
  42. return nil
  43. }
  44. // SendMessage sends a message with the given key and value. The partition to send to is selected by the Producer's Partitioner.
  45. // To send strings as either key or value, see the StringEncoder type.
  46. func (p *Producer) SendMessage(key, value Encoder) error {
  47. return p.safeSendMessage(key, value, true)
  48. }
  49. func (p *Producer) choosePartition(key Encoder) (int32, error) {
  50. partitions, err := p.client.partitions(p.topic)
  51. if err != nil {
  52. return -1, err
  53. }
  54. choice := p.config.Partitioner.Partition(key, len(partitions))
  55. if choice >= len(partitions) {
  56. return -1, InvalidPartition
  57. }
  58. return partitions[choice], nil
  59. }
  60. func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
  61. partition, err := p.choosePartition(key)
  62. if err != nil {
  63. return err
  64. }
  65. var keyBytes []byte
  66. var valBytes []byte
  67. if key != nil {
  68. keyBytes, err = key.Encode()
  69. if err != nil {
  70. return err
  71. }
  72. }
  73. valBytes, err = value.Encode()
  74. if err != nil {
  75. return err
  76. }
  77. broker, err := p.client.leader(p.topic, partition)
  78. if err != nil {
  79. return err
  80. }
  81. request := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
  82. request.AddMessage(p.topic, partition, &Message{Key: keyBytes, Value: valBytes})
  83. response, err := broker.Produce(p.client.id, request)
  84. switch err {
  85. case nil:
  86. break
  87. case EncodingError:
  88. return err
  89. default:
  90. if !retry {
  91. return err
  92. }
  93. p.client.disconnectBroker(broker)
  94. return p.safeSendMessage(key, value, false)
  95. }
  96. if response == nil {
  97. return nil
  98. }
  99. block := response.GetBlock(p.topic, partition)
  100. if block == nil {
  101. return IncompleteResponse
  102. }
  103. switch block.Err {
  104. case NO_ERROR:
  105. return nil
  106. case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
  107. if !retry {
  108. return block.Err
  109. }
  110. err = p.client.refreshTopic(p.topic)
  111. if err != nil {
  112. return err
  113. }
  114. return p.safeSendMessage(key, value, false)
  115. }
  116. return block.Err
  117. }