sync_producer.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package sarama
  2. import "sync"
  3. // SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
  4. // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
  5. // it passes out of scope.
  6. type SyncProducer interface {
  7. // SendMessage produces a given message, and returns only when it either has succeeded or failed to produce.
  8. // It will return the partition and the offset of the produced message, or an error if the message
  9. // failed to produce.
  10. SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
  11. // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
  12. // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
  13. // on the underlying client.
  14. Close() error
  15. }
  16. type syncProducer struct {
  17. producer *asyncProducer
  18. wg sync.WaitGroup
  19. }
  20. // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
  21. func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
  22. p, err := NewAsyncProducer(addrs, config)
  23. if err != nil {
  24. return nil, err
  25. }
  26. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  27. }
  28. // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
  29. // necessary to call Close() on the underlying client when shutting down this producer.
  30. func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
  31. p, err := NewAsyncProducerFromClient(client)
  32. if err != nil {
  33. return nil, err
  34. }
  35. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  36. }
  37. func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
  38. p.conf.Producer.Return.Successes = true
  39. p.conf.Producer.Return.Errors = true
  40. sp := &syncProducer{producer: p}
  41. sp.wg.Add(2)
  42. go withRecover(sp.handleSuccesses)
  43. go withRecover(sp.handleErrors)
  44. return sp
  45. }
  46. func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
  47. oldMetadata := msg.Metadata
  48. defer func() {
  49. msg.Metadata = oldMetadata
  50. }()
  51. expectation := make(chan error, 1)
  52. msg.Metadata = expectation
  53. sp.producer.Input() <- msg
  54. if err := <-expectation; err != nil {
  55. return -1, -1, err
  56. } else {
  57. return msg.Partition, msg.Offset, nil
  58. }
  59. }
  60. func (sp *syncProducer) handleSuccesses() {
  61. defer sp.wg.Done()
  62. for msg := range sp.producer.Successes() {
  63. expectation := msg.Metadata.(chan error)
  64. expectation <- nil
  65. }
  66. }
  67. func (sp *syncProducer) handleErrors() {
  68. defer sp.wg.Done()
  69. for err := range sp.producer.Errors() {
  70. expectation := err.Msg.Metadata.(chan error)
  71. expectation <- err.Err
  72. }
  73. }
  74. func (sp *syncProducer) Close() error {
  75. sp.producer.AsyncClose()
  76. sp.wg.Wait()
  77. return nil
  78. }