simple_producer.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package sarama
  2. // SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
  3. // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
  4. // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
  5. type SimpleProducer struct {
  6. producer *Producer
  7. newExpectations chan *spExpect
  8. }
  9. type spExpect struct {
  10. msg *MessageToSend
  11. result chan error
  12. }
  13. // NewSimpleProducer creates a new SimpleProducer using the given client and configuration.
  14. func NewSimpleProducer(client *Client, config *ProducerConfig) (*SimpleProducer, error) {
  15. if config == nil {
  16. config = NewProducerConfig()
  17. }
  18. config.AckSuccesses = true
  19. prod, err := NewProducer(client, config)
  20. if err != nil {
  21. return nil, err
  22. }
  23. sp := &SimpleProducer{
  24. producer: prod,
  25. newExpectations: make(chan *spExpect), // this must be unbuffered
  26. }
  27. go withRecover(sp.matchResponses)
  28. return sp, nil
  29. }
  30. // SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
  31. func (sp *SimpleProducer) SendMessage(topic string, key, value Encoder) error {
  32. msg := &MessageToSend{Topic: topic, Key: key, Value: value}
  33. expectation := &spExpect{msg: msg, result: make(chan error)}
  34. sp.newExpectations <- expectation
  35. sp.producer.Input() <- msg
  36. return <-expectation.result
  37. }
  38. func (sp *SimpleProducer) matchResponses() {
  39. newExpectations := sp.newExpectations
  40. unmatched := make(map[*MessageToSend]chan error)
  41. unmatched[nil] = nil // prevent it from emptying entirely
  42. for len(unmatched) > 0 {
  43. select {
  44. case expectation, ok := <-newExpectations:
  45. if !ok {
  46. delete(unmatched, nil) // let us exit when we've processed the last message
  47. newExpectations = nil // prevent spinning on a closed channel until that happens
  48. continue
  49. }
  50. unmatched[expectation.msg] = expectation.result
  51. case err := <-sp.producer.Errors():
  52. unmatched[err.Msg] <- err.Err
  53. delete(unmatched, err.Msg)
  54. case msg := <-sp.producer.Successes():
  55. close(unmatched[msg])
  56. delete(unmatched, msg)
  57. }
  58. }
  59. }
  60. // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
  61. // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
  62. // on the underlying client.
  63. func (sp *SimpleProducer) Close() error {
  64. close(sp.newExpectations)
  65. return sp.producer.Close()
  66. }