|
|
@@ -34,7 +34,9 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
|
|
|
|
|
|
// SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
|
|
|
// You have to set expectations on the mock producer before calling SendMessage, so it knows
|
|
|
-// how to handle them. If there is no more remaining expectations when SendMessage is called,
|
|
|
+// how to handle them. You can set a function in each expectation so that the message value
|
|
|
+// checked by this function and an error is returned if the match fails.
|
|
|
+// If there is no more remaining expectation when SendMessage is called,
|
|
|
// the mock producer will write an error to the test state object.
|
|
|
func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
|
|
|
sp.l.Lock()
|
|
|
@@ -43,7 +45,18 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3
|
|
|
if len(sp.expectations) > 0 {
|
|
|
expectation := sp.expectations[0]
|
|
|
sp.expectations = sp.expectations[1:]
|
|
|
-
|
|
|
+ if expectation.CheckFunction != nil {
|
|
|
+ if val, err := msg.Value.Encode(); err != nil {
|
|
|
+ sp.t.Errorf("Input message encoding failed: %s", err.Error())
|
|
|
+ return -1, -1, err
|
|
|
+ } else {
|
|
|
+ err := expectation.CheckFunction(val)
|
|
|
+ if err != nil {
|
|
|
+ sp.t.Errorf("Check function returned an error: %s", err.Error())
|
|
|
+ return -1, -1, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
if expectation.Result == errProduceSuccess {
|
|
|
sp.lastOffset++
|
|
|
msg.Offset = sp.lastOffset
|
|
|
@@ -100,20 +113,36 @@ func (sp *SyncProducer) Close() error {
|
|
|
// Setting expectations
|
|
|
////////////////////////////////////////////////
|
|
|
|
|
|
+// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
|
|
|
+// will be called. The mock producer will first call the given function to check the message value.
|
|
|
+// It will cascade the error of the function, if any, or handle the message as if it produced
|
|
|
+// successfully, i.e. by returning a valid partition, and offset, and a nil error.
|
|
|
+func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) {
|
|
|
+ sp.l.Lock()
|
|
|
+ defer sp.l.Unlock()
|
|
|
+ sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
|
|
|
+}
|
|
|
+
|
|
|
+// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
|
|
|
+// called. The mock producer will first call the given function to check the message value.
|
|
|
+// It will cascade the error of the function, if any, or handle the message as if it failed
|
|
|
+// to produce successfully, i.e. by returning the provided error.
|
|
|
+func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) {
|
|
|
+ sp.l.Lock()
|
|
|
+ defer sp.l.Unlock()
|
|
|
+ sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
|
|
|
+}
|
|
|
+
|
|
|
// ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
|
|
|
// called. The mock producer will handle the message as if it produced successfully, i.e. by
|
|
|
// returning a valid partition, and offset, and a nil error.
|
|
|
func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
|
|
|
- sp.l.Lock()
|
|
|
- defer sp.l.Unlock()
|
|
|
- sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess})
|
|
|
+ sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil)
|
|
|
}
|
|
|
|
|
|
// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
|
|
|
// called. The mock producer will handle the message as if it failed to produce
|
|
|
// successfully, i.e. by returning the provided error.
|
|
|
func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
|
|
|
- sp.l.Lock()
|
|
|
- defer sp.l.Unlock()
|
|
|
- sp.expectations = append(sp.expectations, &producerExpectation{Result: err})
|
|
|
+ sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err)
|
|
|
}
|