|
@@ -16,6 +16,12 @@ type SyncProducer interface {
|
|
|
// of the produced message, or an error if the message failed to produce.
|
|
// of the produced message, or an error if the message failed to produce.
|
|
|
SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
|
|
SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
|
|
|
|
|
|
|
|
|
|
+ // SendMessages produces a given set of messages, and returns only when all
|
|
|
|
|
+ // messages in the set have either succeeded or failed. Note that messages
|
|
|
|
|
+ // can succeed and fail individually; if some succeed and some fail,
|
|
|
|
|
+ // SendMessages will return an error.
|
|
|
|
|
+ SendMessages(msgs []*ProducerMessage) error
|
|
|
|
|
+
|
|
|
// Close shuts down the producer and flushes any messages it may have buffered.
|
|
// Close shuts down the producer and flushes any messages it may have buffered.
|
|
|
// You must call this function before a producer object passes out of scope, as
|
|
// You must call this function before a producer object passes out of scope, as
|
|
|
// it may otherwise leak memory. You must call this before calling Close on the
|
|
// it may otherwise leak memory. You must call this before calling Close on the
|
|
@@ -65,21 +71,56 @@ func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offs
|
|
|
msg.Metadata = oldMetadata
|
|
msg.Metadata = oldMetadata
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
- expectation := make(chan error, 1)
|
|
|
|
|
|
|
+ expectation := make(chan *ProducerError, 1)
|
|
|
msg.Metadata = expectation
|
|
msg.Metadata = expectation
|
|
|
sp.producer.Input() <- msg
|
|
sp.producer.Input() <- msg
|
|
|
|
|
|
|
|
if err := <-expectation; err != nil {
|
|
if err := <-expectation; err != nil {
|
|
|
- return -1, -1, err
|
|
|
|
|
|
|
+ return -1, -1, err.Err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return msg.Partition, msg.Offset, nil
|
|
return msg.Partition, msg.Offset, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
|
|
|
|
|
+ savedMetadata := make([]interface{}, len(msgs))
|
|
|
|
|
+ for i := range msgs {
|
|
|
|
|
+ savedMetadata[i] = msgs[i].Metadata
|
|
|
|
|
+ }
|
|
|
|
|
+ defer func() {
|
|
|
|
|
+ for i := range msgs {
|
|
|
|
|
+ msgs[i].Metadata = savedMetadata[i]
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ expectations := make(chan chan *ProducerError, len(msgs))
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ for _, msg := range msgs {
|
|
|
|
|
+ expectation := make(chan *ProducerError, 1)
|
|
|
|
|
+ msg.Metadata = expectation
|
|
|
|
|
+ sp.producer.Input() <- msg
|
|
|
|
|
+ expectations <- expectation
|
|
|
|
|
+ }
|
|
|
|
|
+ close(expectations)
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ var errors ProducerErrors
|
|
|
|
|
+ for expectation := range expectations {
|
|
|
|
|
+ if err := <-expectation; err != nil {
|
|
|
|
|
+ errors = append(errors, err)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if len(errors) > 0 {
|
|
|
|
|
+ return errors
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (sp *syncProducer) handleSuccesses() {
|
|
func (sp *syncProducer) handleSuccesses() {
|
|
|
defer sp.wg.Done()
|
|
defer sp.wg.Done()
|
|
|
for msg := range sp.producer.Successes() {
|
|
for msg := range sp.producer.Successes() {
|
|
|
- expectation := msg.Metadata.(chan error)
|
|
|
|
|
|
|
+ expectation := msg.Metadata.(chan *ProducerError)
|
|
|
expectation <- nil
|
|
expectation <- nil
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -87,8 +128,8 @@ func (sp *syncProducer) handleSuccesses() {
|
|
|
func (sp *syncProducer) handleErrors() {
|
|
func (sp *syncProducer) handleErrors() {
|
|
|
defer sp.wg.Done()
|
|
defer sp.wg.Done()
|
|
|
for err := range sp.producer.Errors() {
|
|
for err := range sp.producer.Errors() {
|
|
|
- expectation := err.Msg.Metadata.(chan error)
|
|
|
|
|
- expectation <- err.Err
|
|
|
|
|
|
|
+ expectation := err.Msg.Metadata.(chan *ProducerError)
|
|
|
|
|
+ expectation <- err
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|