1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- package sarama
- import "sync"
- type SyncProducer interface {
-
-
-
- SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
-
-
-
-
- Close() error
- }
- type syncProducer struct {
- producer *asyncProducer
- wg sync.WaitGroup
- }
- func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
- p, err := NewAsyncProducer(addrs, config)
- if err != nil {
- return nil, err
- }
- return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
- }
- func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
- p, err := NewAsyncProducerFromClient(client)
- if err != nil {
- return nil, err
- }
- return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
- }
- func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
- p.conf.Producer.Return.Successes = true
- p.conf.Producer.Return.Errors = true
- sp := &syncProducer{producer: p}
- sp.wg.Add(2)
- go withRecover(sp.handleSuccesses)
- go withRecover(sp.handleErrors)
- return sp
- }
- func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
- oldMetadata := msg.Metadata
- defer func() {
- msg.Metadata = oldMetadata
- }()
- expectation := make(chan error, 1)
- msg.Metadata = expectation
- sp.producer.Input() <- msg
- if err := <-expectation; err != nil {
- return -1, -1, err
- }
- return msg.Partition, msg.Offset, nil
- }
- func (sp *syncProducer) handleSuccesses() {
- defer sp.wg.Done()
- for msg := range sp.producer.Successes() {
- expectation := msg.Metadata.(chan error)
- expectation <- nil
- }
- }
- func (sp *syncProducer) handleErrors() {
- defer sp.wg.Done()
- for err := range sp.producer.Errors() {
- expectation := err.Msg.Metadata.(chan error)
- expectation <- err.Err
- }
- }
- func (sp *syncProducer) Close() error {
- sp.producer.AsyncClose()
- sp.wg.Wait()
- return nil
- }
|