sync_producer.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package sarama
  2. import "sync"
  3. // SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
  4. // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
  5. // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
  6. type SyncProducer interface {
  7. // SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
  8. // It returns the partition and offset of the successfully-produced message, or the error (if any).
  9. SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error)
  10. // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
  11. // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
  12. // on the underlying client.
  13. Close() error
  14. }
  15. type syncProducer struct {
  16. producer *producer
  17. wg sync.WaitGroup
  18. }
  19. // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
  20. func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
  21. p, err := NewProducer(addrs, config)
  22. if err != nil {
  23. return nil, err
  24. }
  25. return newSyncProducerFromProducer(p.(*producer)), nil
  26. }
  27. // NewSyncProducerFromClient creates a new SyncProducer using the given client.
  28. func NewSyncProducerFromClient(client *Client) (SyncProducer, error) {
  29. p, err := NewProducerFromClient(client)
  30. if err != nil {
  31. return nil, err
  32. }
  33. return newSyncProducerFromProducer(p.(*producer)), nil
  34. }
  35. func newSyncProducerFromProducer(p *producer) *syncProducer {
  36. p.conf.Producer.AckSuccesses = true
  37. sp := &syncProducer{producer: p}
  38. sp.wg.Add(2)
  39. go withRecover(sp.handleSuccesses)
  40. go withRecover(sp.handleErrors)
  41. return sp
  42. }
  43. func (sp *syncProducer) SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error) {
  44. expectation := make(chan error, 1)
  45. msg := &ProducerMessage{Topic: topic, Key: key, Value: value, Metadata: expectation}
  46. sp.producer.Input() <- msg
  47. err = <-expectation
  48. partition = msg.Partition()
  49. offset = msg.Offset()
  50. return
  51. }
  52. func (sp *syncProducer) handleSuccesses() {
  53. defer sp.wg.Done()
  54. for msg := range sp.producer.Successes() {
  55. expectation := msg.Metadata.(chan error)
  56. expectation <- nil
  57. }
  58. }
  59. func (sp *syncProducer) handleErrors() {
  60. defer sp.wg.Done()
  61. for err := range sp.producer.Errors() {
  62. expectation := err.Msg.Metadata.(chan error)
  63. expectation <- err.Err
  64. }
  65. }
  66. func (sp *syncProducer) Close() error {
  67. sp.producer.AsyncClose()
  68. sp.wg.Wait()
  69. return nil
  70. }