sync_producer.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package sarama
  2. import "sync"
  3. // SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
  4. // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
  5. // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
  6. type SyncProducer struct {
  7. producer *Producer
  8. wg sync.WaitGroup
  9. }
  10. // NewSyncProducer creates a new SyncProducer using the given client and configuration.
  11. func NewSyncProducer(client *Client, config *ProducerConfig) (*SyncProducer, error) {
  12. if config == nil {
  13. config = NewProducerConfig()
  14. }
  15. config.AckSuccesses = true
  16. prod, err := NewProducer(client, config)
  17. if err != nil {
  18. return nil, err
  19. }
  20. sp := &SyncProducer{producer: prod}
  21. sp.wg.Add(2)
  22. go withRecover(sp.handleSuccesses)
  23. go withRecover(sp.handleErrors)
  24. return sp, nil
  25. }
  26. // SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
  27. // It returns the partition and offset of the successfully-produced message, or the error (if any).
  28. func (sp *SyncProducer) SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error) {
  29. expectation := make(chan error, 1)
  30. msg := &ProducerMessage{Topic: topic, Key: key, Value: value, Metadata: expectation}
  31. sp.producer.Input() <- msg
  32. err = <-expectation
  33. partition = msg.Partition()
  34. offset = msg.Offset()
  35. return
  36. }
  37. func (sp *SyncProducer) handleSuccesses() {
  38. defer sp.wg.Done()
  39. for msg := range sp.producer.Successes() {
  40. expectation := msg.Metadata.(chan error)
  41. expectation <- nil
  42. }
  43. }
  44. func (sp *SyncProducer) handleErrors() {
  45. defer sp.wg.Done()
  46. for err := range sp.producer.Errors() {
  47. expectation := err.Msg.Metadata.(chan error)
  48. expectation <- err.Err
  49. }
  50. }
  51. // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
  52. // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
  53. // on the underlying client.
  54. func (sp *SyncProducer) Close() error {
  55. sp.producer.AsyncClose()
  56. sp.wg.Wait()
  57. return nil
  58. }