123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- package sarama
- import "sync"
- type SimpleProducer struct {
- producer *Producer
- wg sync.WaitGroup
- }
- func NewSimpleProducer(client *Client, config *ProducerConfig) (*SimpleProducer, error) {
- if config == nil {
- config = NewProducerConfig()
- }
- config.AckSuccesses = true
- prod, err := NewProducer(client, config)
- if err != nil {
- return nil, err
- }
- sp := &SimpleProducer{producer: prod}
- sp.wg.Add(2)
- go withRecover(sp.handleSuccesses)
- go withRecover(sp.handleErrors)
- return sp, nil
- }
- func (sp *SimpleProducer) SendMessage(topic string, key, value Encoder) error {
- expectation := make(chan error, 1)
- msg := &MessageToSend{Topic: topic, Key: key, Value: value, Metadata: expectation}
- sp.producer.Input() <- msg
- return <-expectation
- }
- func (sp *SimpleProducer) handleSuccesses() {
- defer sp.wg.Done()
- for msg := range sp.producer.Successes() {
- expectation := msg.Metadata.(chan error)
- expectation <- nil
- }
- }
- func (sp *SimpleProducer) handleErrors() {
- defer sp.wg.Done()
- for err := range sp.producer.Errors() {
- expectation := err.Msg.Metadata.(chan error)
- expectation <- err.Err
- }
- }
- func (sp *SimpleProducer) Close() error {
- sp.producer.AsyncClose()
- sp.wg.Wait()
- return nil
- }
|