|
|
@@ -10,11 +10,9 @@ import (
|
|
|
// Before you can send messages to it's Input channel, you have to set expectations
|
|
|
// so it knows how to handle the input. This way you can easily test success and
|
|
|
// failure scenarios.
|
|
|
-//
|
|
|
-// NOTE: the SyncProducer type currently does not fall under the API stability
|
|
|
-// guarantee of Sarama as it is stiull considered experimental.
|
|
|
type Producer struct {
|
|
|
l sync.Mutex
|
|
|
+ t ErrorReporter
|
|
|
expectations []*producerExpectation
|
|
|
closed chan struct{}
|
|
|
input chan *sarama.ProducerMessage
|
|
|
@@ -26,11 +24,12 @@ type Producer struct {
|
|
|
// be the *testing.T instance of your test method. An error will be written to it if
|
|
|
// an expectation is violated. The config argument is used to determine whether it
|
|
|
// should ack successes on the Successes channel.
|
|
|
-func NewProducer(t ExpectationViolationReporter, config *sarama.Config) *Producer {
|
|
|
+func NewProducer(t ErrorReporter, config *sarama.Config) *Producer {
|
|
|
if config == nil {
|
|
|
config = sarama.NewConfig()
|
|
|
}
|
|
|
mp := &Producer{
|
|
|
+ t: t,
|
|
|
closed: make(chan struct{}, 0),
|
|
|
expectations: make([]*producerExpectation, 0),
|
|
|
input: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
|
|
|
@@ -48,7 +47,7 @@ func NewProducer(t ExpectationViolationReporter, config *sarama.Config) *Produce
|
|
|
mp.l.Lock()
|
|
|
if mp.expectations == nil || len(mp.expectations) == 0 {
|
|
|
mp.expectations = nil
|
|
|
- t.Errorf("No more expectation set on this mock producer to handle the input message.")
|
|
|
+ mp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
|
|
|
} else {
|
|
|
expectation := mp.expectations[0]
|
|
|
mp.expectations = mp.expectations[1:]
|
|
|
@@ -65,7 +64,7 @@ func NewProducer(t ExpectationViolationReporter, config *sarama.Config) *Produce
|
|
|
|
|
|
mp.l.Lock()
|
|
|
if len(mp.expectations) > 0 {
|
|
|
- t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
|
|
|
+ mp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
|
|
|
}
|
|
|
mp.l.Unlock()
|
|
|
|