sync_producer.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package sarama
  2. import "sync"
  3. // SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
  4. // broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
  5. // to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
  6. //
  7. // The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
  8. // durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
  9. // There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
  10. type SyncProducer interface {
  11. // SendMessage produces a given message, and returns only when it either has
  12. // succeeded or failed to produce. It will return the partition and the offset
  13. // of the produced message, or an error if the message failed to produce.
  14. SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
  15. // Close shuts down the producer and flushes any messages it may have buffered.
  16. // You must call this function before a producer object passes out of scope, as
  17. // it may otherwise leak memory. You must call this before calling Close on the
  18. // underlying client.
  19. Close() error
  20. }
  21. type syncProducer struct {
  22. producer *asyncProducer
  23. wg sync.WaitGroup
  24. }
  25. // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
  26. func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
  27. p, err := NewAsyncProducer(addrs, config)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  32. }
  33. // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
  34. // necessary to call Close() on the underlying client when shutting down this producer.
  35. func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
  36. p, err := NewAsyncProducerFromClient(client)
  37. if err != nil {
  38. return nil, err
  39. }
  40. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  41. }
  42. func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
  43. p.conf.Producer.Return.Successes = true
  44. p.conf.Producer.Return.Errors = true
  45. sp := &syncProducer{producer: p}
  46. sp.wg.Add(2)
  47. go withRecover(sp.handleSuccesses)
  48. go withRecover(sp.handleErrors)
  49. return sp
  50. }
  51. func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
  52. oldMetadata := msg.Metadata
  53. defer func() {
  54. msg.Metadata = oldMetadata
  55. }()
  56. expectation := make(chan error, 1)
  57. msg.Metadata = expectation
  58. sp.producer.Input() <- msg
  59. if err := <-expectation; err != nil {
  60. return -1, -1, err
  61. }
  62. return msg.Partition, msg.Offset, nil
  63. }
  64. func (sp *syncProducer) handleSuccesses() {
  65. defer sp.wg.Done()
  66. for msg := range sp.producer.Successes() {
  67. expectation := msg.Metadata.(chan error)
  68. expectation <- nil
  69. }
  70. }
  71. func (sp *syncProducer) handleErrors() {
  72. defer sp.wg.Done()
  73. for err := range sp.producer.Errors() {
  74. expectation := err.Msg.Metadata.(chan error)
  75. expectation <- err.Err
  76. }
  77. }
  78. func (sp *syncProducer) Close() error {
  79. sp.producer.AsyncClose()
  80. sp.wg.Wait()
  81. return nil
  82. }