simple_producer.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package sarama
  2. // SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
  3. // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
  4. // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
  5. type SimpleProducer struct {
  6. producer *Producer
  7. topic string
  8. newExpectations chan *spExpect
  9. }
  10. type spExpect struct {
  11. msg *MessageToSend
  12. result chan error
  13. }
  14. // NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
  15. // partitioner is nil, messages are partitioned by the hash of the key
  16. // (or randomly if there is no key).
  17. func NewSimpleProducer(client *Client, topic string, partitioner PartitionerConstructor) (*SimpleProducer, error) {
  18. if topic == "" {
  19. return nil, ConfigurationError("Empty topic")
  20. }
  21. config := NewProducerConfig()
  22. config.AckSuccesses = true
  23. if partitioner != nil {
  24. config.Partitioner = partitioner
  25. }
  26. prod, err := NewProducer(client, config)
  27. if err != nil {
  28. return nil, err
  29. }
  30. sp := &SimpleProducer{
  31. producer: prod,
  32. topic: topic,
  33. newExpectations: make(chan *spExpect), // this must be unbuffered
  34. }
  35. go withRecover(sp.matchResponses)
  36. return sp, nil
  37. }
  38. // SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
  39. func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
  40. msg := &MessageToSend{Topic: sp.topic, Key: key, Value: value}
  41. expectation := &spExpect{msg: msg, result: make(chan error)}
  42. sp.newExpectations <- expectation
  43. sp.producer.Input() <- msg
  44. return <-expectation.result
  45. }
  46. func (sp *SimpleProducer) matchResponses() {
  47. newExpectations := sp.newExpectations
  48. unmatched := make(map[*MessageToSend]chan error)
  49. unmatched[nil] = nil // prevent it from emptying entirely
  50. for len(unmatched) > 0 {
  51. select {
  52. case expectation, ok := <-newExpectations:
  53. if !ok {
  54. delete(unmatched, nil) // let us exit when we've processed the last message
  55. newExpectations = nil // prevent spinning on a closed channel until that happens
  56. continue
  57. }
  58. unmatched[expectation.msg] = expectation.result
  59. case err := <-sp.producer.Errors():
  60. unmatched[err.Msg] <- err.Err
  61. delete(unmatched, err.Msg)
  62. case msg := <-sp.producer.Successes():
  63. close(unmatched[msg])
  64. delete(unmatched, msg)
  65. }
  66. }
  67. }
  68. // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
  69. // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
  70. // on the underlying client.
  71. func (sp *SimpleProducer) Close() error {
  72. close(sp.newExpectations)
  73. return sp.producer.Close()
  74. }