|
|
@@ -4,9 +4,9 @@ package sarama
|
|
|
// and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
|
|
|
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
|
|
|
type SimpleProducer struct {
|
|
|
- producer *Producer
|
|
|
- topic string
|
|
|
- expect chan *spExpect
|
|
|
+ producer *Producer
|
|
|
+ topic string
|
|
|
+ newExpectations chan *spExpect
|
|
|
}
|
|
|
|
|
|
type spExpect struct {
|
|
|
@@ -35,9 +35,9 @@ func NewSimpleProducer(client *Client, topic string, partitioner PartitionerCons
|
|
|
}
|
|
|
|
|
|
sp := &SimpleProducer{
|
|
|
- producer: prod,
|
|
|
- topic: topic,
|
|
|
- expect: make(chan *spExpect), // this must be unbuffered
|
|
|
+ producer: prod,
|
|
|
+ topic: topic,
|
|
|
+ newExpectations: make(chan *spExpect), // this must be unbuffered
|
|
|
}
|
|
|
|
|
|
go withRecover(sp.matchResponses)
|
|
|
@@ -49,23 +49,23 @@ func NewSimpleProducer(client *Client, topic string, partitioner PartitionerCons
|
|
|
func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
|
|
|
msg := &MessageToSend{Topic: sp.topic, Key: key, Value: value}
|
|
|
expectation := &spExpect{msg: msg, result: make(chan error)}
|
|
|
- sp.expect <- expectation
|
|
|
+ sp.newExpectations <- expectation
|
|
|
sp.producer.Input() <- msg
|
|
|
|
|
|
return <-expectation.result
|
|
|
}
|
|
|
|
|
|
func (sp *SimpleProducer) matchResponses() {
|
|
|
- expect := sp.expect
|
|
|
+ newExpectations := sp.newExpectations
|
|
|
unmatched := make(map[*MessageToSend]chan error)
|
|
|
unmatched[nil] = nil // prevent it from emptying entirely
|
|
|
|
|
|
for len(unmatched) > 0 {
|
|
|
select {
|
|
|
- case expectation, ok := <-expect:
|
|
|
+ case expectation, ok := <-newExpectations:
|
|
|
if !ok {
|
|
|
delete(unmatched, nil) // let us exit when we've processed the last message
|
|
|
- expect = nil // prevent spinning on a closed channel until that happens
|
|
|
+ newExpectations = nil // prevent spinning on a closed channel until that happens
|
|
|
continue
|
|
|
}
|
|
|
unmatched[expectation.msg] = expectation.result
|
|
|
@@ -83,6 +83,6 @@ func (sp *SimpleProducer) matchResponses() {
|
|
|
// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
|
|
|
// on the underlying client.
|
|
|
func (sp *SimpleProducer) Close() error {
|
|
|
- close(sp.expect)
|
|
|
+ close(sp.newExpectations)
|
|
|
return sp.producer.Close()
|
|
|
}
|