simple_producer.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. package sarama
  2. // SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
  3. // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
  4. // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
  5. type SimpleProducer struct {
  6. producer *Producer
  7. topic string
  8. }
  9. // NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
  10. // partitioner is nil, messages are partitioned randomly.
  11. func NewSimpleProducer(client *Client, topic string, partitioner Partitioner) (*SimpleProducer, error) {
  12. if topic == "" {
  13. return nil, ConfigurationError("Empty topic")
  14. }
  15. config := NewProducerConfig()
  16. config.AckSuccesses = true
  17. if partitioner != nil {
  18. config.Partitioner = partitioner
  19. }
  20. prod, err := NewProducer(client, config)
  21. if err != nil {
  22. return nil, err
  23. }
  24. return &SimpleProducer{prod, topic}, nil
  25. }
  26. // SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
  27. func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
  28. sp.producer.Input() <- &MessageToSend{Topic: sp.topic, Key: key, Value: value}
  29. result := <-sp.producer.Errors() // we always get something because AckSuccesses is true
  30. return result.Err
  31. }
  32. // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
  33. // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
  34. // on the underlying client.
  35. func (sp *SimpleProducer) Close() error {
  36. return sp.producer.Close()
  37. }