sync_producer.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package sarama
  2. import "sync"
  3. // SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
  4. // broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
  5. // to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
  6. //
  7. // The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
  8. // durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
  9. // There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
  10. type SyncProducer interface {
  11. // SendMessage produces a given message, and returns only when it either has
  12. // succeeded or failed to produce. It will return the partition and the offset
  13. // of the produced message, or an error if the message failed to produce.
  14. SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
  15. // SendMessages produces a given set of messages, and returns only when all
  16. // messages in the set have either succeeded or failed. Note that messages
  17. // can succeed and fail individually; if some succeed and some fail,
  18. // SendMessages will return an error.
  19. SendMessages(msgs []*ProducerMessage) error
  20. // Close shuts down the producer and flushes any messages it may have buffered.
  21. // You must call this function before a producer object passes out of scope, as
  22. // it may otherwise leak memory. You must call this before calling Close on the
  23. // underlying client.
  24. Close() error
  25. }
  26. type syncProducer struct {
  27. producer *asyncProducer
  28. wg sync.WaitGroup
  29. }
  30. // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
  31. func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
  32. p, err := NewAsyncProducer(addrs, config)
  33. if err != nil {
  34. return nil, err
  35. }
  36. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  37. }
  38. // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
  39. // necessary to call Close() on the underlying client when shutting down this producer.
  40. func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
  41. p, err := NewAsyncProducerFromClient(client)
  42. if err != nil {
  43. return nil, err
  44. }
  45. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  46. }
  47. func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
  48. p.conf.Producer.Return.Successes = true
  49. p.conf.Producer.Return.Errors = true
  50. sp := &syncProducer{producer: p}
  51. sp.wg.Add(2)
  52. go withRecover(sp.handleSuccesses)
  53. go withRecover(sp.handleErrors)
  54. return sp
  55. }
  56. func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
  57. oldMetadata := msg.Metadata
  58. defer func() {
  59. msg.Metadata = oldMetadata
  60. }()
  61. expectation := make(chan *ProducerError, 1)
  62. msg.Metadata = expectation
  63. sp.producer.Input() <- msg
  64. if err := <-expectation; err != nil {
  65. return -1, -1, err.Err
  66. }
  67. return msg.Partition, msg.Offset, nil
  68. }
  69. func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
  70. savedMetadata := make([]interface{}, len(msgs))
  71. for i := range msgs {
  72. savedMetadata[i] = msgs[i].Metadata
  73. }
  74. defer func() {
  75. for i := range msgs {
  76. msgs[i].Metadata = savedMetadata[i]
  77. }
  78. }()
  79. expectations := make(chan chan *ProducerError, len(msgs))
  80. go func() {
  81. for _, msg := range msgs {
  82. expectation := make(chan *ProducerError, 1)
  83. msg.Metadata = expectation
  84. sp.producer.Input() <- msg
  85. expectations <- expectation
  86. }
  87. close(expectations)
  88. }()
  89. var errors ProducerErrors
  90. for expectation := range expectations {
  91. if err := <-expectation; err != nil {
  92. errors = append(errors, err)
  93. }
  94. }
  95. if len(errors) > 0 {
  96. return errors
  97. }
  98. return nil
  99. }
  100. func (sp *syncProducer) handleSuccesses() {
  101. defer sp.wg.Done()
  102. for msg := range sp.producer.Successes() {
  103. expectation := msg.Metadata.(chan *ProducerError)
  104. expectation <- nil
  105. }
  106. }
  107. func (sp *syncProducer) handleErrors() {
  108. defer sp.wg.Done()
  109. for err := range sp.producer.Errors() {
  110. expectation := err.Msg.Metadata.(chan *ProducerError)
  111. expectation <- err
  112. }
  113. }
  114. func (sp *syncProducer) Close() error {
  115. sp.producer.AsyncClose()
  116. sp.wg.Wait()
  117. return nil
  118. }