sync_producer.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package sarama
  2. import "sync"
  3. // SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
  4. // broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
  5. // to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
  6. //
  7. // The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
  8. // durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
  9. // There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
  10. //
  11. // For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
  12. // be set to true in its configuration.
  13. type SyncProducer interface {
  14. // SendMessage produces a given message, and returns only when it either has
  15. // succeeded or failed to produce. It will return the partition and the offset
  16. // of the produced message, or an error if the message failed to produce.
  17. SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
  18. // SendMessages produces a given set of messages, and returns only when all
  19. // messages in the set have either succeeded or failed. Note that messages
  20. // can succeed and fail individually; if some succeed and some fail,
  21. // SendMessages will return an error.
  22. SendMessages(msgs []*ProducerMessage) error
  23. // Close shuts down the producer and waits for any buffered messages to be
  24. // flushed. You must call this function before a producer object passes out of
  25. // scope, as it may otherwise leak memory. You must call this before calling
  26. // Close on the underlying client.
  27. Close() error
  28. }
  29. type syncProducer struct {
  30. producer *asyncProducer
  31. wg sync.WaitGroup
  32. }
  33. // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
  34. func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
  35. if config == nil {
  36. config = NewConfig()
  37. config.Producer.Return.Successes = true
  38. }
  39. if err := verifyProducerConfig(config); err != nil {
  40. return nil, err
  41. }
  42. p, err := NewAsyncProducer(addrs, config)
  43. if err != nil {
  44. return nil, err
  45. }
  46. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  47. }
  48. // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
  49. // necessary to call Close() on the underlying client when shutting down this producer.
  50. func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
  51. if err := verifyProducerConfig(client.Config()); err != nil {
  52. return nil, err
  53. }
  54. p, err := NewAsyncProducerFromClient(client)
  55. if err != nil {
  56. return nil, err
  57. }
  58. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  59. }
  60. func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
  61. sp := &syncProducer{producer: p}
  62. sp.wg.Add(2)
  63. go withRecover(sp.handleSuccesses)
  64. go withRecover(sp.handleErrors)
  65. return sp
  66. }
  67. func verifyProducerConfig(config *Config) error {
  68. if !config.Producer.Return.Errors {
  69. return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
  70. }
  71. if !config.Producer.Return.Successes {
  72. return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
  73. }
  74. return nil
  75. }
  76. func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
  77. expectation := make(chan *ProducerError, 1)
  78. msg.expectation = expectation
  79. sp.producer.Input() <- msg
  80. if err := <-expectation; err != nil {
  81. return -1, -1, err.Err
  82. }
  83. return msg.Partition, msg.Offset, nil
  84. }
  85. func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
  86. expectations := make(chan chan *ProducerError, len(msgs))
  87. go func() {
  88. for _, msg := range msgs {
  89. expectation := make(chan *ProducerError, 1)
  90. msg.expectation = expectation
  91. sp.producer.Input() <- msg
  92. expectations <- expectation
  93. }
  94. close(expectations)
  95. }()
  96. var errors ProducerErrors
  97. for expectation := range expectations {
  98. if err := <-expectation; err != nil {
  99. errors = append(errors, err)
  100. }
  101. }
  102. if len(errors) > 0 {
  103. return errors
  104. }
  105. return nil
  106. }
  107. func (sp *syncProducer) handleSuccesses() {
  108. defer sp.wg.Done()
  109. for msg := range sp.producer.Successes() {
  110. expectation := msg.expectation
  111. expectation <- nil
  112. }
  113. }
  114. func (sp *syncProducer) handleErrors() {
  115. defer sp.wg.Done()
  116. for err := range sp.producer.Errors() {
  117. expectation := err.Msg.expectation
  118. expectation <- err
  119. }
  120. }
  121. func (sp *syncProducer) Close() error {
  122. sp.producer.AsyncClose()
  123. sp.wg.Wait()
  124. return nil
  125. }