simple_producer.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package sarama
  2. import "sync"
  3. // SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
  4. // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
  5. // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
  6. type SimpleProducer struct {
  7. producer *Producer
  8. wg sync.WaitGroup
  9. }
  10. // NewSimpleProducer creates a new SimpleProducer using the given client and configuration.
  11. func NewSimpleProducer(client *Client, config *ProducerConfig) (*SimpleProducer, error) {
  12. if config == nil {
  13. config = NewProducerConfig()
  14. }
  15. config.AckSuccesses = true
  16. prod, err := NewProducer(client, config)
  17. if err != nil {
  18. return nil, err
  19. }
  20. sp := &SimpleProducer{producer: prod}
  21. sp.wg.Add(2)
  22. go withRecover(sp.handleSuccesses)
  23. go withRecover(sp.handleErrors)
  24. return sp, nil
  25. }
  26. // SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
  27. func (sp *SimpleProducer) SendMessage(topic string, key, value Encoder) error {
  28. expectation := make(chan error, 1)
  29. msg := &ProducerMessage{Topic: topic, Key: key, Value: value, Metadata: expectation}
  30. sp.producer.Input() <- msg
  31. return <-expectation
  32. }
  33. func (sp *SimpleProducer) handleSuccesses() {
  34. defer sp.wg.Done()
  35. for msg := range sp.producer.Successes() {
  36. expectation := msg.Metadata.(chan error)
  37. expectation <- nil
  38. }
  39. }
  40. func (sp *SimpleProducer) handleErrors() {
  41. defer sp.wg.Done()
  42. for err := range sp.producer.Errors() {
  43. expectation := err.Msg.Metadata.(chan error)
  44. expectation <- err.Err
  45. }
  46. }
  47. // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
  48. // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
  49. // on the underlying client.
  50. func (sp *SimpleProducer) Close() error {
  51. sp.producer.AsyncClose()
  52. sp.wg.Wait()
  53. return nil
  54. }