|
|
@@ -2,6 +2,7 @@ package mocks
|
|
|
|
|
|
import (
|
|
|
"errors"
|
|
|
+ "sync"
|
|
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
)
|
|
|
@@ -20,6 +21,7 @@ type producerExpectation struct {
|
|
|
}
|
|
|
|
|
|
type Producer struct {
|
|
|
+ l sync.Mutex
|
|
|
expectations []*producerExpectation
|
|
|
closed chan struct{}
|
|
|
input chan *sarama.ProducerMessage
|
|
|
@@ -46,23 +48,29 @@ func NewProducer(t TestReporter, config *sarama.Config) *Producer {
|
|
|
}()
|
|
|
|
|
|
for msg := range mp.input {
|
|
|
+ mp.l.Lock()
|
|
|
if mp.expectations == nil || len(mp.expectations) == 0 {
|
|
|
mp.expectations = nil
|
|
|
t.Errorf("No more expectation set on this mock producer to handle the input message.")
|
|
|
} else {
|
|
|
expectation := mp.expectations[0]
|
|
|
mp.expectations = mp.expectations[1:]
|
|
|
- if expectation.Result == errProduceSuccess && config.Producer.AckSuccesses {
|
|
|
- mp.successes <- msg
|
|
|
+ if expectation.Result == errProduceSuccess {
|
|
|
+ if config.Producer.AckSuccesses {
|
|
|
+ mp.successes <- msg
|
|
|
+ }
|
|
|
} else {
|
|
|
mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
|
|
|
}
|
|
|
}
|
|
|
+ mp.l.Unlock()
|
|
|
}
|
|
|
|
|
|
+ mp.l.Lock()
|
|
|
if len(mp.expectations) > 0 {
|
|
|
t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
|
|
|
}
|
|
|
+ mp.l.Unlock()
|
|
|
|
|
|
close(mp.closed)
|
|
|
}()
|
|
|
@@ -97,9 +105,13 @@ func (mp *Producer) Errors() <-chan *sarama.ProducerError {
|
|
|
// Setting expectations
|
|
|
|
|
|
func (mp *Producer) ExpectInputAndSucceed() {
|
|
|
+ mp.l.Lock()
|
|
|
+ defer mp.l.Unlock()
|
|
|
mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess})
|
|
|
}
|
|
|
|
|
|
func (mp *Producer) ExpectInputAndFail(err error) {
|
|
|
+ mp.l.Lock()
|
|
|
+ defer mp.l.Unlock()
|
|
|
mp.expectations = append(mp.expectations, &producerExpectation{Result: err})
|
|
|
}
|