sync_producer.go 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package sarama
  2. import "sync"
  3. // SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
  4. // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
  5. // it passes out of scope.
  6. type SyncProducer interface {
  7. // SendMessage produces a given message, and returns only when it either has
  8. // succeeded or failed to produce. It will return the partition and the offset
  9. // of the produced message, or an error if the message failed to produce.
  10. SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
  11. // Close shuts down the producer and flushes any messages it may have buffered.
  12. // You must call this function before a producer object passes out of scope, as
  13. // it may otherwise leak memory. You must call this before calling Close on the
  14. // underlying client.
  15. Close() error
  16. }
  17. type syncProducer struct {
  18. producer *asyncProducer
  19. wg sync.WaitGroup
  20. }
  21. // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
  22. func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
  23. p, err := NewAsyncProducer(addrs, config)
  24. if err != nil {
  25. return nil, err
  26. }
  27. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  28. }
  29. // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
  30. // necessary to call Close() on the underlying client when shutting down this producer.
  31. func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
  32. p, err := NewAsyncProducerFromClient(client)
  33. if err != nil {
  34. return nil, err
  35. }
  36. return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
  37. }
  38. func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
  39. p.conf.Producer.Return.Successes = true
  40. p.conf.Producer.Return.Errors = true
  41. sp := &syncProducer{producer: p}
  42. sp.wg.Add(2)
  43. go withRecover(sp.handleSuccesses)
  44. go withRecover(sp.handleErrors)
  45. return sp
  46. }
  47. func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
  48. oldMetadata := msg.Metadata
  49. defer func() {
  50. msg.Metadata = oldMetadata
  51. }()
  52. expectation := make(chan error, 1)
  53. msg.Metadata = expectation
  54. sp.producer.Input() <- msg
  55. if err := <-expectation; err != nil {
  56. return -1, -1, err
  57. }
  58. return msg.Partition, msg.Offset, nil
  59. }
  60. func (sp *syncProducer) handleSuccesses() {
  61. defer sp.wg.Done()
  62. for msg := range sp.producer.Successes() {
  63. expectation := msg.Metadata.(chan error)
  64. expectation <- nil
  65. }
  66. }
  67. func (sp *syncProducer) handleErrors() {
  68. defer sp.wg.Done()
  69. for err := range sp.producer.Errors() {
  70. expectation := err.Msg.Metadata.(chan error)
  71. expectation <- err.Err
  72. }
  73. }
  74. func (sp *syncProducer) Close() error {
  75. sp.producer.AsyncClose()
  76. sp.wg.Wait()
  77. return nil
  78. }